1:- module(sparkle,[
    2      sparql_endpoint/2
    3   ,  sparql_endpoint/3
    4   ,  current_sparql_endpoint/5
    5   ,  query_goal/3     % Endpoint, Context, Opts
    6   ,  query_phrase/3   % Endpoint, QueryPhrase, Result
    7   ,  query_sparql/3 % Endpoint,QueryText,Result
    8   ,  (??)/1
    9   ,  (??)/2
   10   ,  op(1150,fx,??)
   11   ,  op(1150,xfy,??)
   12	]).

/** <module> Query to SPARQL endpoints with a more Prolog-like syntax

Samer Abdallah, Dept. of Computer Science, UCL (2014) Based on Yves Raimond's swic package, but completely re-written.

This module provides a little language for expressing SPARQL queries and a database of known SPARQL endpoints. Queries can be executed across multiple endpoints in parallel. When using auto-paging, multiple queries are made automatically to fetch new bindings as they are needed. For example,

EP ?? rdf(A,B,C).

will retrieve all triples from all endpoints in parallel, fetching 100 bindings at a time from each endpoint (assuming the setting sparkle:limit takes its default value of 100). */

   32:- use_module(library(sandbox)).   33:- use_module(library(settings)).   34:- use_module(library(semweb/sparql_client)).   35:- use_module(library(dcg_core)).   36:- use_module(library(dcg_codes)).   37:- use_module(sparql_dcg).   38:- use_module(concurrency).   39
   40
   41:- dynamic sparql_endpoint/5.   42:- multifile sparql_endpoint/5.   43:- set_prolog_flag(double_quotes, codes).   44
   45:- setting(limit,integer,100,'Default SPARQL SELECT limit').   46:- setting(select_options,list,[distinct(true)],'Default select options').   47
   48:- meta_predicate query_phrase(+,//,-).   49
   % Sandbox declarations (hooks of library(sandbox)).
   %
   % safe_meta/2 tells the sandbox that the first argument of
   % phrase_to_sparql/2 is a goal-like term to be analysed; the
   % safe_primitive/1 facts declare the expanded forms of the SPARQL
   % DCG entry points select//3, describe//2, describe//1 and ask//1.
   %
   % NOTE(review): phrase_to_sparql/2 is defined in this module, yet the
   % declaration below qualifies it with sparql_dcg: -- confirm which
   % module qualification is intended.
   sandbox:safe_meta(sparql_dcg:phrase_to_sparql(Phrase, _), [Phrase]).
   sandbox:safe_primitive(sparql_dcg:select(_, _, _, _, _)).
   sandbox:safe_primitive(sparql_dcg:describe(_, _, _, _)).
   sandbox:safe_primitive(sparql_dcg:describe(_, _, _)).
   sandbox:safe_primitive(sparql_dcg:ask(_, _, _)).
 ??(+Goal:sparql_goal) is nondet
Equivalent to _ ?? Goal. Will query all endpoints in parallel. Identical bindings may be returned multiple times. See query_goal/3 for details.
   60??(Spec) :- ??(_,Spec).
 ??(EP, +Goal:sparql_goal) is nondet
Equivalent to query_goal(EP,Goal,Opts) where Opts is the value of the setting sparkle:select_options. See query_goal/3 for details. IF EP is unbound on entry, it is bound to the endpoint from which the current bindings were obtained.
   67??(EP,Spec) :- 
   68   spec_goal_opts(Spec,Goal,Opts),
   69   setting(select_options,Opts0),
   70   merge_options(Opts,Opts0,Opts1),
   71   query_goal(EP,Goal,Opts1).
   72
   % spec_goal_opts(+Spec, -Goal, -Opts) is det.
   %
   % Splits a query specification into the goal and its option list.
   % A specification of the form =|Opts ?? Goal|= carries explicit
   % options; anything else is a bare goal with no options.
   spec_goal_opts(Spec, Goal, Opts) :-
      (  Spec = (Opts ?? Goal)
      -> true
      ;  Goal = Spec,
         Opts = []
      ).
   75
   /*
    * Assert/declare a new sparql end point
    */

   %% sparql_endpoint(+EP:ground, +URL:atom) is det.
   %% sparql_endpoint(+EP:ground, +URL:atom, +Options) is det.
   %
   %  Declares EP as a short name for the SPARQL endpoint with the given
   %  URL. The URL is split into host, port and path by url_endpoint/4
   %  and stored as a sparql_endpoint/5 fact together with Options.
   %  The stored Options are later appended to the option list that
   %  query_sparql/3 passes to sparql_query/3.
   %
   %  If EP is already registered for the same host, port and path, a
   %  warning is printed and the old entry is replaced.
   sparql_endpoint(EP,Url) :- sparql_endpoint(EP,Url,[]).
   sparql_endpoint(EP,Url,Options) :-
      url_endpoint(Url,Host,Port,Path),
      (  sparql_endpoint(EP,Host,Port,Path,_)
      -> format('% WARNING: Updating already registered SPARQL end point ~w.\n',[Url]),
         retractall(sparql_endpoint(EP,Host,Port,Path,_))
      ;  true  % not registered yet: without this branch the whole call would fail
      ),
      debug(sparkle,'Asserting SPARQL end point ~w: ~w ~w ~w ~w.',[EP,Host,Port,Path,Options]),
      assert(sparql_endpoint(EP,Host,Port,Path,Options)).
   94
   % Source-level declarations: a directive =|:- sparql_endpoint(EP,Url)|=
   % or =|:- sparql_endpoint(EP,Url,Options)|= appearing in a loaded file
   % is rewritten at load time into the corresponding
   % sparkle:sparql_endpoint/5 fact (see endpoint_declaration/4).
   user:term_expansion((:- sparql_endpoint(EP,Url)), Clause) :-
      endpoint_declaration(EP, Url, [], Clause).
   user:term_expansion((:- sparql_endpoint(EP,Url,Options)), Clause) :-
      endpoint_declaration(EP, Url, Options, Clause).
   99
  % endpoint_declaration(+EP, +Url, +Options, -Clause) is semidet.
  %
  % Builds the sparkle:sparql_endpoint/5 fact that declares EP for the
  % endpoint at Url. Fails if url_endpoint/4 cannot decompose Url.
  endpoint_declaration(EP,Url,Options, sparkle:sparql_endpoint(EP,Host,Port,Path,Options)) :-
     % Decompose the URL first so that the debug message below reports the
     % actual host, port and path instead of unbound variables.
     url_endpoint(Url,Host,Port,Path),
     debug(sparkle,'Declaring SPARQL end point ~w: ~w ~w ~w ~w.',[EP,Host,Port,Path,Options]).
  103
  % url_endpoint(+Url, -Host, -Port, -Path) is semidet.
  %
  % Decomposes Url with parse_url/2 into its host, port and path parts.
  % Port defaults to 80 when the parsed URL carries no port(_) part.
  % Fails if the parsed URL has no host(_) or no path(_) part.
  %
  % memberchk/2 and a committed if-then-else are used so that no choice
  % points are left behind; in particular the default port is never
  % produced on backtracking when the URL names an explicit port.
  url_endpoint(Url,Host,Port,Path) :-
     parse_url(Url,Parsed),
     memberchk(host(Host),Parsed),
     memberchk(path(Path),Parsed),
     (  memberchk(port(Explicit),Parsed)
     -> Port = Explicit
     ;  Port = 80
     ).
 current_sparql_endpoint(-EP:ground, -Host:atom, -Port:natural, -Path:atom, -Options:list) is nondet
Succeeds once for each known endpoint.
  114current_sparql_endpoint(EP,Host,Port,Path,Options) :-
  115   sparql_endpoint(EP,Host,Port,Path,Options).
  116
  117
  % ----------------------------------------------------
  % Goal-based queries
  % These get translated into phrase-based queries.

  %% query_goal(+EP, +Goal:sparql_goal, +Opts) is nondet.
  %% query_goal(-EP, +Goal:sparql_goal, +Opts) is nondet.
  %
  %  Runs a SPARQL query against one or more SPARQL endpoints. Goal is
  %  converted into a textual SPARQL query by phrase_to_sparql/2.
  %
  %  If EP is bound on entry, only the matching registered endpoint is
  %  queried. If EP is unbound, all registered endpoints are queried in
  %  parallel, possibly returning multiple results from each, and EP is
  %  bound to the endpoint that produced the current result.
  %
  %  A Goal without variables is sent as an ASK query, which only
  %  succeeds or fails. Otherwise a SELECT query over the variables of
  %  Goal is made, and these options apply:
  %
  %    * limit(L:natural)
  %      At most this many bindings are returned per SPARQL call.
  %      Defaults to the setting sparkle:limit.
  %    * offset(O:natural)
  %      Begin returning bindings from the O-th result on.
  %    * autopage(Auto:bool)
  %      If false, a single SPARQL call is made using any limit and
  %      offset options supplied. If true (the default), the offset
  %      option is ignored and as many SPARQL calls as necessary are
  %      made, fetching limit results at a time.
  %
  %  All other options are passed on in the select(Vars,Goal,Options)
  %  phrase.
  query_goal(EP,Goal,Opts) :-
     findall(EP, sparql_endpoint(EP,_,_,_,_), Endpoints),
     term_variables(Goal, Vars),
     (  Vars = []   % nothing to bind: ASK query
     -> phrase_to_sparql(ask(Goal), SPARQL),
        parallel_query(simple_query(SPARQL), Endpoints, EP-true)
     ;  Row =.. [row|Vars],   % one row argument per variable: SELECT query
        setting(limit, DefaultLimit),
        select_option(limit(Limit), Opts, Opts_a, DefaultLimit),
        select_option(autopage(Auto), Opts_a, Opts_b, true),
        (  Auto = true
        -> % Paging appends its own LIMIT/OFFSET, so both options are
           % kept out of the generated query text.
           Runner = autopage_query(Limit, SPARQL),
           select_option(offset(_), Opts_b, SelectOpts, _)
        ;  % Single call: put the (possibly defaulted) limit back.
           Runner = simple_query(SPARQL),
           SelectOpts = [limit(Limit)|Opts_b]
        ),
        % SPARQL is still unbound inside Runner until this point.
        phrase_to_sparql(select(Vars,Goal,SelectOpts), SPARQL),
        parallel_query(Runner, Endpoints, EP-Row)
     ).
  168
  % cons(+Head, ?Tail, ?List): DCG-style step that prepends Head.
  cons(Head, Tail, [Head|Tail]).

  % option_default_select(?Opt, +Default, +OptsIn, -OptsOut):
  % select_option/4 with its arguments ordered for use as a DCG step.
  option_default_select(Opt, Default, OptsIn, OptsOut) :-
     select_option(Opt, OptsIn, OptsOut, Default).

  % simple_query(+SPARQL, ?EP, -Pair): one query_sparql/3 call on EP;
  % each result is returned paired with its endpoint as EP-Result.
  simple_query(SPARQL, EP, EP-Result) :-
     query_sparql(EP, SPARQL, Result).

  % autopage_query(+Limit, +SPARQL, ?EP, -Pair): paged query on EP via
  % autopage/5, starting at offset 0; results are returned as EP-Result.
  autopage_query(Limit, SPARQL, EP, EP-Result) :-
     autopage(EP, SPARQL, Limit, 0, Result).
  173
  % autopage(?EP, +SPARQL, +Limit, +Offset, -Result) is nondet.
  %
  % Enumerates the results of SPARQL on backtracking, fetching them in
  % pages of Limit results starting at Offset. Each page is obtained by
  % appending " LIMIT Limit OFFSET Offset" to the query text and is
  % collected completely before its members are returned. The next page
  % is requested only after the current one is exhausted, and only if
  % the current page was full.
  autopage(EP,SPARQL,Limit,Offset,Result) :-
     format(string(Q),'~s LIMIT ~d OFFSET ~d',[SPARQL,Limit,Offset]),
     findall(R,query_sparql(EP,Q,R),Results),
     (  member(Result,Results)
     ;  Limit > 0,                 % a zero limit would never advance the offset (endless loop)
        length(Results,Limit),     % no next page if length(Results) < Limit
        Offset1 is Offset + Limit, % next batch of results
        autopage(EP,SPARQL,Limit,Offset1,Result)
     ).
  182
  % parallel_query(:P, +Xs:list, -Y) is nondet.
  %
  % Y is a solution of call(P,X,Y) for some X in Xs. Fails for an empty
  % list, calls P directly for a single element, and otherwise runs one
  % goal per element through concurrent_or/3 with on_error(continue).
  parallel_query(P, Xs, Y) :-
     (  Xs = []
     -> fail
     ;  Xs = [Only]
     -> call(P, Only, Y)
     ;  maplist(par_goal(P,Y), Xs, Goals),
        concurrent_or(Y, Goals, [on_error(continue)])
     ).

  % par_goal(+P, ?Y, +X, -Goal): Goal is the call of P on X yielding Y.
  par_goal(P, Y, X, call(P,X,Y)).
 query_phrase(+EP, +Q:sparqle_phrase(R), R) is nondet
query_phrase(-EP, +Q:sparqle_phrase(R), R) is nondet
Phrase-based queries using the DCG defined in sparql_dcg.pl. The return type depends on the query:
select(V:list(var), sparql_goal, options) :: sparql_phrase(row(N)) :- length(V,N).
describe(resource,sparql_goal)            :: sparql_phrase(rdf).
describe(resource)                        :: sparql_phrase(rdf).
ask(sparql_goal)                          :: sparql_phrase(bool).

rdf  ---> rdf(resource,resource,object).
bool ---> true; false.

row(N) is the type of terms of functor row/N.

  209query_phrase(EP,Phrase,Result) :- 
  210   phrase_to_sparql(Phrase,SPARQL),
  211   query_sparql(EP,SPARQL,Result).
  212
  213
  % phrase_to_sparql(+Phrase, -SPARQL:string) is det.
  %
  % Renders Phrase, a DCG body, as SPARQL query text. The phrase is run
  % on a copy whose variables have been numbered with numbervars/3, so
  % the variables of Phrase itself are left unbound.
  %
  % @throws unrecognised_query(Phrase) if the phrase does not succeed.
  phrase_to_sparql(Phrase, SPARQL) :-
     copy_term(Phrase, Copy),
     term_variables(Copy, CopyVars),
     numbervars(CopyVars, 0, _),
     (  phrase(Copy, Codes)
     -> string_codes(SPARQL, Codes),
        debug(sparkle,'SPARQL query: ~s',[SPARQL])
     ;  throw(unrecognised_query(Phrase))
     ).
  223
  224% ----------------------------------------------------
  225% In the end, everything comes through this.
 query_sparql(?EP, SPARQL, -Result) is nondet
Runs textual SPARQL query against an endpoint, exactly as with sparql_query/3. If EP is unbound on entry, all known endpoints will be tried sequentially.
  232query_sparql(EP,SPARQL,Result) :-
  233   sparql_endpoint(EP,Host,Port,Path,EPOpts),
  234   debug(sparkle,'Querying endpoint http://~w:~w~w',[Host,Port,Path]),
  235   sparql_query(SPARQL,Result,[host(Host),port(Port),path(Path)|EPOpts])