View source with formatted comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2024, Torbjörn Lager,
    8                              VU University Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(pengines,
   39          [ pengine_create/1,                   % +Options
   40            pengine_ask/3,                      % +Pengine, :Query, +Options
   41            pengine_next/2,                     % +Pengine, +Options
   42            pengine_stop/2,                     % +Pengine, +Options
   43            pengine_event/2,                    % -Event, +Options
   44            pengine_input/2,                    % +Prompt, -Term
   45            pengine_output/1,                   % +Term
   46            pengine_respond/3,                  % +Pengine, +Input, +Options
   47            pengine_debug/2,                    % +Format, +Args
   48            pengine_self/1,                     % -Pengine
   49            pengine_pull_response/2,            % +Pengine, +Options
   50            pengine_destroy/1,                  % +Pengine
   51            pengine_destroy/2,                  % +Pengine, +Options
   52            pengine_abort/1,                    % +Pengine
   53            pengine_application/1,              % +Application
   54            current_pengine_application/1,      % ?Application
   55            pengine_property/2,                 % ?Pengine, ?Property
   56            pengine_user/1,                     % -User
   57            pengine_event_loop/2,               % :Closure, +Options
   58            pengine_rpc/2,                      % +Server, :Goal
   59            pengine_rpc/3                       % +Server, :Goal, +Options
   60          ]).   61
   62/** <module> Pengines: Web Logic Programming Made Easy
   63
   64The library(pengines) provides an  infrastructure   for  creating Prolog
   65engines in a (remote) pengine server  and accessing these engines either
   66from Prolog or JavaScript.
   67
   68@author Torbjörn Lager and Jan Wielemaker
   69*/
   70
   71:- autoload(library(aggregate),[aggregate_all/3]).   72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   73:- autoload(library(broadcast),[broadcast/1]).   74:- autoload(library(charsio),[open_chars_stream/2]).   75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   76:- autoload(library(error),
   77	    [ must_be/2,
   78	      existence_error/2,
   79	      permission_error/3,
   80	      domain_error/2
   81	    ]).   82:- autoload(library(filesex),[directory_file_path/3]).   83:- autoload(library(listing),[listing/1]).   84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   85:- autoload(library(modules),[in_temporary_module/3]).   86:- autoload(library(occurs),[sub_term/2]).   87:- autoload(library(option),
   88	    [select_option/3,option/2,option/3,select_option/4]).   89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   90:- autoload(library(sandbox),[safe_goal/1]).   91:- autoload(library(statistics),[thread_statistics/2]).   92:- autoload(library(term_to_json),[term_to_json/2]).   93:- autoload(library(thread_pool),
   94	    [thread_pool_create/3,thread_create_in_pool/4]).   95:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   96:- autoload(library(uri),
   97	    [ uri_components/2,
   98	      uri_query_components/2,
   99	      uri_data/3,
  100	      uri_data/4,
  101	      uri_encoded/3
  102	    ]).  103:- autoload(library(http/http_client),[http_read_data/3]).  104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  105:- autoload(library(http/http_dispatch),
  106	    [http_handler/3,http_404/2,http_reply_file/3]).  107:- autoload(library(http/http_open),[http_open/3]).  108:- autoload(library(http/http_parameters),[http_parameters/2]).  109:- autoload(library(http/http_stream),[is_cgi_stream/1]).  110:- autoload(library(http/http_wrapper),[http_peer/2]).  111
  112:- use_module(library(settings),[setting/2,setting/4]).  113:- use_module(library(http/http_json),
  114              [http_read_json_dict/2,reply_json_dict/1]).  115
  116:- if(exists_source(library(uuid))).  117:- autoload(library(uuid), [uuid/2]).  118:- endif.  119
  120
  121:- meta_predicate
  122    pengine_create(:),
  123    pengine_rpc(+, +, :),
  124    pengine_event_loop(1, +).  125
  126:- multifile
  127    write_result/3,                 % +Format, +Event, +Dict
  128    event_to_json/3,                % +Event, -JSON, +Format
  129    prepare_module/3,               % +Module, +Application, +Options
  130    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  131    authentication_hook/3,          % +Request, +Application, -User
  132    not_sandboxed/2,                % +User, +App
  133    pengine_flush_output_hook/0.  134
  135:- predicate_options(pengine_create/1, 1,
  136                     [ id(-atom),
  137                       alias(atom),
  138                       application(atom),
  139                       destroy(boolean),
  140                       server(atom),
  141                       ask(compound),
  142                       template(compound),
  143                       chunk(integer;oneof([false])),
  144                       bindings(list),
  145                       src_list(list),
  146                       src_text(any),           % text
  147                       src_url(atom),
  148                       src_predicates(list)
  149                     ]).  150:- predicate_options(pengine_ask/3, 3,
  151                     [ template(any),
  152                       chunk(integer;oneof([false])),
  153                       bindings(list)
  154                     ]).  155:- predicate_options(pengine_next/2, 2,
  156                     [ chunk(integer),
  157                       pass_to(pengine_send/3, 3)
  158                     ]).  159:- predicate_options(pengine_stop/2, 2,
  160                     [ pass_to(pengine_send/3, 3)
  161                     ]).  162:- predicate_options(pengine_respond/3, 2,
  163                     [ pass_to(pengine_send/3, 3)
  164                     ]).  165:- predicate_options(pengine_rpc/3, 3,
  166                     [ chunk(integer;oneof([false])),
  167                       pass_to(pengine_create/1, 1)
  168                     ]).  169:- predicate_options(pengine_send/3, 3,
  170                     [ delay(number)
  171                     ]).  172:- predicate_options(pengine_event/2, 2,
  173                     [ listen(atom),
  174                       pass_to(system:thread_get_message/3, 3)
  175                     ]).  176:- predicate_options(pengine_pull_response/2, 2,
  177                     [ pass_to(http_open/3, 3)
  178                     ]).  179:- predicate_options(pengine_event_loop/2, 2,
  180                     []).                       % not yet implemented
  181
  182% :- debug(pengine(transition)).
  183:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  184
  185goal_expansion(random_delay, Expanded) :-
  186    (   debugging(pengine(delay))
  187    ->  Expanded = do_random_delay
  188    ;   Expanded = true
  189    ).
  190
  191do_random_delay :-
  192    Delay is random(20)/1000,
  193    sleep(Delay).
  194
  195:- meta_predicate                       % internal meta predicates
  196    solve(+, ?, 0, +),
  197    findnsols_no_empty(+, ?, 0, -),
  198    pengine_event_loop(+, 1, +).  199
  200/**  pengine_create(:Options) is det.
  201
  202    Creates a new pengine. Valid options are:
  203
  204    * id(-ID)
  205      ID gets instantiated to the id of the created pengine.  ID is
  206      atomic.
  207
  208    * alias(+Name)
  209      The pengine is named Name (an atom). A slave pengine (child) can
  210      subsequently be referred to by this name.
  211
  212    * application(+Application)
  213      Application in which the pengine runs.  See pengine_application/1.
  214
  215    * server(+URL)
  216      The pengine will run in (and in the Prolog context of) the pengine
  217      server located at URL.
  218
  219    * src_list(+List_of_clauses)
  220      Inject a list of Prolog clauses into the pengine.
  221
  222    * src_text(+Atom_or_string)
  223      Inject the clauses specified by a source text into the pengine.
  224
  225    * src_url(+URL)
  226      Inject the clauses specified in the file located at URL into the
  227      pengine.
  228
  229    * src_predicates(+List)
  230      Send the local predicates denoted by List to the remote pengine.
  231      List is a list of predicate indicators.
  232
  233Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
  234non-local pengines) and thread_create/3. Note   that for thread_create/3
  235only options changing the stack-sizes can be used. In particular, do not
  236pass the detached or alias options.
  237
  238Successful creation of a pengine will return an _event term_ of the
  239following form:
  240
  241    * create(ID, Term)
  242      ID is the id of the pengine that was created.
  243      Term is not used at the moment.
  244
  245An error will be returned if the pengine could not be created:
  246
  247    * error(ID, Term)
  248      ID is invalid, since no pengine was created.
  249      Term is the exception's error term.
  250*/
  251
  252
  253pengine_create(M:Options0) :-
  254    translate_local_sources(Options0, Options, M),
  255    (   select_option(server(BaseURL), Options, RestOptions)
  256    ->  remote_pengine_create(BaseURL, RestOptions)
  257    ;   local_pengine_create(Options)
  258    ).
  259
  260%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
  261%
  262%   Translate  the  `src_predicates`  and  `src_list`  options  into
  263%   `src_text`. We need to do that   anyway for remote pengines. For
  264%   local pengines, we could avoid  this   step,  but  there is very
  265%   little point in transferring source to a local pengine anyway as
  266%   local pengines can access any  Prolog   predicate  that you make
  267%   visible to the application.
  268%
  269%   Multiple sources are concatenated  to  end   up  with  a  single
  270%   src_text option.
  271
  272translate_local_sources(OptionsIn, Options, Module) :-
  273    translate_local_sources(OptionsIn, Sources, Options2, Module),
  274    (   Sources == []
  275    ->  Options = Options2
  276    ;   Sources = [Source]
  277    ->  Options = [src_text(Source)|Options2]
  278    ;   atomics_to_string(Sources, Source)
  279    ->  Options = [src_text(Source)|Options2]
  280    ).
  281
  282translate_local_sources([], [], [], _).
  283translate_local_sources([H0|T], [S0|S], Options, M) :-
  284    nonvar(H0),
  285    translate_local_source(H0, S0, M),
  286    !,
  287    translate_local_sources(T, S, Options, M).
  288translate_local_sources([H|T0], S, [H|T], M) :-
  289    translate_local_sources(T0, S, T, M).
  290
  291translate_local_source(src_predicates(PIs), Source, M) :-
  292    must_be(list, PIs),
  293    with_output_to(string(Source),
  294                   maplist(list_in_module(M), PIs)).
  295translate_local_source(src_list(Terms), Source, _) :-
  296    must_be(list, Terms),
  297    with_output_to(string(Source),
  298                   forall(member(Term, Terms),
  299                          format('~k .~n', [Term]))).
  300translate_local_source(src_text(Source), Source, _).
  301
  302list_in_module(M, PI) :-
  303    listing(M:PI).
  304
  305/**  pengine_send(+NameOrID, +Term) is det
  306
  307Same as pengine_send(NameOrID, Term, []).
  308*/
  309
  310pengine_send(Target, Event) :-
  311    pengine_send(Target, Event, []).
  312
  313
  314/**  pengine_send(+NameOrID, +Term, +Options) is det
  315
  316Succeeds immediately and  places  Term  in   the  queue  of  the pengine
  317NameOrID. Options is a list of options:
  318
  319   * delay(+Time)
  320     The actual sending is delayed by Time seconds. Time is an integer
  321     or a float.
  322
  323Any remaining options are passed to http_open/3.
  324*/
  325
  326pengine_send(Target, Event, Options) :-
  327    must_be(atom, Target),
  328    pengine_send2(Target, Event, Options).
  329
  330pengine_send2(self, Event, Options) :-
  331    !,
  332    thread_self(Queue),
  333    delay_message(queue(Queue), Event, Options).
  334pengine_send2(Name, Event, Options) :-
  335    child(Name, Target),
  336    !,
  337    delay_message(pengine(Target), Event, Options).
  338pengine_send2(Target, Event, Options) :-
  339    delay_message(pengine(Target), Event, Options).
  340
  341delay_message(Target, Event, Options) :-
  342    option(delay(Delay), Options),
  343    !,
  344    alarm(Delay,
  345          send_message(Target, Event, Options),
  346          _AlarmID,
  347          [remove(true)]).
  348delay_message(Target, Event, Options) :-
  349    random_delay,
  350    send_message(Target, Event, Options).
  351
  352send_message(queue(Queue), Event, _) :-
  353    thread_send_message(Queue, pengine_request(Event)).
  354send_message(pengine(Pengine), Event, Options) :-
  355    (   pengine_remote(Pengine, Server)
  356    ->  remote_pengine_send(Server, Pengine, Event, Options)
  357    ;   pengine_thread(Pengine, Thread)
  358    ->  thread_send_message(Thread, pengine_request(Event))
  359    ;   existence_error(pengine, Pengine)
  360    ).
  361
  362%!  pengine_request(-Request) is det.
  363%
  364%   To be used by a pengine to wait  for the next request. Such messages
  365%   are placed in the  queue  by   pengine_send/2.  Keeps  the thread in
  366%   normal state if an event arrives within a second. Otherwise it waits
  367%   for the `idle_limit` setting while   using  thread_idle/2 to minimis
  368%   resources.
  369
  370pengine_request(Request) :-
  371    thread_self(Me),
  372    thread_get_message(Me, pengine_request(Request), [timeout(1)]),
  373    !.
  374pengine_request(Request) :-
  375    pengine_self(Self),
  376    get_pengine_application(Self, Application),
  377    setting(Application:idle_limit, IdleLimit0),
  378    IdleLimit is IdleLimit0-1,
  379    thread_self(Me),
  380    (   thread_idle(thread_get_message(Me, pengine_request(Request),
  381                                       [timeout(IdleLimit)]),
  382                    long)
  383    ->  true
  384    ;   Request = destroy
  385    ).
  386
  387
  388%!  pengine_reply(+Event) is det.
  389%!  pengine_reply(+Queue, +Event) is det.
  390%
  391%   Reply Event to the parent of the   current  Pengine or the given
  392%   Queue.  Such  events  are  read   by    the   other   side  with
  393%   pengine_event/1.
  394%
  395%   If the message cannot be sent within the `idle_limit` setting of
  396%   the pengine, abort the pengine.
  397
  398pengine_reply(Event) :-
  399    pengine_parent(Queue),
  400    pengine_reply(Queue, Event).
  401
  402pengine_reply(_Queue, _Event0) :-
  403    nb_current(pengine_idle_limit_exceeded, true),
  404    !.
  405pengine_reply(Queue, Event0) :-
  406    arg(1, Event0, ID),
  407    wrap_first_answer(ID, Event0, Event),
  408    random_delay,
  409    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  410    (   pengine_self(ID),
  411        \+ pengine_detached(ID, _)
  412    ->  get_pengine_application(ID, Application),
  413        setting(Application:idle_limit, IdleLimit),
  414        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
  415        (   thread_send_message(Queue, pengine_event(ID, Event),
  416                                [ timeout(IdleLimit)
  417                                ])
  418        ->  true
  419        ;   thread_self(Me),
  420            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  421                  [ID, Me]),
  422            nb_setval(pengine_idle_limit_exceeded, true),
  423            thread_detach(Me),
  424            abort
  425        )
  426    ;   thread_send_message(Queue, pengine_event(ID, Event))
  427    ).
  428
  429wrap_first_answer(ID, Event0, CreateEvent) :-
  430    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  431    arg(1, CreateEvent, ID),
  432    !,
  433    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  434wrap_first_answer(_ID, Event, Event).
  435
  436
  437empty_queue :-
  438    pengine_parent(Queue),
  439    empty_queue(Queue, 0, Discarded),
  440    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  441
  442empty_queue(Queue, C0, C) :-
  443    thread_get_message(Queue, _Term, [timeout(0)]),
  444    !,
  445    C1 is C0+1,
  446    empty_queue(Queue, C1, C).
  447empty_queue(_, C, C).
  448
  449
  450/** pengine_ask(+NameOrID, @Query, +Options) is det
  451
  452Asks pengine NameOrID a query Query.
  453
  454Options is a list of options:
  455
  456    * template(+Template)
  457      Template is a variable (or a term containing variables) shared
  458      with the query. By default, the template is identical to the
  459      query.
  460
  461    * chunk(+IntegerOrFalse)
  462      Retrieve solutions in chunks of Integer rather than one by one. 1
  463      means no chunking (default). Other integers indicate the maximum
  464      number of solutions to retrieve in one chunk.  If `false`, the
  465      Pengine goal is not executed using findall/3 and friends and
  466      we do not backtrack immediately over the goal.  As a result,
  467      changes to backtrackable global state are retained.  This is
  468      similar to using set_prolog_flag(toplevel_mode, recursive).
  469
  470    * bindings(+Bindings)
  471      Sets the global variable '$variable_names' to a list of
  472      `Name = Var` terms, providing access to the actual variable
  473      names.
  474
  475Any remaining options are passed to pengine_send/3.
  476
  477Note that the predicate pengine_ask/3 is deterministic, even for queries
  478that have more than one solution. Also,  the variables in Query will not
  479be bound. Instead, results will  be  returned   in  the  form  of _event
  480terms_.
  481
  482    * success(ID, Terms, Projection, Time, More)
  483      ID is the id of the pengine that succeeded in solving the query.
  484      Terms is a list holding instantiations of `Template`.  Projection
  485      is a list of variable names that should be displayed. Time is
  486      the CPU time used to produce the results and finally, More
  487      is either `true` or `false`, indicating whether we can expect the
  488      pengine to be able to return more solutions or not, would we call
  489      pengine_next/2.
  490
  491    * failure(ID)
  492      ID is the id of the pengine that failed for lack of solutions.
  493
  494    * error(ID, Term)
  495      ID is the id of the pengine throwing the exception.
  496      Term is the exception's error term.
  497
  498    * output(ID, Term)
  499      ID is the id of a pengine running the query that called
  500      pengine_output/1. Term is the term that was passed in the first
  501      argument of pengine_output/1 when it was called.
  502
  503    * prompt(ID, Term)
  504      ID is the id of the pengine that called pengine_input/2 and Term is
  505      the prompt.
  506
  507Defined in terms of pengine_send/3, like so:
  508
  509==
  510pengine_ask(ID, Query, Options) :-
  511    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  512    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  513==
  514*/
  515
  516pengine_ask(ID, Query, Options) :-
  517    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  518    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  519
  520
  521pengine_ask_option(template(_)).
  522pengine_ask_option(chunk(_)).
  523pengine_ask_option(bindings(_)).
  524pengine_ask_option(breakpoints(_)).
  525
  526
  527/** pengine_next(+NameOrID, +Options) is det
  528
  529Asks pengine NameOrID for the  next  solution   to  a  query  started by
  530pengine_ask/3. Defined options are:
  531
  532    * chunk(+Count)
  533    Modify the chunk-size to Count before asking the next set of
  534    solutions.  This may not be used if the goal was started with
  535    chunk(false).
  536
  537Remaining  options  are  passed  to    pengine_send/3.   The  result  of
  538re-executing the current goal is returned  to the caller's message queue
  539in the form of _event terms_.
  540
  541    * success(ID, Terms, Projection, Time, More)
  542      See pengine_ask/3.
  543
  544    * failure(ID)
  545      ID is the id of the pengine that failed for lack of more solutions.
  546
  547    * error(ID, Term)
  548      ID is the id of the pengine throwing the exception.
  549      Term is the exception's error term.
  550
  551    * output(ID, Term)
  552      ID is the id of a pengine running the query that called
  553      pengine_output/1. Term is the term that was passed in the first
  554      argument of pengine_output/1 when it was called.
  555
  556    * prompt(ID, Term)
  557      ID is the id of the pengine that called pengine_input/2 and Term
  558      is the prompt.
  559
  560Defined in terms of pengine_send/3, as follows:
  561
  562==
  563pengine_next(ID, Options) :-
  564    pengine_send(ID, next, Options).
  565==
  566
  567*/
  568
  569pengine_next(ID, Options) :-
  570    select_option(chunk(Count), Options, Options1),
  571    !,
  572    pengine_send(ID, next(Count), Options1).
  573pengine_next(ID, Options) :-
  574    pengine_send(ID, next, Options).
  575
  576
  577/** pengine_stop(+NameOrID, +Options) is det
  578
  579Tells pengine NameOrID to stop looking  for   more  solutions to a query
  580started by pengine_ask/3. Options are passed to pengine_send/3.
  581
  582Defined in terms of pengine_send/3, like so:
  583
  584==
  585pengine_stop(ID, Options) :-
  586    pengine_send(ID, stop, Options).
  587==
  588*/
  589
  590pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
  591
  592
  593/** pengine_abort(+NameOrID) is det
  594
  595Aborts the running query. The pengine goes   back  to state `2', waiting
  596for new queries.
  597
  598@see pengine_destroy/1.
  599*/
  600
  601pengine_abort(Name) :-
  602    (   child(Name, Pengine)
  603    ->  true
  604    ;   Pengine = Name
  605    ),
  606    (   pengine_remote(Pengine, Server)
  607    ->  remote_pengine_abort(Server, Pengine, [])
  608    ;   pengine_thread(Pengine, Thread),
  609        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  610        catch(thread_signal(Thread, throw(abort_query)), _, true)
  611    ).
  612
  613
  614/** pengine_destroy(+NameOrID) is det.
  615    pengine_destroy(+NameOrID, +Options) is det.
  616
  617Destroys the pengine NameOrID.  With the option force(true), the pengine
  618is killed using abort/0 and pengine_destroy/2 succeeds.
  619*/
  620
  621pengine_destroy(ID) :-
  622    pengine_destroy(ID, []).
  623
  624pengine_destroy(Name, Options) :-
  625    (   child(Name, ID)
  626    ->  true
  627    ;   ID = Name
  628    ),
  629    option(force(true), Options),
  630    !,
  631    (   pengine_thread(ID, Thread)
  632    ->  catch(thread_signal(Thread, abort),
  633              error(existence_error(thread, _), _), true)
  634    ;   true
  635    ).
  636pengine_destroy(ID, Options) :-
  637    catch(pengine_send(ID, destroy, Options),
  638          error(existence_error(pengine, ID), _),
  639          retractall(child(_,ID))).
  640
  641
  642/*================= pengines administration =======================
  643*/
  644
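  645%!  current_pengine(?Id, ?Parent, ?Thread, ?URL, ?Application, ?Destroy)
  646%
  647%   Dynamic predicate that registers our known pengines.  Id is
  648%   an atomic unique datatype.  Parent is the queue registered for
  649%   the parent of the pengine.  Thread is the thread that runs a
  650%   local pengine and 0 for a remote pengine, for which URL is
  651%   the location of its server.  Application and Destroy are stored
  652%   as passed when the pengine was registered.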
  653
  654:- dynamic
  655    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  656    pengine_queue/4,                % Id, Queue, TimeOut, Time
  657    output_queue/3,                 % Id, Queue, Time
  658    pengine_user/2,                 % Id, User
  659    pengine_data/2,                 % Id, Data
  660    pengine_detached/2.             % Id, Data
  661:- volatile
  662    current_pengine/6,
  663    pengine_queue/4,
  664    output_queue/3,
  665    pengine_user/2,
  666    pengine_data/2,
  667    pengine_detached/2.  668
  669:- thread_local
  670    child/2.                        % ?Name, ?Child
  671
  672%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
  673%!  pengine_register_remote(+Id, +URL, +App, +Destroy) is det.
  674%!  pengine_unregister(+Id) is det.
  675
  676pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  677    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  678
  679pengine_register_remote(Id, URL, Application, Destroy) :-
  680    thread_self(Queue),
  681    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
  682
  683%!  pengine_unregister(+Id)
  684%
  685%   Called by the pengine thread  destruction.   If  we are a remote
  686%   pengine thread, our URL  equals  =http=   and  the  queue is the
  687%   message queue used to send events to the HTTP workers.
  688
  689pengine_unregister(Id) :-
  690    thread_self(Me),
  691    (   current_pengine(Id, Queue, Me, http, _, _)
  692    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  693    ;   true
  694    ),
  695    retractall(current_pengine(Id, _, Me, _, _, _)),
  696    retractall(pengine_user(Id, _)),
  697    retractall(pengine_data(Id, _)).
  698
  699pengine_unregister_remote(Id) :-
  700    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
  701
  702%!  pengine_self(-Id) is det.
  703%
  704%   True if the current thread is a pengine with Id.
  705
  706pengine_self(Id) :-
  707    thread_self(Thread),
  708    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  709
  710pengine_parent(Parent) :-
  711    nb_getval(pengine_parent, Parent).
  712
  713pengine_thread(Pengine, Thread) :-
  714    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  715    Thread \== 0,
  716    !.
  717
  718pengine_remote(Pengine, URL) :-
  719    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  720
  721get_pengine_application(Pengine, Application) :-
  722    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  723    !.
  724
  725get_pengine_module(Pengine, Pengine).
  726
  727:- if(current_predicate(uuid/2)).  728pengine_uuid(Id) :-
  729    uuid(Id, [version(4)]).             % Version 4 is random.
  730:- else.  731pengine_uuid(Id) :-
  732    (   current_prolog_flag(max_integer, Max1)
  733    ->  Max is Max1-1
  734    ;   Max is 1<<128
  735    ),
  736    random_between(0, Max, Num),
  737    atom_number(Id, Num).
  738:- endif.  739
  740%!  protect_pengine(+Id, :Goal) is semidet.
  741%
  742%   Run Goal while protecting the Pengine  Id from being destroyed. Used
  743%   by the HTTP  I/O  routines  to   avoid  that  the  Pengine's  module
  744%   disappears while I/O is in progress. We  use a pool of locks because
  745%   the lock may be held relatively long by output routines.
  746%
  747%   This also runs Goal if the Pengine no longer exists. This deals with
  748%   Pengines terminated through destroy_or_continue/1.
  749%
  750%   @bug After destroy_or_continue/1 takes the destroy route, the module
  751%   may drop-out at any point in time,   resulting  in a possible crash.
  752%   Seems the only safe way out is   to  do (de)serialization inside the
  753%   Pengine.
  754
  755:- meta_predicate protect_pengine(+, 0).  756
  757protect_pengine(Id, Goal) :-
  758    term_hash(Id, Hash),
  759    LockN is Hash mod 64,
  760    atom_concat(pengine_done_, LockN, Lock),
  761    with_mutex(Lock,
  762               (   pengine_thread(Id, _)
  763               ->  Goal
  764               ;   Goal
  765               )).
  766
  767
  768/** pengine_application(+Application) is det.
  769
  770Directive that must be used to declare a pengine application module. The
  771module must not be associated to any   file.  The default application is
  772=pengine_sandbox=.  The  example  below  creates    a   new  application
  773=address_book=  and  imports  the  API  defined    in  the  module  file
  774=adress_book_api.pl= into the application.
  775
  776  ==
  777  :- pengine_application(address_book).
  778  :- use_module(address_book:adress_book_api).
  779  ==
  780*/
  781
  782pengine_application(Application) :-
  783    throw(error(context_error(nodirective,
  784                             pengine_application(Application)), _)).
  785
  786:- multifile
  787    system:term_expansion/2,
  788    current_application/1.  789
  790%!  current_pengine_application(?Application) is nondet.
  791%
  792%   True when Application is a currently defined application.
  793%
  794%   @see pengine_application/1
  795
  796current_pengine_application(Application) :-
  797    current_application(Application).
  798
  799
  % Default settings for all applications.  The expansion of
  % pengine_application/1 declares settings with the same names in the
  % application module whose default is the value of the setting
  % declared here.

  :- setting(thread_pool_size, integer, 100,
             'Maximum number of pengines this application can run.').
  :- setting(thread_pool_stacks, list(compound), [],
             'Maximum stack sizes for pengines this application can run.').
  :- setting(slave_limit, integer, 3,
             'Maximum number of slave pengines a master pengine can create.').
  :- setting(time_limit, number, 300,
             'Maximum time to wait for output').
  :- setting(idle_limit, number, 300,
             'Pengine auto-destroys when idle for this time').
  :- setting(safe_goal_limit, number, 10,
             'Maximum time to try proving safety of the goal').
  :- setting(program_space, integer, 100_000_000,
             'Maximum memory used by predicates').
  :- setting(allow_from, list(atom), [*],
             'IP addresses from which remotes are allowed to connect').
  :- setting(deny_from, list(atom), [],
             'IP addresses from which remotes are NOT allowed to connect').
  :- setting(debug_info, boolean, false,
             'Keep information to support source-level debugging').

  823
  %   Expansion of the directive `:- pengine_application(Application)`.
  %
  %   Application must be an atom and must not name a module that is
  %   associated with a file; otherwise a type or permission error is
  %   raised. The directive is replaced by the fact
  %   `pengines:current_application(Application)` followed by the
  %   expansion of one setting/4 declaration per application setting.
  %   Each application setting defaults to the value of the setting with
  %   the same name in module `pengines`.

  system:term_expansion((:- pengine_application(Application)), Expanded) :-
      must_be(atom, Application),
      (   module_property(Application, file(_))
      ->  permission_error(create, pengine_application, Application)
      ;   true
      ),
      Specs =
          [ s(thread_pool_size, integer,
              'Maximum number of pengines this application can run.'),
            s(thread_pool_stacks, list(compound),
              'Maximum stack sizes for pengines this application can run.'),
            s(slave_limit, integer,
              'Maximum number of local slave pengines a master pengine can create.'),
            s(time_limit, number,
              'Maximum time to wait for output'),
            s(idle_limit, number,
              'Pengine auto-destroys when idle for this time'),
            s(safe_goal_limit, number,
              'Maximum time to try proving safety of the goal'),
            s(program_space, integer,
              'Maximum memory used by predicates'),
            s(allow_from, list(atom),
              'IP addresses from which remotes are allowed to connect'),
            s(deny_from, list(atom),
              'IP addresses from which remotes are NOT allowed to connect'),
            s(debug_info, boolean,
              'Keep information to support source-level debugging')
          ],
      maplist(expand_application_setting(Application), Specs, SettingClauses),
      flatten([ pengines:current_application(Application)
              | SettingClauses
              ], Expanded).

  %   expand_application_setting(+Application, +Spec, -Clauses)
  %
  %   Clauses is the term expansion of the setting/4 directive that
  %   declares the setting described by Spec = s(Name, Type, Comment) in
  %   module Application, using setting(pengines:Name) as its default.

  expand_application_setting(Application, s(Name, Type, Comment), Clauses) :-
      expand_term((:- setting(Application:Name, Type,
                              setting(pengines:Name),
                              Comment)),
                  Clauses).
  888
  % Declare `pengine_sandbox` as the default application.

  :- pengine_application(pengine_sandbox).

  893
  /** pengine_property(?Pengine, ?Property) is nondet.

  True when Property is a property of Pengine. Enumerates all pengines
  that are known to the calling Prolog process. If both arguments are
  nonvar, the predicate commits to the first proof. Defined properties
  are:

    * self(ID)
      Identifier of the pengine. This is the same as the first
      argument and can be used to enumerate all known pengines.
    * alias(Name)
      Name is the alias name of the pengine, as provided through the
      `alias` option when creating the pengine.
    * thread(Thread)
      If the pengine is a local pengine, Thread is the Prolog thread
      identifier of the pengine.
    * remote(Server)
      If the pengine is remote, the URL of the server.
    * application(Application)
      Pengine runs the given application.
    * module(Module)
      Temporary module used for running the Pengine.
    * destroy(Destroy)
      Destroy is =true= if the pengine is destroyed automatically
      after completing the query.
    * parent(Queue)
      Message queue to which the (local) pengine reports.
    * source(?SourceID, ?Source)
      Source is the source code with the given SourceID. May be present
      if the setting `debug_info` is present.
    * detached(?Time)
      Pengine was detached at Time.
  */


  pengine_property(Id, Prop) :-
      (   nonvar(Id),
          nonvar(Prop),
          pengine_property2(Prop, Id)
      ->  true                        % both given: keep the first proof
      ;   pengine_property2(Prop, Id)
      ).
  934
  %   pengine_property2(?Property, ?Pengine) is nondet.
  %
  %   Implementation of pengine_property/2 with the arguments swapped.
  %   Most properties are read from current_pengine/6, whose arguments
  %   are used here as (Id, Parent, Thread, URL, Application, Destroy).
  %   A thread argument of 0 is treated as "remote pengine".

  pengine_property2(self(Id), Id) :-
      current_pengine(Id, _, _, _, _, _).
  pengine_property2(module(Id), Id) :-       % the module is named after Id
      current_pengine(Id, _, _, _, _, _).
  pengine_property2(alias(Alias), Id) :-
      child(Alias, Id),
      Alias \== Id.                          % unaliased childs map to themselves
  pengine_property2(thread(Thread), Id) :-
      current_pengine(Id, _, Thread, _, _, _),
      Thread \== 0.
  pengine_property2(remote(Server), Id) :-
      current_pengine(Id, _, 0, Server, _, _).
  pengine_property2(application(Application), Id) :-
      current_pengine(Id, _, _, _, Application, _).
  pengine_property2(destroy(Destroy), Id) :-
      current_pengine(Id, _, _, _, _, Destroy).
  pengine_property2(parent(Parent), Id) :-
      current_pengine(Id, Parent, _, _, _, _).
  pengine_property2(source(SourceID, Source), Id) :-
      pengine_data(Id, source(SourceID, Source)).
  pengine_property2(detached(When), Id) :-
      pengine_detached(Id, When).
  957
  /** pengine_output(+Term) is det

  Sends the event `output(Self, Term)` to the parent pengine or thread,
  where Self is the identifier of the calling pengine.
  */

  pengine_output(Term) :-
      pengine_self(Self),
      Event = output(Self, Term),
      pengine_reply(Event).
  966
  967
  /** pengine_debug(+Format, +Args) is det

  Create a message using format/3 from Format and Args and send this to
  the client as the event `debug(Self, Message)`. The format call is
  first checked using safe_goal/1; if that check raises an exception,
  the message sent is the text of that exception instead. The default
  JavaScript client will call =|console.log(Message)|= if there is a
  console. The predicate pengine_rpc/3 calls debug(pengine(debug), '~w',
  [Message]). The debug topic pengine(debug) is enabled by default.

  @see debug/1 and nodebug/1 for controlling the pengine(debug) topic
  @see format/2 for format specifications
  */

  pengine_debug(Format, Args) :-
      pengine_parent(Parent),
      pengine_self(Self),
      catch(safe_goal(format(atom(_), Format, Args)), Unsafe, true),
      (   nonvar(Unsafe)
      ->  message_to_string(Unsafe, Message)
      ;   format(atom(Message), Format, Args)
      ),
      pengine_reply(Parent, debug(Self, Message)).
  989
  990
  991/*================= Local pengine =======================
  992*/
  993
  %!  local_pengine_create(+Options)
  %
  %   Creates a local pengine that reports to the calling thread. The
  %   application is taken from the option application(Application) and
  %   defaults to `pengine_sandbox`. After creation the child is
  %   recorded as child(Name, Child), where Name is the value of the
  %   alias(Name) option or, if no alias is given, the id of the child
  %   itself.

  local_pengine_create(Options) :-
      option(application(Application), Options, pengine_sandbox),
      thread_self(Self),
      create(Self, Child, Options, local, Application),
      option(alias(Name), Options, Child),
      assertz(child(Name, Child)).
 1009
 1010
 :- multifile thread_pool:create_pool/1.

 %!  thread_pool:create_pool(+Application) is semidet.
 %
 %   On demand creation of a thread pool for a pengine application.
 %   Fails if Application is not a current application. The size and
 %   stack options of the pool are taken from the application settings
 %   `thread_pool_size` and `thread_pool_stacks`.

 thread_pool:create_pool(Application) :-
     current_application(Application),
     setting(Application:thread_pool_size, PoolSize),
     setting(Application:thread_pool_stacks, PoolOptions),
     thread_pool_create(Application, PoolSize, PoolOptions).
 1022
 %!  create(+Queue, -Child, +Options, +URL, +Application) is det.
 %
 %   Create a new pengine thread. For a `local` pengine, errors raised
 %   while creating are propagated to the caller. For any other URL they
 %   are caught and sent to Queue as an `error(Child, Error)` event.
 %
 %   @arg Queue is the queue (or thread handle) to report to
 %   @arg Child is the identifier of the created pengine. If it is
 %        unbound, a fresh identifier is generated.
 %   @arg URL is one of =local= or =http=

 create(Queue, Child, Options, URL, Application) :-
     pengine_child_id(Child),
     (   URL = local
     ->  create0(Queue, Child, Options, local, Application)
     ;   catch(create0(Queue, Child, Options, URL, Application),
               Error,
               create_error(Queue, Child, Error))
     ).
 1040
 %   pengine_child_id(?Child) is det.
 %
 %   Ensure Child is instantiated. A caller-supplied identifier is left
 %   untouched; an unbound Child is bound using pengine_uuid/1.

 pengine_child_id(Child) :-
     (   var(Child)
     ->  pengine_uuid(Child)
     ;   true
     ).
 1046
 %   create_error(+Queue, +Child, +Error)
 %
 %   Report that creating pengine Child raised Error by sending the
 %   event `error(Child, Error)` to Queue.

 create_error(Queue, Child, Error) :-
     Event = error(Child, Error),
     pengine_reply(Queue, Event).
 1049
 %   create0(+Queue, +Child, +Options, +URL, +Application)
 %
 %   Do the actual work for create/5:
 %
 %     1. Raise an existence error if Application is not a current
 %        application.
 %     2. Unless URL is `http`, raise resource_error(max_pengines) if
 %        the number of child/2 facts has reached the `slave_limit`
 %        setting of Application.
 %     3. Start a thread in the thread pool named Application that runs
 %        pengine_main/3. Options accepted by pengine_create_option/1
 %        are passed to the pengine; the remaining options are passed
 %        as thread creation options.
 %     4. Register the pengine and tell the new thread its identifier
 %        using a pengine_registered(Child) message.
 %     5. Unify Child with the id(Id) option if that option is present.

 create0(Queue, Child, Options, URL, Application) :-
     (   current_application(Application)
     ->  true
     ;   existence_error(pengine_application, Application)
     ),
     (   URL == http                 % created by the HTTP server thread,
     ->  true                        % so not a child of this thread
     ;   aggregate_all(count, child(_,_), Count),
         setting(Application:slave_limit, Max),
         (   Count >= Max
         ->  throw(error(resource_error(max_pengines), _))
         ;   true
         )
     ),
     partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
     thread_create_in_pool(
         Application,
         pengine_main(Queue, PengineOptions, Application), Thread,
         [ at_exit(pengine_done)
         | ThreadOptions
         ]),
     option(destroy(Destroy), PengineOptions, true),
     pengine_register_local(Child, Thread, Queue, URL, Application, Destroy),
     thread_send_message(Thread, pengine_registered(Child)),
     (   option(id(Id), Options)
     ->  Id = Child
     ;   true
     ).
 1079
 %   pengine_create_option(?Option)
 %
 %   True when Option is a create option that is handed to the pengine
 %   itself. create0/5 passes all other options on as thread creation
 %   options.

 % Source to load
 pengine_create_option(src_text(_)).
 pengine_create_option(src_url(_)).
 % Pengine configuration
 pengine_create_option(application(_)).
 pengine_create_option(destroy(_)).
 % Initial query
 pengine_create_option(ask(_)).
 pengine_create_option(template(_)).
 pengine_create_option(bindings(_)).
 pengine_create_option(chunk(_)).
 % Identity
 pengine_create_option(alias(_)).
 pengine_create_option(user(_)).
 1090
 1091
 :- public
     pengine_done/0.

 %!  pengine_done is det.
 %
 %   Called from the pengine thread `at_exit` option. If the thread
 %   ended with an abort exception, the thread is detached and the event
 %   `destroy(Pengine, abort(Pengine))` is sent to the parent, ignoring
 %   error(_,_) exceptions raised while sending. Next, all _child_
 %   pengines are destroyed using pengine_destroy/1 and finally the
 %   pengine is unregistered. Cleaning up the Pengine is synchronised by
 %   the `pengine_done` mutex. See read_event/6.

 pengine_done :-
     thread_self(Thread),
     (   thread_property(Thread, status(exception(Exception))),
         abort_exception(Exception),
         thread_detach(Thread),
         pengine_self(Pengine)
     ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
               error(_,_), true)
     ;   true
     ),
     forall(child(_Alias, Child),
            pengine_destroy(Child)),
     pengine_self(Self),
     protect_pengine(Self, pengine_unregister(Self)).
 1115
 %   abort_exception(?Exception)
 %
 %   True when Exception is one of the two terms that pengine_done/0
 %   treats as "the thread was aborted".

 abort_exception('$aborted').
 abort_exception(unwind(abort)).
 1118
 :- thread_local wrap_first_answer_in_create_event/2.

 :- meta_predicate
     pengine_prepare_source(:, +).

 %!  pengine_main(+Parent, +Options, +Application)
 %
 %   Main predicate of a pengine thread. It waits for the message
 %   pengine_registered(Self) to learn its identifier, stores Parent in
 %   the global variable `pengine_parent`, registers the user and
 %   enables the `mitigate_spectre` flag. It then prepares the sources
 %   and runs pengine_create_and_loop/3 inside a temporary module named
 %   Self. If the exception `prepare_source_failed` is raised, the
 %   pengine is terminated.

 pengine_main(Parent, Options, Application) :-
     fix_streams,
     thread_get_message(pengine_registered(Id)),
     nb_setval(pengine_parent, Parent),
     pengine_register_user(Options),
     set_prolog_flag(mitigate_spectre, true),
     Setup = pengine_prepare_source(Application, Options),
     Run   = pengine_create_and_loop(Id, Application, Options),
     catch(in_temporary_module(Id, Setup, Run),
           prepare_source_failed,
           pengine_terminate(Id)).
 1141
 %   pengine_create_and_loop(+Self, +Application, +Options)
 %
 %   Acknowledge the creation of pengine Self and enter the main loop.
 %   The create event carries the `slave_limit` setting of Application.
 %
 %     - If Options holds ask(Query), the create event is not sent
 %       here. Instead it is stored in the thread-local predicate
 %       wrap_first_answer_in_create_event/2 and the query is asked
 %       using pengine_ask/3. A query given as a string is first parsed
 %       by ask_to_term/5; a parse error is sent to the parent and
 %       followed by throwing `prepare_source_failed`.
 %     - Otherwise the create event is sent immediately.

 pengine_create_and_loop(Self, Application, Options) :-
     setting(Application:slave_limit, SlaveLimit),
     CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
     (   option(ask(Ask), Options)
     ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
         (   string(Ask)                         % string is not callable
         ->  (   option(template(TemplateText), Options)
             ->  AskSpec = Ask-TemplateText
             ;   AskSpec = Ask
             ),
             catch(ask_to_term(AskSpec, Self, Query, Template, Bindings),
                   ParseError, true),
             (   nonvar(ParseError)
             ->  send_error(ParseError),
                 throw(prepare_source_failed)
             ;   true
             )
         ;   Query = Ask,
             option(template(Template), Options, Query),
             option(bindings(Bindings), Options, [])
         ),
         option(chunk(Chunk), Options, 1),
         pengine_ask(Self, Query,
                     [ template(Template),
                       chunk(Chunk),
                       bindings(Bindings)
                     ])
     ;   Extra = [],
         pengine_reply(CreateEvent)
     ),
     pengine_main_loop(Self).
 1173
 1174
 %!  ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings) is det.
 %
 %   Translate AskSpec into a query, template and bindings. AskSpec is
 %   either `QueryText-TemplateText` or just the query text. The trick
 %   is that we must parse using the operator declarations of Module and
 %   we must make sure variable sharing between query and answer
 %   template are known. This is achieved by parsing both texts as one
 %   term.
 %
 %     - With a template text, Bindings holds the `Name=Var` pairs for
 %       the variables that appear in the template.
 %     - Without, Bindings holds all pairs not removed by anon/1 and
 %       Template is a dict with tag `swish_default_template` created
 %       from these bindings.

 ask_to_term(AskSpec, Module, Query, Template, Bindings) :-
     (   AskSpec = QueryText-TemplateText
     ->  format(string(Text), 't((~s),(~s))', [TemplateText, QueryText]),
         term_string(t(Template,Query), Text,
                     [ variable_names(AllBindings),
                       module(Module)
                     ]),
         phrase(template_bindings(Template, AllBindings), Bindings)
     ;   term_string(Query, AskSpec,
                     [ variable_names(AllBindings),
                       module(Module)
                     ]),
         exclude(anon, AllBindings, Bindings),
         dict_create(Template, swish_default_template, Bindings)
     ).
 1197
 %   template_bindings(+Term, +Bindings)// is det.
 %
 %   Walk Term left to right and emit, for each variable encountered,
 %   the element of Bindings whose second argument is that variable.
 %   Variables without such an element and non-variable leaves emit
 %   nothing. A variable that occurs more than once is emitted once per
 %   occurrence.

 template_bindings(Term, Bindings) -->
     { var(Term) },
     !,
     (   { var_binding(Bindings, Term, Binding) }
     ->  [Binding]
     ;   []
     ).
 template_bindings([Head|Tail], Bindings) -->
     !,
     template_bindings(Head, Bindings),
     template_bindings(Tail, Bindings).
 template_bindings(Compound, Bindings) -->
     { compound(Compound),
       !,
       compound_name_arguments(Compound, _Name, Arguments)
     },
     template_bindings(Arguments, Bindings).
 template_bindings(_Atomic, _Bindings) -->
     [].

 %   var_binding(+Bindings, +Var, -Binding) is semidet.
 %
 %   Binding is the first element of Bindings whose second argument is
 %   identical (==) to Var.

 var_binding(Bindings, Var, Binding) :-
     once(( member(Binding, Bindings),
            arg(2, Binding, Value),
            Value == Var
          )).
 1220
 %!  fix_streams is det.
 %
 %   If we are a pengine that is created from a web server thread, the
 %   current output points to a CGI stream. In that case the alias
 %   `current_output` is given to `user_output` instead.

 fix_streams :-
     fix_stream(current_output).

 %   fix_stream(+Alias)
 %
 %   If the stream Alias is a CGI stream, make Alias an alias of
 %   `user_output`. Otherwise do nothing.

 fix_stream(Alias) :-
     (   is_cgi_stream(Alias)
     ->  debug(pengine(stream), '~w is a CGI stream!', [Alias]),
         set_stream(user_output, alias(Alias))
     ;   true
     ).
 1235
 %!  pengine_prepare_source(:Application, +Options) is det.
 %
 %   Load the source into the pengine's module, which is the module
 %   qualification of the first argument. The module gets the
 %   `program_space` limit from the settings of Application, no longer
 %   imports from `user` and gets Application as its first import
 %   module. Any exception raised while preparing the module is sent to
 %   the parent using send_error/1.
 %
 %   @throws =prepare_source_failed= if it failed to prepare the
 %           sources.

 pengine_prepare_source(Module:Application, Options) :-
     setting(Application:program_space, SpaceLimit),
     set_module(Module:program_space(SpaceLimit)),
     delete_import_module(Module, user),
     add_import_module(Module, Application, start),
     catch(prep_module(Module, Application, Options), Error, true),
     (   nonvar(Error)
     ->  send_error(Error),
         throw(prepare_source_failed)
     ;   true
     ).
 1254
 %   prep_module(+Module, +Application, +Options)
 %
 %   Prepare the pengine module: copy the `var_prefix` flag from
 %   Application, run all solutions of the hook prepare_module/3 and
 %   process the create options with Module as the source module. The
 %   previous source module is restored afterwards.

 prep_module(Module, Application, Options) :-
     copy_flag(Module, Application, var_prefix),
     forall(prepare_module(Module, Application, Options), true),
     setup_call_cleanup(
         '$set_source_module'(Saved, Module),
         maplist(process_create_option(Module), Options),
         '$set_source_module'(Saved)).
 1262
 %   copy_flag(+Module, +Application, +Flag) is det.
 %
 %   If Flag has a value for Application, set Flag for Module to the
 %   same value. Succeeds without doing anything otherwise.

 copy_flag(Module, Application, Flag) :-
     (   current_prolog_flag(Application:Flag, Value)
     ->  set_prolog_flag(Module:Flag, Value)
     ;   true
     ).
 1268
 %   process_create_option(+Module, +Option) is det.
 %
 %   Load the source specified by the options src_text(Text) and
 %   src_url(URL) into Module. All other options are ignored here.

 process_create_option(Module, Option) :-
     (   Option = src_text(Text)
     ->  pengine_src_text(Text, Module)
     ;   Option = src_url(URL)
     ->  pengine_src_url(URL, Module)
     ;   true
     ).
 1276
 1277
 1278%!  prepare_module(+Module, +Application, +Options) is semidet.
 1279%
 1280%   Hook, called to initialize  the   temporary  private module that
 1281%   provides the working context of a pengine. This hook is executed
 1282%   by the pengine's thread.  Preparing the source consists of three
 1283%   steps:
 1284%
 1285%     1. Add Application as (first) default import module for Module
 1286%     2. Call this hook
 1287%     3. Compile the source provided by the `src_text` and
 1288%        `src_url` options
 1289%
 1290%   @arg    Module is a new temporary module (see
 1291%           in_temporary_module/3) that may be (further) prepared
 1292%           by this hook.
 1293%   @arg    Application (also a module) associated to the pengine.
 1294%   @arg    Options is passed from the environment and should
 1295%           (currently) be ignored.
 1296
 1297
 %   pengine_main_loop(+ID)
 %
 %   Run guarded_main_loop/1 for pengine ID. If the exception
 %   `abort_query` is raised, continue with pengine_aborted/1.

 pengine_main_loop(ID) :-
     catch(guarded_main_loop(ID),
           abort_query,
           pengine_aborted(ID)).
 1300
 %   pengine_aborted(+ID)
 %
 %   Handle an aborted query of pengine ID: call empty_queue/0 and
 %   report `abort(ID)` using destroy_or_continue/1.

 pengine_aborted(ID) :-
     thread_self(Thread),
     debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Thread]),
     empty_queue,
     destroy_or_continue(abort(ID)).
 1306
 1307
 %!  guarded_main_loop(+Pengine) is det.
 %
 %   Executes state `2' of the pengine, where it waits for one of two
 %   events:
 %
 %     - destroy
 %     Terminate the pengine
 %     - ask(:Goal, +Options)
 %     Solve Goal.
 %
 %   Any other request is answered with a `protocol_error` error event,
 %   after which the pengine waits for the next request.

 guarded_main_loop(ID) :-
     pengine_request(Request),
     (   Request = destroy
     ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
         pengine_terminate(ID)
     ;   Request = ask(Goal, AskOptions)
     ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
         ask(ID, Goal, AskOptions)
     ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
         pengine_reply(error(ID, error(protocol_error, _))),
         guarded_main_loop(ID)
     ).
 1330
 1331
 %   pengine_terminate(+ID)
 %
 %   Send the event `destroy(ID)` to the parent and detach the calling
 %   thread, so the thread silently disappears when it ends.

 pengine_terminate(ID) :-
     pengine_reply(destroy(ID)),
     thread_self(Thread),
     thread_detach(Thread).
 1336
 1337
 %!  solve(+Chunk, +Template, :Goal, +ID) is det.
 %
 %   Solve Goal for pengine ID and report the result to the parent.
 %   Chunk is either an integer or `false`; anything else raises a
 %   domain error. Note that because we can ask for a new goal in state
 %   `6', we must provide for an ancestral cut (prolog_cut_to/1). We
 %   need to be sure to have a choice point before we can call
 %   prolog_current_choice/1. This is the reason why this predicate has
 %   two clauses.
 %
 %   The events sent are `success` (with a flag telling whether more
 %   solutions may exist), `failure` or `error`. Each event carries the
 %   CPU time used since the query or the last `next` request started.

 solve(Chunk, Template, Goal, ID) :-
     prolog_current_choice(ChoicePoint),
     (   integer(Chunk)
     ->  ChunkState = count(Chunk)
     ;   Chunk == false
     ->  ChunkState = no_chunk
     ;   domain_error(chunk, Chunk)
     ),
     statistics(cputime, Started),
     Timer = time(Started),              % reset by more_solutions/5
     nb_current('$variable_names', Bindings),
     filter_template(Template, Bindings, FinalTemplate),
     '$current_typein_module'(OldTypeIn),
     (   '$set_typein_module'(ID),
         call_cleanup(catch(findnsols_no_empty(ChunkState, FinalTemplate,
                                               set_projection(Goal, Bindings),
                                               Answers),
                            Error, true),
                      query_done(Det, OldTypeIn)),
         arg(1, Timer, Since),
         statistics(cputime, Now),
         CPUTime is Now-Since,
         forall(pengine_flush_output_hook, true),
         (   var(Error)
         ->  projection(Projection),
             (   var(Det)                % cleanup did not run: more to come
             ->  pengine_reply(success(ID, Answers, Projection,
                                       CPUTime, true)),
                 more_solutions(ID, ChoicePoint, ChunkState, Timer)
             ;   !,                      % commit
                 destroy_or_continue(success(ID, Answers, Projection,
                                             CPUTime, false))
             )
         ;   !,                          % commit
             (   Error == abort_query
             ->  throw(Error)
             ;   destroy_or_continue(error(ID, Error))
             )
         )
     ;   !,                              % commit
         arg(1, Timer, Since),
         statistics(cputime, Now),
         CPUTime is Now-Since,
         destroy_or_continue(failure(ID, CPUTime))
     ).
 solve(_, _, _, _).                      % leave a choice point
 1392
 %   query_done(-Det, +TypeIn)
 %
 %   Cleanup handler used by solve/4. Unifies Det with `true` and
 %   restores TypeIn as the typein module.

 query_done(Det, TypeIn) :-
     Det = true,
     '$set_typein_module'(TypeIn).
 1395
 1396
 %!  set_projection(:Goal, +Bindings)
 %
 %   Store Bindings in the backtrackable global variable
 %   `$variable_names` and call Goal. findnsols_no_empty/4 copies its
 %   goal and template to avoid instantiation thereof when it stops
 %   after finding N solutions. Using this helper we obtain a renamed
 %   version of Bindings that we can set.

 set_projection(Goal, Bindings) :-
     b_setval('$variable_names', Bindings),
     call(Goal).
 1406
 %   projection(-Projection)
 %
 %   Projection is the result of mapping var_name/2 over the bindings
 %   stored in the global variable `$variable_names`, or the empty list
 %   if that variable does not exist.

 projection(Projection) :-
     (   nb_current('$variable_names', Bindings)
     ->  maplist(var_name, Bindings, Projection)
     ;   Projection = []
     ).
 1412
 %!  filter_template(+Template0, +Bindings, -Template) is det.
 %
 %   Establish the final template. If Template0 is a dict with tag
 %   `swish_default_template`, Template is a new dict with that tag
 %   created from Bindings. Any other template is returned unchanged.
 %   This is there because hooks such as goal_expansion/2 and the SWISH
 %   query hooks can modify the set of bindings.
 %
 %   @bug Projection and template handling is pretty messy.

 filter_template(Template0, Bindings, Template) :-
     (   is_dict(Template0, swish_default_template)
     ->  dict_create(Template, swish_default_template, Bindings)
     ;   Template = Template0
     ).
 1426
 %   findnsols_no_empty(+State, ?Template, :Goal, -List)
 %
 %   As findnsols/4, but never succeeds with an empty List. If State is
 %   the atom `no_chunk`, Goal is called directly and List is the
 %   singleton [Template] for each solution of Goal.

 findnsols_no_empty(State, Template, Goal, List),
     State == no_chunk =>
     List = [Template],
     call(Goal).
 findnsols_no_empty(State, Template, Goal, List) =>
     findnsols(State, Template, Goal, List),
     List \== [].
 1433
 %   destroy_or_continue(+Event)
 %
 %   Report Event, whose first argument is the pengine id. If the
 %   pengine has the property destroy(true), the thread is detached and
 %   Event is sent wrapped as `destroy(ID, Event)`. Otherwise Event is
 %   sent as is and the pengine returns to guarded_main_loop/1.

 destroy_or_continue(Event) :-
     arg(1, Event, ID),
     (   pengine_property(ID, destroy(true))
     ->  thread_self(Thread),
         thread_detach(Thread),
         pengine_reply(destroy(ID, Event))
     ;   pengine_reply(Event),
         guarded_main_loop(ID)
     ).
 1443
 %!  more_solutions(+Pengine, +Choice, +State, +Time)
 %
 %   Called after a solution was found while there can be more. This is
 %   state `6' of the state machine. It waits for the next request and
 %   processes these events:
 %
 %     * stop
 %     Go back via state `7' to state `2' (guarded_main_loop/1)
 %     * next
 %     Fail.  This causes solve/4 to backtrack on the goal asked,
 %     providing at most the current `chunk` solutions.
 %     * next(Count)
 %     As `next`, but sets the new chunk-size to Count.
 %     * ask(Goal, Options)
 %     Ask another goal.  Note that we must commit the choice point
 %     of the previous goal asked for.
 %     * destroy
 %     Terminate the pengine.

 more_solutions(ID, Choice, State, Time) :-
     pengine_request(Request),
     more_solutions(Request, ID, Choice, State, Time).
 1463
 %   more_solutions(+Event, +Pengine, +Choice, +State, +Time)
 %
 %   Process Event, received in state `6'. The clauses for `next` and
 %   `next(Count)` reset the start time stored in Time and fail, which
 %   makes the caller backtrack into the running goal. State is the
 %   term count(N) or the atom `no_chunk`; next(Count) destructively
 %   updates N and is therefore only accepted if State is count(_) and
 %   Count is a positive integer. Any request that is not handled,
 %   including next(Count) with an invalid Count, is answered with a
 %   `protocol_error` error event, after which the next request is
 %   awaited.

 more_solutions(stop, ID, _Choice, _State, _Time) :-
     !,
     debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
     destroy_or_continue(stop(ID)).
 more_solutions(next, ID, _Choice, _State, Time) :-
     !,
     debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
     statistics(cputime, T0),
     nb_setarg(1, Time, T0),
     fail.
 more_solutions(next(Count), ID, _Choice, State, Time) :-
     integer(Count),                     % guard: Count > 0 raises an
     Count > 0,                          % error on unbound/non-numbers
     State = count(_),                   % else fallthrough to protocol error
     !,
     debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
     nb_setarg(1, State, Count),
     statistics(cputime, T0),
     nb_setarg(1, Time, T0),
     fail.
 more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
     !,
     debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
     prolog_cut_to(Choice),              % discard the previous goal
     ask(ID, Goal, Options).
 more_solutions(destroy, ID, _Choice, _State, _Time) :-
     !,
     debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
     pengine_terminate(ID).
 more_solutions(Event, ID, Choice, State, Time) :-
     debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
     pengine_reply(error(ID, error(protocol_error, _))),
     more_solutions(ID, Choice, State, Time).
 1496
 %!  ask(+Pengine, :Goal, +Options)
 %
 %   Migrate from state `2' to `3'. This predicate prepares Goal using
 %   prepare_goal/4, which validates that it is safe to call Goal, and
 %   then calls solve/4 to prove the goal. It takes care of the
 %   template(Template) option, which defaults to Goal, and the chunk(N)
 %   option, which defaults to 1. If preparing the goal raises an
 %   exception, this is sent as an error event and the pengine returns
 %   to guarded_main_loop/1.

 ask(ID, Goal, Options) :-
     catch(prepare_goal(ID, Goal, Prepared, Options), Error, true),
     !,
     (   nonvar(Error)
     ->  pengine_reply(error(ID, Error)),
         guarded_main_loop(ID)
     ;   option(template(Template), Options, Goal),
         option(chunk(Chunk), Options, 1),
         solve(Chunk, Template, Prepared, ID)
     ).
 1513
 %!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
 %
 %   Prepare GoalIn for execution in Pengine. This implies we must
 %   perform goal expansion and, if the system is sandboxed, check the
 %   sandbox. The steps are:
 %
 %     1. Store the bindings(Bindings) option (default `[]`) in the
 %        global variable `$variable_names`.
 %     2. Call the hook prepare_goal/3. If it fails, GoalIn is used.
 %     3. Expand the goal with the module of the pengine as source
 %        module.
 %     4. Unless pengine_not_sandboxed/1 holds, validate the goal using
 %        safe_goal/1 within the `safe_goal_limit` setting of the
 %        application.
 %
 %   Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like to
 %   write, but this does not work correctly if the user wishes to
 %   expand `X:Y` while interpreting `X` not as the module in which to
 %   run `Y`. This happens in the CQL package. Possibly we should
 %   disallow this reinterpretation?
 %
 %   @throws error(sandbox(time_limit_exceeded, Limit), _) if the
 %           safety check does not complete in time.

 prepare_goal(ID, Goal0, Module:Goal, Options) :-
     option(bindings(Bindings), Options, []),
     b_setval('$variable_names', Bindings),
     (   prepare_goal(Goal0, Hooked, Options)
     ->  true
     ;   Hooked = Goal0
     ),
     get_pengine_module(ID, Module),
     setup_call_cleanup(
         '$set_source_module'(Saved, Module),
         expand_goal(Hooked, Goal),
         '$set_source_module'(_, Saved)),
     (   pengine_not_sandboxed(ID)
     ->  true
     ;   get_pengine_application(ID, Application),
         setting(Application:safe_goal_limit, Limit),
         catch(call_with_time_limit(
                   Limit,
                   safe_goal(Module:Goal)), Unsafe, true)
     ->  (   var(Unsafe)
         ->  true
         ;   Unsafe = time_limit_exceeded
         ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
         ;   throw(Unsafe)
         )
     ).
 1552
 1553
 1554%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
 1555%
 1556%   Pre-preparation hook for running Goal0. The hook runs in the context
 1557%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
 1558%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
 1559%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
 1560%   Goal0 is used for further processing.
 1561%
 1562%   @arg Options provides the options as given to _ask_
 1563
 1564
 %%  pengine_not_sandboxed(+Pengine) is semidet.
 %
 %   True when pengine does not operate in sandboxed mode. This requires
 %   that a user is known for the pengine (see pengine_user/2) and that
 %   the hook pengines:not_sandboxed(User, Application) succeeds for
 %   this user and the application of the pengine.

 pengine_not_sandboxed(ID) :-
     once(( pengine_user(ID, User),
            pengine_property(ID, application(Application)),
            not_sandboxed(User, Application)
          )).
 1576
 1577%%  not_sandboxed(+User, +Application) is semidet.
 1578%
 1579%   This hook is called to see whether the Pengine must be executed in a
 1580%   protected environment. It is only called after authentication_hook/3
 1581%   has confirmed the authenticity of  the   current  user. If this hook
 1582%   succeeds, both loading the code and  executing the query is executed
 1583%   without enforcing sandbox security.  Typically, one should:
 1584%
 1585%     1. Provide a safe user authentication hook.
 1586%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
 1587%        ensure that the network between the proxy and the pengine
 1588%        server can be trusted.
 1589
 1590
 /** pengine_pull_response(+Pengine, +Options) is det

 Pulls a response (an event term) from the slave Pengine if Pengine is
 a remote process, else does nothing at all.
 */

 pengine_pull_response(Pengine, Options) :-
     (   pengine_remote(Pengine, Server)
     ->  remote_pengine_pull_response(Server, Pengine, Options)
     ;   true
     ).
 1602
 1603
 /** pengine_input(+Prompt, -Term) is det

 Sends Prompt to the master (parent) pengine as the event
 `prompt(Self, Prompt)` and waits for the next request. Note that
 Prompt may be any term, compound as well as atomic.

   - If the request is input(Input), Term is unified with Input.
   - If the request is `destroy`, the pengine aborts.
   - Any other request raises a `protocol_error` exception.
 */

 pengine_input(Prompt, Term) :-
     pengine_self(Self),
     pengine_parent(Parent),
     pengine_reply(Parent, prompt(Self, Prompt)),
     pengine_request(Reply),
     (   Reply = input(Answer)
     ->  Term = Answer
     ;   Reply == destroy
     ->  abort
     ;   throw(error(protocol_error,_))
     ).
 1621
 1622
 /** pengine_respond(+Pengine, +Input, +Options) is det

 Sends a response in the form of the term Input to a slave (child)
 pengine that has prompted its master (parent) for input. The response
 is sent as the event `input(Input)`.

 Defined in terms of pengine_send/3, as follows:

 ==
 pengine_respond(Pengine, Input, Options) :-
     pengine_send(Pengine, input(Input), Options).
 ==

 */

 pengine_respond(Pengine, Input, Options) :-
     Event = input(Input),
     pengine_send(Pengine, Event, Options).
 1639
 1640
 %!  send_error(+Error) is det.
 %
 %   Report Error to my parent as an error(Self, Error1) event. Blobs
 %   that cannot be read back are first replaced using replace_blobs/2.
 %   If the error context holds a prolog_stack(Frames) term with a list
 %   of frames, the backtrace is printed to a string and that string is
 %   sent in place of the frames.

 send_error(Error) :-
     (   Error = error(Formal, context(prolog_stack(Frames), Message)),
         is_list(Frames)
     ->  with_output_to(string(Trace),
                        print_prolog_backtrace(current_output, Frames)),
         pengine_self(Self),
         replace_blobs(Formal, SafeFormal),
         replace_blobs(Message, SafeMessage),
         Reply = error(SafeFormal,
                       context(prolog_stack(Trace), SafeMessage))
     ;   pengine_self(Self),
         replace_blobs(Error, Reply)
     ),
     pengine_reply(error(Self, Reply)).
 1661
 %!  replace_blobs(+Term0, -Term) is det.
 %
 %   Term is a copy of Term0 in which every non-text blob is replaced by
 %   an atom holding its print/1 representation. Compound terms are
 %   rebuilt argument by argument; anything else is passed through
 %   unchanged. This is needed for error terms that may contain streams
 %   and other handles to objects that cannot be read back.

 replace_blobs(Term0, Term) :-
     (   blob(Term0, Type),
         Type \== text
     ->  format(atom(Term), '~p', [Term0])
     ;   compound(Term0)
     ->  compound_name_arguments(Term0, Functor, Args0),
         maplist(replace_blobs, Args0, Args),
         compound_name_arguments(Term, Functor, Args)
     ;   Term = Term0
     ).
 1679
 1680
 1681/*================= Remote pengines =======================
 1682*/
 1683
 1684
 %!  remote_pengine_create(+BaseURL, +Options) is det.
 %
 %   Create a pengine on the server at BaseURL. Options accepted by
 %   pengine_create_option/1 are posted to the server; the remaining
 %   options are handed to remote_post_rec/5. If there is an ask(Query)
 %   option but no template/1 option, Query doubles as the template.
 %
 %   The first argument of the reply is the pengine id, which is unified
 %   with an id/1 option if one is given and recorded as child/2 under
 %   the alias/1 option (default: the id itself). For `create` and
 %   `output` replies the pengine is registered as remote. Finally the
 %   reply is forwarded to the calling thread.

 remote_pengine_create(BaseURL, Options) :-
     partition(pengine_create_option, Options, CreateOpts0, HTTPOptions),
     (   option(ask(Query), CreateOpts0),
         \+ option(template(_), CreateOpts0)
     ->  CreateOpts = [template(Query)|CreateOpts0]
     ;   CreateOpts = CreateOpts0
     ),
     options_to_dict(CreateOpts, PostData),
     remote_post_rec(BaseURL, create, PostData, Reply, HTTPOptions),
     arg(1, Reply, ID),
     (   option(id(RequestedID), Options)
     ->  ID = RequestedID
     ;   true
     ),
     option(alias(Alias), Options, ID),
     assert(child(Alias, ID)),
     functor(Reply, ReplyName, _),
     (   memberchk(ReplyName, [ create,      % actually created
                                output       % compiler messages
                              ])
     ->  option(application(Application), CreateOpts, pengine_sandbox),
         option(destroy(Destroy), CreateOpts, true),
         pengine_register_remote(ID, BaseURL, Application, Destroy)
     ;   true
     ),
     thread_self(Me),
     pengine_reply(Me, Reply).
 1711
 %!  options_to_dict(+Options, -Dict) is det.
 %
 %   Translate the pengine creation Options into a dict for posting. If
 %   both ask/1 and template/1 are present they are written together by
 %   ask_template_to_strings/4, after verifying that neither contains a
 %   '$VAR'/1 term, and added as the keys `ask` and `template`. The
 %   remaining options are converted through prolog_option/2.

 options_to_dict(Options, Dict) :-
     (   select_option(ask(Ask), Options, Options1),
         select_option(template(Template), Options1, Rest)
     ->  no_numbered_var_in(Ask+Template),
         % findall/3 discards the bindings that numbervars/3 makes inside
         % ask_template_to_strings/4; only the two strings survive.
         findall(AskText-TemplateText,
                 ask_template_to_strings(Ask, Template,
                                         AskText, TemplateText),
                 [ AskText-TemplateText ]),
         options_to_dict(Rest, Dict0),
         put_dict(_{ask:AskText,template:TemplateText}, Dict0, Dict)
     ;   maplist(prolog_option, Options, DictOptions),
         dict_create(Dict, _, DictOptions)
     ).
 1725
 %!  no_numbered_var_in(+Term) is det.
 %
 %   Succeed if Term has no sub term of the shape '$VAR'(_).
 %
 %   @error domain_error(numbered_vars_free_term, Term) if it has one.

 no_numbered_var_in(Term) :-
     (   sub_term(Sub, Term),
         subsumes_term('$VAR'(_), Sub)
     ->  domain_error(numbered_vars_free_term, Term)
     ;   true
     ).
 1732
 %!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
 %
 %   Write Ask and Template to strings using shared variable names: the
 %   variables of Ask+Template are numbered by numbervars/3 and both
 %   terms are then written quoted, ignoring operators. Note that this
 %   binds the variables of Ask and Template.

 ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
     WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
     numbervars(Ask+Template, 0, _),
     % Both terms are written in one go, separated by a newline, and the
     % result is split again on that newline.
     format(string(Text), '~W\n~W',
            [ Ask, WriteOptions, Template, WriteOptions ]),
     split_string(Text, "\n", "", [AskString, TemplateString]).
 1740
 %!  prolog_option(+Option0, -Option) is det.
 %
 %   Prepare a creation option for transfer. If create_option_type/2
 %   classifies Option0 as `term`, its value is replaced by a string
 %   holding the write_canonical/1 output. Other options are passed
 %   through unchanged.

 prolog_option(Option0, Option) :-
     (   create_option_type(Option0, term)
     ->  Option0 =.. [Name, Value],
         format(string(Text), '~k', [Value]),
         Option =.. [Name, Text]
     ;   Option = Option0
     ).

 %!  create_option_type(?Option, ?Type) is nondet.
 %
 %   Table of creation options whose value needs conversion: Type is
 %   `term` for values transferred as Prolog text and `atom` for values
 %   that are atoms.

 create_option_type(ask(_),         term).
 create_option_type(template(_),    term).
 create_option_type(application(_), atom).
 1752
 %!  remote_pengine_send(+BaseURL, +ID, +Event, +Options) is det.
 %
 %   Deliver Event to the remote pengine ID using the `send` action of
 %   the server at BaseURL and forward the server's reply to the calling
 %   thread.

 remote_pengine_send(BaseURL, ID, Event, Options) :-
     thread_self(Me),
     remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
     pengine_reply(Me, Reply).
 1757
 %!  remote_pengine_pull_response(+BaseURL, +ID, +Options) is det.
 %
 %   Run the `pull_response` action for the remote pengine ID on the
 %   server at BaseURL and forward the reply to the calling thread.

 remote_pengine_pull_response(BaseURL, ID, Options) :-
     thread_self(Me),
     remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
     pengine_reply(Me, Reply).
 1762
 %!  remote_pengine_abort(+BaseURL, +ID, +Options) is det.
 %
 %   Run the `abort` action for the remote pengine ID on the server at
 %   BaseURL and forward the reply to the calling thread.

 remote_pengine_abort(BaseURL, ID, Options) :-
     thread_self(Me),
     remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
     pengine_reply(Me, Reply).
 1767
 %!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 %
 %   Issue the HTTP request for Action on pengine ID at Server and unify
 %   Reply with the term read from the response. If Params is exactly
 %   `[event=Event]`, Event is sent as the Prolog body of a POST request
 %   and only `id` appears in the query string. Otherwise Params are
 %   added to the query string and no post/1 option is added.

 remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
     (   Params = [event=Event]
     ->  Search = [id=ID],
         OpenOptions = [post(prolog(Event))|Options]
     ;   Search = [id=ID|Params],
         OpenOptions = Options
     ),
     server_url(Server, Action, Search, URL),
     % http_open/3 is deliberately kept outside setup_call_cleanup/3:
     % putting it inside makes it impossible to interrupt.
     http_open(URL, Stream, OpenOptions),
     call_cleanup(
         read_prolog_reply(Stream, Reply),
         close(Stream)).
 1789
 %!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
 %
 %   POST Data as JSON to the Action location of Server and unify Reply
 %   with the term read from the response. The target is probed using
 %   probe/3 before the document is posted.

 remote_post_rec(Server, Action, Data, Reply, Options) :-
     server_url(Server, Action, [], URL),
     probe(Action, URL, Options),
     PostOptions = [post(json(Data))|Options],
     http_open(URL, In, PostOptions),
     call_cleanup(
         read_prolog_reply(In, Reply),
         close(In)).
 1800
 %!  probe(+Action, +URL, +Options) is det.
 %
 %   Probe the target. For the `create` action this opens URL with the
 %   HTTP `OPTIONS` method and closes the stream immediately; for any
 %   other action it does nothing. Probing is a good idea before posting
 %   a large document and being faced with an authentication challenge.
 %   Possibly we should make this an option for simpler scenarios.

 probe(Action, URL, Options) :-
     (   Action = create
     ->  http_open(URL, Stream, [method(options)|Options]),
         close(Stream)
     ;   true
     ).
 1812
 %!  read_prolog_reply(+In, -Reply) is det.
 %
 %   Read one term from the stream In, using UTF-8 encoding, and restore
 %   factorized bindings in it through rebind_cycles/2.

 read_prolog_reply(In, Reply) :-
     set_stream(In, encoding(utf8)),
     read(In, Raw),
     rebind_cycles(Raw, Reply).
 1817
 %!  rebind_cycles(+Reply0, -Reply) is det.
 %
 %   If Reply0 has the shape @(Reply, Bindings) with Bindings a proper
 %   list, unify each `Var = Value` pair of Bindings and return Reply.
 %   Any other term is returned unchanged.

 rebind_cycles(Reply0, Reply) :-
     (   Reply0 = @(Reply, Bindings),
         is_list(Bindings)
     ->  maplist(bind, Bindings)
     ;   Reply = Reply0
     ).

 %!  bind(+Binding) is semidet.
 %
 %   Binding is a term `Var = Value`; unify its two sides.

 bind(Value = Value).
 1826
 %!  server_url(+Server, +Action, +Params, -URL) is det.
 %
 %   URL is Server edited by uri_edit/3 such that its path is
 %   `pengine/Action` and its query string is built from Params.

 server_url(Server, Action, Params, URL) :-
     atom_concat('pengine/', Action, Path),
     Edits = [ path(Path),
               search(Params)
             ],
     uri_edit(Edits, Server, URL).
 1832
 1833
 %!  pengine_event(?EventTerm) is det.
 %!  pengine_event(?EventTerm, +Options) is det.
 %
 %   Examine the event queue of the calling thread and, if necessary,
 %   block until a term that unifies with EventTerm arrives. The matching
 %   term is removed from the queue. If the event is a destroy event for
 %   a remote pengine, that pengine is unregistered.
 %
 %   Valid options are:
 %
 %     * timeout(+Time)
 %       Time is a float or integer and specifies the maximum time to
 %       wait in seconds. If no event has arrived before the time is up,
 %       EventTerm is bound to the atom =timeout=.
 %     * listen(+Id)
 %       Only listen to events from the pengine identified by Id.

 pengine_event(Event) :-
     pengine_event(Event, []).

 pengine_event(Event, Options) :-
     option(listen(Pengine), Options, _),
     thread_self(Me),
     (   thread_get_message(Me, pengine_event(Pengine, Event), Options)
     ->  true
     ;   Event = timeout
     ),
     update_remote_destroy(Event).
 1863
 %!  update_remote_destroy(+Event) is det.
 %
 %   If Event is a destroy event (see destroy_event/1) whose first
 %   argument identifies a registered remote pengine, unregister that
 %   pengine. Otherwise do nothing.

 update_remote_destroy(Event) :-
     (   destroy_event(Event),
         arg(1, Event, Pengine),
         pengine_remote(Pengine, _Server)
     ->  pengine_unregister_remote(Pengine)
     ;   true
     ).
 1871
 %!  destroy_event(+Event) is semidet.
 %
 %   True if Event tells us the pengine is gone: destroy/1, destroy/2 or
 %   a create/2 event whose feature list holds a bound answer(Answer)
 %   that is itself a destroy event.

 destroy_event(destroy(_)).
 destroy_event(destroy(_,_)).
 destroy_event(create(_, Features)) :-
     % memberchk/2 leaves no choice point, so no cut is needed in this
     % final clause.
     memberchk(answer(Nested), Features),
     nonvar(Nested),
     destroy_event(Nested).
 1879
 1880
 %!  pengine_event_loop(:Closure, +Options) is det.
 %
 %   Run an event loop that accepts event terms sent to the current
 %   pengine or thread for as long as there are child pengines. Each
 %   event E is handed to pengine_process_event/4, which calls
 %   call(Closure, E), so Closure acts as a _handler_ for the event. The
 %   call is wrapped in ignore/1 for every event except `error`. Some
 %   events are also treated specially:
 %
 %     * create(ID, Term)
 %       The ID is placed in a list of active pengines.
 %
 %     * destroy(ID)
 %       The ID is removed from the list of active pengines. When the
 %       last pengine ID is removed, the loop terminates.
 %
 %     * output(ID, Term)
 %       The predicate pengine_pull_response/2 is called.
 %
 %   Valid options are:
 %
 %     * autoforward(+To)
 %       Forwards received event terms to slaves. To is either =all=,
 %       =all_but_sender= or a Prolog list of NameOrIDs. Only =all= is
 %       implemented.

 pengine_event_loop(Closure, Options) :-
     (   child(_, _)
     ->  pengine_event(Event),
         (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
         ->  forall(child(_, Child), pengine_send(Child, Event))
         ;   true
         ),
         pengine_event_loop(Event, Closure, Options)
     ;   true
     ).

 :- meta_predicate
     pengine_process_event(+, 1, -, +).

 %   pengine_event_loop(+Event, :Closure, +Options)
 %
 %   Process a single Event and re-enter the loop if processing asks to
 %   continue.

 pengine_event_loop(Event, Closure, Options) :-
     pengine_process_event(Event, Closure, Continue, Options),
     (   Continue == true
     ->  pengine_event_loop(Closure, Options)
     ;   true
     ).
 1927
 %!  pengine_process_event(+Event, :Closure, -Continue, +Options)
 %
 %   Handle one Event for pengine_event_loop/2 by calling Closure on it.
 %   Except for `error` events, a failing Closure is ignored.
 %
 %     - create(ID, Features): if Features holds answer(First), Closure
 %       sees the event without that answer and First is then processed
 %       as an event of its own.
 %     - output/2 and debug/2: after calling Closure, the next response
 %       is requested using pengine_pull_response/2.
 %     - success/5 is passed to Closure as success(ID, Solutions, More).
 %     - failure/2 is passed to Closure as failure(ID).
 %     - error(ID, Error): if Closure does not succeed, all children are
 %       destroyed and Error is thrown.
 %     - destroy(ID, Event): Event is processed first, then destroy(ID).
 %     - destroy(ID): the child/2 registration of ID is removed.
 %
 %   Continue is unified with `true` by every clause that completes.

 pengine_process_event(create(Pengine, Features), Closure, Continue, Options) :-
     debug(pengine(transition), '~q: 1 = /~q => 2', [Pengine, create(Features)]),
     (   select(answer(First), Features, Features1)
     ->  ignore(call(Closure, create(Pengine, Features1))),
         pengine_process_event(First, Closure, Continue, Options)
     ;   ignore(call(Closure, create(Pengine, Features))),
         Continue = true
     ).
 pengine_process_event(output(Pengine, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [Pengine, output(Message)]),
     ignore(call(Closure, output(Pengine, Message))),
     pengine_pull_response(Pengine, []).
 pengine_process_event(debug(Pengine, Message), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 4', [Pengine, debug(Message)]),
     ignore(call(Closure, debug(Pengine, Message))),
     pengine_pull_response(Pengine, []).
 pengine_process_event(prompt(Pengine, Prompt), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 5', [Pengine, prompt(Prompt)]),
     ignore(call(Closure, prompt(Pengine, Prompt))).
 pengine_process_event(success(Pengine, Solutions, _Proj, _Time, More),
                       Closure, true, _) :-
     debug(pengine(transition), '~q: 3 = /~q => 6/2',
           [Pengine, success(Solutions, More)]),
     ignore(call(Closure, success(Pengine, Solutions, More))).
 pengine_process_event(failure(Pengine, _Time), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [Pengine, failure]),
     ignore(call(Closure, failure(Pengine))).
 pengine_process_event(error(Pengine, Error), Closure, Continue, _Options) :-
     debug(pengine(transition), '~q: 3 = /~q => 2', [Pengine, error(Error)]),
     (   call(Closure, error(Pengine, Error))
     ->  Continue = true
     ;   % Unhandled error: tear down all children and re-raise it.
         forall(child(_, Child), pengine_destroy(Child)),
         throw(Error)
     ).
 pengine_process_event(stop(Pengine), Closure, true, _Options) :-
     debug(pengine(transition), '~q: 7 = /~q => 2', [Pengine, stop]),
     ignore(call(Closure, stop(Pengine))).
 pengine_process_event(destroy(Pengine, Event), Closure, Continue, Options) :-
     pengine_process_event(Event, Closure, _, Options),
     pengine_process_event(destroy(Pengine), Closure, Continue, Options).
 pengine_process_event(destroy(Pengine), Closure, true, _Options) :-
     retractall(child(_, Pengine)),
     debug(pengine(transition), '~q: 1 = /~q => 0', [Pengine, destroy]),
     ignore(call(Closure, destroy(Pengine))).
 1970
 1971
 %!  pengine_rpc(+URL, +Query) is nondet.
 %!  pengine_rpc(+URL, +Query, +Options) is nondet.
 %
 %   Semantically equivalent to the sequence below, except that the
 %   query is executed in (and in the Prolog context of) the pengine
 %   server referred to by URL, rather than locally.
 %
 %     ==
 %       copy_term_nat(Query, Copy),  % attributes are not copied to the server
 %       call(Copy),                  % executed on server at URL
 %       Query = Copy.
 %     ==
 %
 %   Valid options are:
 %
 %     - chunk(+IntegerOrFalse)
 %       Can be used to reduce the number of network roundtrips being
 %       made. See pengine_ask/3.
 %     - timeout(+Time)
 %       Wait at most Time seconds for the next event from the server.
 %       The default is defined by the setting `pengines:time_limit`.
 %
 %   Remaining options (except the server option) are passed to
 %   pengine_create/1.

 pengine_rpc(URL, Query) :-
     pengine_rpc(URL, Query, []).

 pengine_rpc(URL, Query, Module:Options0) :-
     translate_local_sources(Options0, Options1, Module),
     (   option(timeout(_), Options1)
     ->  Options = Options1
     ;   setting(time_limit, Limit),
         Options = [timeout(Limit)|Options1]
     ),
     % The answer template is a v/N term over the variables of Query.
     term_variables(Query, Vars),
     Template =.. [v|Vars],
     State = destroy(true),              % modified by process_event/4
     CreateOptions = [ ask(Query),
                       template(Template),
                       server(URL),
                       id(Pengine)
                     | Options
                     ],
     setup_call_catcher_cleanup(
         pengine_create(CreateOptions),
         wait_event(Template, State, [listen(Pengine)|Options]),
         Why,
         pengine_destroy_and_wait(State, Pengine, Why, Options)).
 2021
 %!  pengine_destroy_and_wait(+State, +Id, +Why, +Options) is det.
 %
 %   Cleanup handler of pengine_rpc/3. If State is still destroy(true),
 %   destroy pengine Id and wait for it to confirm using wait_destroy/2.
 %   Otherwise only emit a debug message. Why is the catcher term and is
 %   used in the debug messages only.

 pengine_destroy_and_wait(State, Id, Why, Options) :-
     (   State = destroy(true)
     ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
         pengine_destroy(Id, Options),
         wait_destroy(Id, 10)
     ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
     ).
 2029
 %!  wait_destroy(+Id, +Retries) is semidet.
 %
 %   Wait until pengine Id is no longer registered as a child. Events
 %   from Id are read with a 10 second timeout. A destroy event removes
 %   the child/2 registration. For any other event the wait is repeated
 %   while Retries can be decremented. After that the pengine is
 %   unregistered as remote and its child/2 registration is removed.

 wait_destroy(Id, Retries) :-
     (   \+ child(_, Id)
     ->  true
     ;   pengine_event(Event, [listen(Id),timeout(10)])
     ->  (   destroy_event(Event)
         ->  retractall(child(_,Id))
         ;   succ(Left, Retries)
         ->  wait_destroy(Id, Left)
         ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
             pengine_unregister_remote(Id),
             retractall(child(_,Id))
         )
     ).
 2044
 %!  wait_event(?Template, +State, +Options) is nondet.
 %
 %   Wait for the next event selected by Options (see pengine_event/2)
 %   and hand it to process_event/4.

 wait_event(Template, State, Options) :-
     pengine_event(Received, Options),
     debug(pengine(event), 'Received ~p', [Received]),
     process_event(Received, Template, State, Options).
 2049
 %!  process_event(+Event, ?Template, +State, +Options) is nondet.
 %
 %   Process one Event received on behalf of pengine_rpc/3.
 %
 %     - create/2: the answer(First) feature is processed as an event.
 %     - error/2: the error is re-thrown locally.
 %     - failure/2: fails.
 %     - prompt/2: a reply is obtained using pengine_rpc_prompt/3 and
 %       sent back as input; then the next event is awaited.
 %     - output/2 and debug/2: the message is handled, the next response
 %       is pulled and the next event is awaited.
 %     - success/5: Template is unified with each solution on
 %       backtracking. If more solutions exist, the next chunk is
 %       requested after the current ones are exhausted.
 %     - destroy/2: the child/2 registration is removed and the first
 %       argument of State is destructively set to `false`, after which
 %       the embedded event is processed.
 %     - success/4: older protocol versions; mapped onto success/5.

 process_event(create(_Pengine, Features), Template, State, Options) :-
     memberchk(answer(First), Features),
     process_event(First, Template, State, Options).
 process_event(error(_Pengine, Error), _Template, _, _Options) :-
     throw(Error).
 process_event(failure(_Pengine, _Time), _Template, _, _Options) :-
     fail.
 process_event(prompt(Pengine, Prompt), Template, State, Options) :-
     pengine_rpc_prompt(Pengine, Prompt, Input),
     pengine_send(Pengine, input(Input)),
     wait_event(Template, State, Options).
 process_event(output(Pengine, Term), Template, State, Options) :-
     pengine_rpc_output(Pengine, Term),
     pengine_pull_response(Pengine, Options),
     wait_event(Template, State, Options).
 process_event(debug(Pengine, Message), Template, State, Options) :-
     debug(pengine(debug), '~w', [Message]),
     pengine_pull_response(Pengine, Options),
     wait_event(Template, State, Options).
 process_event(success(_Pengine, Solutions, _Proj, _Time, false),
               Template, _, _Options) :-
     !,
     member(Template, Solutions).
 process_event(success(Pengine, Solutions, _Proj, _Time, true),
               Template, State, Options) :-
     (   member(Template, Solutions)
     ;   pengine_next(Pengine, Options),
         wait_event(Template, State, Options)
     ).
 process_event(destroy(Pengine, Event), Template, State, Options) :-
     !,
     retractall(child(_,Pengine)),
     % Tell the cleanup handler of pengine_rpc/3 that the pengine is
     % already gone.
     nb_setarg(1, State, false),
     debug(pengine(destroy), 'State: ~p~n', [State]),
     process_event(Event, Template, State, Options).
 % compatibility with older versions of the protocol.
 process_event(success(Pengine, Solutions, Time, More),
               Template, State, Options) :-
     process_event(success(Pengine, Solutions, _Proj, Time, More),
                   Template, State, Options).
 2090
 2091
 %!  pengine_rpc_prompt(+ID, +Prompt, -Term) is det.
 %
 %   Obtain the answer to a prompt issued by the remote pengine ID. The
 %   prompt/3 hook is tried first. If it fails, Term is read from the
 %   current input using read/1 with Prompt temporarily installed as the
 %   prompt.
 %
 %   The remote side may pass any term as Prompt, while prompt/2 only
 %   accepts an atom. A non-atom Prompt is therefore written to an atom
 %   first instead of raising a type error.

 pengine_rpc_prompt(ID, Prompt, Term) :-
     prompt(ID, Prompt, Term0),
     !,
     Term = Term0.
 pengine_rpc_prompt(_ID, Prompt, Term) :-
     (   atom(Prompt)
     ->  PromptAtom = Prompt
     ;   format(atom(PromptAtom), '~w', [Prompt])
     ),
     setup_call_cleanup(
         prompt(Old, PromptAtom),
         read(Term),
         prompt(_, Old)).
 2101
 %!  pengine_rpc_output(+ID, +Term) is det.
 %
 %   Handle output Term from the remote pengine ID: try the output/2
 %   hook and fall back to print/1 if the hook fails.

 pengine_rpc_output(ID, Term) :-
     (   output(ID, Term)
     ->  true
     ;   print(Term)
     ).
 2107
 %!  prompt(+ID, +Prompt, -Term) is semidet.
 %
 %   Hook to handle pengine_input/2 from the remote pengine. If the hook
 %   fails, pengine_rpc/3 calls read/1 with Prompt installed as the
 %   prompt.

 :- multifile prompt/3.

 %!  output(+ID, +Term) is semidet.
 %
 %   Hook to handle pengine_output/1 from the remote pengine. If the
 %   hook fails, pengine_rpc/3 calls print/1 on Term.

 :- multifile output/2.

 2122
 2123/*================= HTTP handlers =======================
 2124*/
 2125
 %   Declare the HTTP locations we serve and how. Note that we use
 %   time_limit(infinite) because pengines have their own timeout. Also
 %   note that we use spawn. This is needed because we can easily get
 %   many clients waiting for some action on a pengine to complete.
 %   Without spawning, we would quickly exhaust the worker pool of the
 %   HTTP server.
 %
 %   FIXME: probably we should wait for a short time for the pengine on
 %   the default worker thread. Only if that time has expired, we can
 %   call http_spawn/2 to continue waiting on a new thread. That would
 %   improve the performance and reduce the usage of threads.

 :- http_handler(root(pengine),               http_404([]),
                 [ id(pengines) ]).
 :- http_handler(root(pengine/create),        http_pengine_create,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/send),          http_pengine_send,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/pull_response), http_pengine_pull_response,
                 [ time_limit(infinite), spawn([]) ]).
 :- http_handler(root(pengine/abort),         http_pengine_abort,         []).
 :- http_handler(root(pengine/detach),        http_pengine_detach,        []).
 :- http_handler(root(pengine/list),          http_pengine_list,          []).
 :- http_handler(root(pengine/ping),          http_pengine_ping,          []).
 :- http_handler(root(pengine/destroy_all),   http_pengine_destroy_all,   []).

 :- http_handler(root(pengine/'pengines.js'),
                 http_reply_file(library('http/web/js/pengines.js'), []), []).
 :- http_handler(root(pengine/'plterm.css'),
                 http_reply_file(library('http/web/css/plterm.css'), []), []).

 2157
 %!  http_pengine_create(+Request)
 %
 %   HTTP POST handler for =/pengine/create=. The creation parameters
 %   are accepted both as =application/json= and as =www-form-encoded=.
 %   A request answered by reply_options/2 is not processed any further.
 %   Accepted parameters:
 %
 %     | **Parameter** | **Default**       | **Comment**                |
 %     |---------------|-------------------|----------------------------|
 %     | format        | `prolog`          | Output format              |
 %     | application   | `pengine_sandbox` | Pengine application        |
 %     | chunk         | 1                 | Chunk-size for results     |
 %     | collate       | 0 (off)           | Join output events         |
 %     | solutions     | chunked           | If `all`, emit all results |
 %     | ask           | -                 | The query                  |
 %     | template      | -                 | Output template            |
 %     | src_text      | ""                | Program                    |
 %     | src_url       | -                 | Program to download        |
 %     | disposition   | -                 | Download location          |
 %
 %     Note that solutions=all internally uses chunking to obtain the
 %     results from the pengine, but the results are combined in a
 %     single HTTP reply. This is currently only implemented by the CSV
 %     backend that is part of SWISH for downloading unbounded result
 %     sets with limited memory resources.
 %
 %     Using `chunk=false` simulates the _recursive toplevel_. See
 %     pengine_ask/3.

 http_pengine_create(Request) :-
     (   reply_options(Request, [post])
     ->  true
     ;   memberchk(content_type(ContentType), Request),
         sub_atom(ContentType, 0, _, _, 'application/json')
     ->  % Parameters arrive as a JSON object.
         http_read_json_dict(Request, Dict),
         dict_atom_option(format, Dict, Format, prolog),
         dict_atom_option(application, Dict, Application, pengine_sandbox),
         http_pengine_create(Request, Application, Format, Dict)
     ;   % Parameters arrive as form data; collect the ones that were
         % actually provided into a dict.
         Optional = [optional(true)],
         OptString = [string|Optional],
         Form = [ format(Format, [default(prolog)]),
                  application(Application, [default(pengine_sandbox)]),
                  chunk(_, [nonneg;oneof([false]), default(1)]),
                  collate(_, [number, default(0)]),
                  solutions(_, [oneof([all,chunked]), default(chunked)]),
                  ask(_, OptString),
                  template(_, OptString),
                  src_text(_, OptString),
                  disposition(_, OptString),
                  src_url(_, Optional)
                ],
         http_parameters(Request, Form),
         form_dict(Form, Dict),
         http_pengine_create(Request, Application, Format, Dict)
     ).
 2214
 %!  dict_atom_option(+Key, +Dict, -Atom, +Default) is det.
 %
 %   Atom is the value of Key in Dict converted to an atom, or Default
 %   if Dict has no such key.

 dict_atom_option(Key, Dict, Atom, _Default) :-
     get_dict(Key, Dict, Text),
     !,
     atom_string(Atom, Text).
 dict_atom_option(_Key, _Dict, Default, Default).
 2220
 %!  form_dict(+Form, -Dict) is det.
 %
 %   Dict holds Name: Value for every element Name(Value, ...) of Form
 %   whose Value is bound.

 form_dict(Form, Dict) :-
     form_values(Form, Pairs),
     dict_pairs(Dict, _, Pairs).

 %!  form_values(+Form, -Pairs) is det.
 %
 %   Pairs is a list Name-Value for the elements of Form whose first
 %   argument is bound; the other elements are skipped.

 form_values([], []).
 form_values([Param|Params], Pairs) :-
     (   arg(1, Param, Value),
         nonvar(Value)
     ->  functor(Param, Name, _),
         Pairs = [Name-Value|Rest]
     ;   Pairs = Rest
     ),
     form_values(Params, Rest).
 2235
 %!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 %
 %   Create a pengine for Application and reply with its first result in
 %   Format. If Application is known, the request is checked by allowed/2
 %   and authenticate/3 and the creation options are taken from
 %   OptionsDict. A bounded output queue is then created and registered
 %   in pengine_queue/4, the creation is broadcast and the pengine is
 %   started. If Application does not exist, an existence error is sent
 %   to the client.

 http_pengine_create(Request, Application, Format, Dict) :-
     (   current_application(Application)
     ->  allowed(Request, Application),
         authenticate(Request, Application, UserOptions),
         dict_to_options(Dict, Application, DictOptions),
         append(UserOptions, DictOptions, CreateOptions),
         pengine_uuid(Pengine),
         message_queue_create(Queue, [max_size(25)]),
         setting(Application:time_limit, TimeLimit),
         get_time(Now),
         asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
         broadcast(pengine(create(Pengine, Application, CreateOptions))),
         create(Queue, Pengine, CreateOptions, http, Application),
         create_wait_and_output_result(Pengine, Queue, Format,
                                       TimeLimit, Dict),
         gc_abandoned_queues
     ;   Error = existence_error(pengine_application, Application),
         pengine_uuid(ID),
         output_result(ID, Format, error(ID, error(Error, _)))
     ).
 2260
 2261
 %!  dict_to_options(+Dict, +Application, -CreateOptions) is det.
 %
 %   CreateOptions is the list of pengine creation options found in
 %   Dict. See pairs_create_options/3 for the selection.

 dict_to_options(Dict, Application, CreateOptions) :-
     dict_pairs(Dict, _Tag, KeyValues),
     pairs_create_options(KeyValues, Application, CreateOptions).
 2265
 %!  pairs_create_options(+Pairs, +Application, -Options) is det.
 %
 %   Turn Key-Value pairs into Key(Value) options. Only pairs for which
 %   pengine_create_option/1 holds are kept, and the key `user` is never
 %   accepted. Values of options typed `atom` by create_option_type/2
 %   are converted to atoms; all other values are passed as they are.

 pairs_create_options([], _, []) :- !.
 pairs_create_options([Key-Value0|Pairs], App, [Option|Options]) :-
     Option =.. [Key,Value],
     pengine_create_option(Option), Key \== user,
     !,
     (   create_option_type(Option, atom)
     ->  atom_string(Value, Value0)       % term creation must be done if
     ;   Value = Value0                   % we created the source and know
     ),                                   % the operators.
     pairs_create_options(Pairs, App, Options).
 pairs_create_options([_|Pairs], App, Options) :-
     pairs_create_options(Pairs, App, Options).
 2278
 %!  wait_and_output_result(+Pengine, +Queue,
 %!                         +Format, +TimeLimit, +Collate) is det.
 %
 %   Wait for an event on the Pengine's Queue and send it to the
 %   requester using output_result/3.
 %
 %     - If the event is a collating event (see collating_event/2), up
 %       to 100 further events are collected by collect_events/6 and the
 %       list of events is sent as one result. The collate time is
 %       capped at a tenth of TimeLimit.
 %     - If waiting raises an exception, died(Pengine) is sent.
 %     - If no event arrives within TimeLimit seconds,
 %       time_limit_exceeded/2 takes over.

 wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
     Collate is min(Collate0, TimeLimit/10),
     get_time(Start),
     (   catch(thread_get_message(Queue, pengine_event(_, First),
                                  [ timeout(TimeLimit)
                                  ]),
               Error, true)
     ->  (   var(Error)
         ->  debug(pengine(wait), 'Got ~q from ~q', [First, Queue]),
             (   collating_event(Collate, First)
             ->  Deadline is Start+TimeLimit,
                 collect_events(Pengine, Collate, Queue, Deadline, 100, Others),
                 Events = [First|Others],
                 ignore(destroy_queue_from_http(Pengine, Events, Queue)),
                 protect_pengine(Pengine, output_result(Pengine, Format, Events))
             ;   ignore(destroy_queue_from_http(Pengine, First, Queue)),
                 protect_pengine(Pengine, output_result(Pengine, Format, First))
             )
         ;   output_result(Pengine, Format, died(Pengine))
         )
     ;   time_limit_exceeded(Pengine, Format)
     ).
 2310
 %!  collect_events(+Pengine, +CollateTime, +Queue, +Deadline, +Max, -Events)
 %
 %   Collect more events from Queue as long as they are not separated by
 %   more than CollateTime seconds, collecting at most Max collating
 %   events. Collection stops after the first non-collating event, which
 %   is included in Events. If waiting raises an exception, Events is
 %   `[died(Pengine)]`. If the wait times out after Deadline has passed,
 %   Events is the time limit event for Pengine. If it times out before
 %   Deadline, Events is the empty list.

 collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
     !.
 collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
     debug(pengine(wait), 'Waiting to collate events', []),
     (   catch(thread_get_message(Queue, pengine_event(_, Next),
                                  [ timeout(Collate)
                                  ]),
               Error, true)
     ->  (   var(Error)
         ->  debug(pengine(wait), 'Got ~q from ~q', [Next, Queue]),
             Events = [Next|Others],
             (   collating_event(Collate, Next)
             ->  Left is Max - 1,
                 collect_events(Pengine, Collate, Queue, Deadline, Left, Others)
             ;   Others = []
             )
         ;   Events = [died(Pengine)]
         )
     ;   get_time(Now),
         Now > Deadline
     ->  time_limit_event(Pengine, TimeLimitEvent),
         Events = [TimeLimitEvent]
     ;   Events = []
     ).
 2340
 %!  collating_event(+Collate, ?Event) is semidet.
 %
 %   True if Event is an output/2 event and collation is enabled, i.e.,
 %   Collate does not unify with the integer 0.

 collating_event(Collate, output(_,_)) :-
     \+ Collate = 0.
 2345
 %!  create_wait_and_output_result(+Pengine, +Queue, +Format,
 %!                                +TimeLimit, +Dict) is det.
 %
 %   Intercepts the `solutions=all` case used for downloading results.
 %   Dict may contain a `disposition` key to denote the download
 %   location.
 %
 %   With `solutions=all`, a failure-driven loop reads one event per
 %   page and emits it through output_result_2/3. While the event
 %   announces more solutions (see is_more_event/1), the pengine is
 %   asked for the next chunk and the loop continues. The loop ends on
 %   the final event, on an exception while waiting, which is reported
 %   as died(Pengine), or on a timeout, which is handled by
 %   time_limit_exceeded/2. All other requests are delegated to
 %   wait_and_output_result/5, using the `collate` key of Dict
 %   (default 0).

 create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
     get_dict(solutions, Dict, all),
     !,
     between(1, infinite, Page),
     (   catch(thread_get_message(Queue, pengine_event(_, Received),
                                  [ timeout(TimeLimit)
                                  ]),
               Error, true)
     ->  (   var(Error)
         ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Received, Queue]),
             (   destroy_queue_from_http(Pengine, Received, Queue)
             ->  !,
                 protect_pengine(Pengine,
                                 output_result_2(Format, page(Page, Received), Dict))
             ;   is_more_event(Received)
             ->  pengine_thread(Pengine, Thread),
                 thread_send_message(Thread, pengine_request(next)),
                 protect_pengine(Pengine,
                                 output_result_2(Format, page(Page, Received), Dict)),
                 fail                     % back into between/3: next page
             ;   !,
                 protect_pengine(Pengine,
                                 output_result_2(Format, page(Page, Received), Dict))
             )
         ;   !, output_result(Pengine, Format, died(Pengine))
         )
     ;   !, time_limit_exceeded(Pengine, Format)
     ),
     !.
 create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
     Collate = Dict.get(collate,0),
     wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate).
 2384
 %!  is_more_event(+Event) is semidet.
 %
 %   True if Event is a success/5 event whose last argument is `true`,
 %   or a create/2 event whose answer/1 feature is such an event.

 is_more_event(success(_Pengine, _Solutions, _Projection, _Time, true)).
 is_more_event(create(_Pengine, Features)) :-
     memberchk(answer(Nested), Features),
     is_more_event(Nested).
 2389
 2390
 2391
 %!  time_limit_exceeded(+Pengine, +Format)
 %
 %   The Pengine did not reply within its time limit. Forcefully destroy
 %   the Pengine and send the time limit event to the client in the
 %   requested Format. The reply is sent from the cleanup handler of the
 %   destroy call.
 %
 %   @bug Ideally, if the Pengine has `destroy` set to `false`, we
 %   should get the Pengine back to its main loop. Unfortunately we only
 %   have normal exceptions that may be caught by the Pengine and
 %   `abort` which cannot be caught and thus destroys the Pengine.

 time_limit_exceeded(Pengine, Format) :-
     time_limit_event(Pengine, Event),
     call_cleanup(
         pengine_destroy(Pengine, [force(true)]),
         output_result(Pengine, Format, Event)).

 %!  time_limit_event(?Pengine, ?Event) is semidet.
 %
 %   Event is the event that reports a time limit violation of Pengine.

 time_limit_event(Pengine, Event) :-
     Event = destroy(Pengine, error(Pengine, time_limit_exceeded)).
 2410
 2411destroy_pengine_after_output(Pengine, Events) :-
 2412    is_list(Events),
 2413    last(Events, Last),
 2414    time_limit_event(Pengine,  Last),
 2415    !,
 2416    catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true).
 2417destroy_pengine_after_output(_, _).
 2418
 2419
 2420%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
 2421%
 2422%   Consider destroying the output queue   for Pengine after sending
 2423%   Event back to the HTTP client. We can destroy the queue if
 2424%
 2425%     - The pengine already died (output_queue/3 is present) and
 2426%       the queue is empty.
 2427%     - This is a final (destroy) event.
 2428%
 2429%   @tbd    If the client did not request all output, the queue will
 2430%           not be destroyed.  We need some timeout and GC for that.
 2431
 2432destroy_queue_from_http(ID, _, Queue) :-
 2433    output_queue(ID, Queue, _),
 2434    !,
 2435    destroy_queue_if_empty(Queue).
 2436destroy_queue_from_http(ID, Event, Queue) :-
 2437    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2438    is_destroy_event(Event),
 2439    !,
 2440    message_queue_property(Queue, size(Waiting)),
 2441    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2442    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2443
 2444is_destroy_event(destroy(_)).
 2445is_destroy_event(destroy(_,_)).
 2446is_destroy_event(create(_, Options)) :-
 2447    memberchk(answer(Event), Options),
 2448    is_destroy_event(Event).
 2449
 2450destroy_queue_if_empty(Queue) :-
 2451    thread_peek_message(Queue, _),
 2452    !.
 2453destroy_queue_if_empty(Queue) :-
 2454    retractall(output_queue(_, Queue, _)),
 2455    message_queue_destroy(Queue).
 2456
 2457%!  gc_abandoned_queues
 2458%
 2459%   Check whether there are queues  that   have  been abadoned. This
 2460%   happens if the stream contains output events and not all of them
 2461%   are read by the client.
 2462
 2463:- dynamic
 2464    last_gc/1. 2465
 2466gc_abandoned_queues :-
 2467    consider_queue_gc,
 2468    !,
 2469    get_time(Now),
 2470    (   output_queue(_, Queue, Time),
 2471        Now-Time > 15*60,
 2472        retract(output_queue(_, Queue, Time)),
 2473        message_queue_destroy(Queue),
 2474        fail
 2475    ;   retractall(last_gc(_)),
 2476        asserta(last_gc(Now))
 2477    ).
 2478gc_abandoned_queues.
 2479
 2480consider_queue_gc :-
 2481    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2482    N > 100,
 2483    (   last_gc(Time),
 2484        get_time(Now),
 2485        Now-Time > 5*60
 2486    ->  true
 2487    ;   \+ last_gc(_)
 2488    ).
 2489
 2490%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
 2491%!  sync_delay_destroy_queue(+Pengine, +Queue) is det.
 2492%
 2493%   Handle destruction of the message queue connecting the HTTP side
 2494%   to the pengine. We cannot delete the queue when the pengine dies
 2495%   because the queue may contain output  events. Termination of the
 2496%   pengine and finishing the  HTTP  exchange   may  happen  in both
 2497%   orders. This means we need handle this using synchronization.
 2498%
 2499%     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2500%     Called (indirectly) from pengine_done/1 if the pengine's
 2501%     thread dies.
 2502%     * sync_destroy_queue_from_http(+Pengine, +Queue)
 2503%     Called from destroy_queue/3, from wait_and_output_result/5,
 2504%     i.e., from the HTTP side.
 2505
 2506:- dynamic output_queue_destroyed/1. 2507
 2508sync_destroy_queue_from_http(ID, Queue) :-
 2509    (   output_queue(ID, Queue, _)
 2510    ->  destroy_queue_if_empty(Queue)
 2511    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2512    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2513              [Queue]),
 2514        get_time(Now),
 2515        asserta(output_queue(ID, Queue, Now))
 2516    ;   message_queue_destroy(Queue),
 2517        asserta(output_queue_destroyed(Queue))
 2518    ).
 2519
 2520%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2521%
 2522%   Called  from  pengine_unregister/1  when    the  pengine  thread
 2523%   terminates. It is called while the mutex `pengine` held.
 2524
 2525sync_destroy_queue_from_pengine(ID, Queue) :-
 2526    (   retract(output_queue_destroyed(Queue))
 2527    ->  true
 2528    ;   get_time(Now),
 2529        asserta(output_queue(ID, Queue, Now))
 2530    ),
 2531    retractall(pengine_queue(ID, Queue, _, _)).
 2532
 2533
 2534http_pengine_send(Request) :-
 2535    reply_options(Request, [get,post]),
 2536    !.
 2537http_pengine_send(Request) :-
 2538    http_parameters(Request,
 2539                    [ id(ID, [ type(atom) ]),
 2540                      event(EventString, [optional(true)]),
 2541                      collate(Collate, [number, default(0)]),
 2542                      format(Format, [default(prolog)])
 2543                    ]),
 2544    catch(read_event(ID, Request, Format, EventString, Event),
 2545          Error,
 2546          true),
 2547    (   var(Error)
 2548    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
 2549        (   pengine_thread(ID, Thread)
 2550        ->  pengine_queue(ID, Queue, TimeLimit, _),
 2551            random_delay,
 2552            broadcast(pengine(send(ID, Event))),
 2553            thread_send_message(Thread, pengine_request(Event)),
 2554            wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
 2555        ;   atom(ID)
 2556        ->  pengine_died(Format, ID)
 2557        ;   http_404([], Request)
 2558        )
 2559    ;   Error = error(existence_error(pengine, ID), _)
 2560    ->  pengine_died(Format, ID)
 2561    ;   output_result(ID, Format, error(ID, Error))
 2562    ).
 2563
 2564pengine_died(Format, Pengine) :-
 2565    output_result(Pengine, Format,
 2566                  error(Pengine, error(existence_error(pengine, Pengine),_))).
 2567
 2568
 2569%!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
 2570%
 2571%   Read an event on behalve of Pengine.  Note that the pengine's module
 2572%   should not be  deleted  while  we   are  reading  using  its  syntax
 2573%   (module). This is ensured using the `pengine_done` mutex.
 2574%
 2575%   @see pengine_done/0.
 2576
 2577read_event(Pengine, Request, Format, EventString, Event) :-
 2578    protect_pengine(
 2579        Pengine,
 2580        ( get_pengine_module(Pengine, Module),
 2581          read_event_2(Request, EventString, Module, Event0, Bindings)
 2582        )),
 2583    !,
 2584    fix_bindings(Format, Event0, Bindings, Event).
 2585read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2586    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2587    discard_post_data(Request),
 2588    existence_error(pengine, Pengine).
 2589
 2590
 2591%%  read_event_(+Request, +EventString, +Module, -Event, -Bindings)
 2592%
 2593%   Read the sent event. The event is a   Prolog  term that is either in
 2594%   the =event= parameter or as a posted document.
 2595
 2596read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2597    nonvar(EventString),
 2598    !,
 2599    term_string(Event, EventString,
 2600                [ variable_names(Bindings),
 2601                  module(Module)
 2602                ]).
 2603read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2604    option(method(post), Request),
 2605    http_read_data(Request,     Event,
 2606                   [ content_type('application/x-prolog'),
 2607                     module(Module),
 2608                     variable_names(Bindings)
 2609                   ]).
 2610
 2611%%  discard_post_data(+Request) is det.
 2612%
 2613%   If this is a POST request, discard the posted data.
 2614
 2615discard_post_data(Request) :-
 2616    option(method(post), Request),
 2617    !,
 2618    setup_call_cleanup(
 2619        open_null_stream(NULL),
 2620        http_read_data(Request, _, [to(stream(NULL))]),
 2621        close(NULL)).
 2622discard_post_data(_).
 2623
 2624%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
 2625%
 2626%   Generate the template for json(-s) Format  from the variables in
 2627%   the asked Goal. Variables starting  with an underscore, followed
 2628%   by an capital letter are ignored from the template.
 2629
 2630fix_bindings(Format,
 2631             ask(Goal, Options0), Bindings,
 2632             ask(Goal, NewOptions)) :-
 2633    json_lang(Format),
 2634    !,
 2635    exclude(anon, Bindings, NamedBindings),
 2636    template(NamedBindings, Template, Options0, Options1),
 2637    select_option(chunk(Paging), Options1, Options2, 1),
 2638    NewOptions = [ template(Template),
 2639                   chunk(Paging),
 2640                   bindings(NamedBindings)
 2641                 | Options2
 2642                 ].
 2643fix_bindings(_, Command, _, Command).
 2644
 2645template(_, Template, Options0, Options) :-
 2646    select_option(template(Template), Options0, Options),
 2647    !.
 2648template(Bindings, Template, Options, Options) :-
 2649    dict_create(Template, swish_default_template, Bindings).
 2650
 2651anon(Name=_) :-
 2652    sub_atom(Name, 0, _, _, '_'),
 2653    sub_atom(Name, 1, 1, _, Next),
 2654    char_type(Next, prolog_var_start).
 2655
 2656var_name(Name=_, Name).
 2657
 2658
 2659%!  json_lang(+Format) is semidet.
 2660%
 2661%   True if Format is a JSON variation.
 2662
 2663json_lang(json) :- !.
 2664json_lang(Format) :-
 2665    sub_atom(Format, 0, _, _, 'json-').
 2666
 2667%!  http_pengine_pull_response(+Request)
 2668%
 2669%   HTTP handler for /pengine/pull_response.  Pulls possible pending
 2670%   messages from the pengine.
 2671
 2672http_pengine_pull_response(Request) :-
 2673    reply_options(Request, [get]),
 2674    !.
 2675http_pengine_pull_response(Request) :-
 2676    http_parameters(Request,
 2677            [   id(ID, []),
 2678                format(Format, [default(prolog)]),
 2679                collate(Collate, [number, default(0)])
 2680            ]),
 2681    reattach(ID),
 2682    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2683        ->  true
 2684        ;   output_queue(ID, Queue, _),
 2685            TimeLimit = 0
 2686        )
 2687    ->  wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
 2688    ;   http_404([], Request)
 2689    ).
 2690
 2691%!  http_pengine_abort(+Request)
 2692%
 2693%   HTTP handler for /pengine/abort. Note that  abort may be sent at
 2694%   any time and the reply may  be   handled  by a pull_response. In
 2695%   that case, our  pengine  has  already   died  before  we  get to
 2696%   wait_and_output_result/5.
 2697
 2698http_pengine_abort(Request) :-
 2699    reply_options(Request, [get,post]),
 2700    !.
 2701http_pengine_abort(Request) :-
 2702    http_parameters(Request,
 2703            [   id(ID, [])
 2704            ]),
 2705    (   pengine_thread(ID, _Thread)
 2706    ->  broadcast(pengine(abort(ID))),
 2707        abort_pending_output(ID),
 2708        pengine_abort(ID),
 2709        reply_json_dict(true)
 2710    ;   http_404([], Request)
 2711    ).
 2712
 2713%!  http_pengine_detach(+Request)
 2714%
 2715%   Detach a Pengine while keeping it running.  This has the following
 2716%   consequences:
 2717%
 2718%     - `/destroy_all` including the id of this pengine is ignored.
 2719%     - Output from the pengine is stored in the queue without
 2720%       waiting for the queue to drain.
 2721%     - The Pengine becomes available through `/list`
 2722
 2723http_pengine_detach(Request) :-
 2724    reply_options(Request, [post]),
 2725    !.
 2726http_pengine_detach(Request) :-
 2727    http_parameters(Request,
 2728                    [ id(ID, [])
 2729                    ]),
 2730    http_read_json_dict(Request, ClientData),
 2731    (   pengine_property(ID, application(Application)),
 2732        allowed(Request, Application),
 2733        authenticate(Request, Application, _UserOptions)
 2734    ->  broadcast(pengine(detach(ID))),
 2735        get_time(Now),
 2736        assertz(pengine_detached(ID, ClientData.put(time, Now))),
 2737        pengine_queue(ID, Queue, _TimeLimit, _Now),
 2738        message_queue_set(Queue, max_size(1000)),
 2739        pengine_reply(Queue, detached(ID)),
 2740        reply_json_dict(true)
 2741    ;   http_404([], Request)
 2742    ).
 2743
 2744reattach(ID) :-
 2745    (   retract(pengine_detached(ID, _Data)),
 2746        pengine_queue(ID, Queue, _TimeLimit, _Now)
 2747    ->  message_queue_set(Queue, max_size(25))
 2748    ;   true
 2749    ).
 2750
 2751
 2752%!  http_pengine_destroy_all(+Request)
 2753%
 2754%   Destroy a list of pengines. Normally   called  by pengines.js if the
 2755%   browser window is closed.
 2756
 2757http_pengine_destroy_all(Request) :-
 2758    reply_options(Request, [get,post]),
 2759    !.
 2760http_pengine_destroy_all(Request) :-
 2761    http_parameters(Request,
 2762                    [ ids(IDsAtom, [])
 2763                    ]),
 2764    atomic_list_concat(IDs, ',', IDsAtom),
 2765    forall(( member(ID, IDs),
 2766             \+ pengine_detached(ID, _)
 2767           ),
 2768           pengine_destroy(ID, [force(true)])),
 2769    reply_json_dict("ok").
 2770
 2771%!  http_pengine_ping(+Request)
 2772%
 2773%   HTTP handler for /pengine/ping.  If   the  requested  Pengine is
 2774%   alive and event status(Pengine, Stats) is created, where `Stats`
 2775%   is the return of thread_statistics/2.
 2776
 2777http_pengine_ping(Request) :-
 2778    reply_options(Request, [get]),
 2779    !.
 2780http_pengine_ping(Request) :-
 2781    http_parameters(Request,
 2782                    [ id(Pengine, []),
 2783                      format(Format, [default(prolog)])
 2784                    ]),
 2785    (   pengine_thread(Pengine, Thread),
 2786        Error = error(_,_),
 2787        catch(thread_statistics(Thread, Stats), Error, fail)
 2788    ->  output_result(Pengine, Format, ping(Pengine, Stats))
 2789    ;   output_result(Pengine, Format, died(Pengine))
 2790    ).
 2791
 2792%!  http_pengine_list(+Request)
 2793%
 2794%   HTTP  handler  for  `/pengine/list`,   providing  information  about
 2795%   running Pengines.
 2796%
 2797%   @tbd Only list detached Pengines associated to the logged in user.
 2798
 2799http_pengine_list(Request) :-
 2800    reply_options(Request, [get]),
 2801    !.
 2802http_pengine_list(Request) :-
 2803    http_parameters(Request,
 2804                    [ status(Status, [default(detached), oneof([detached])]),
 2805                      application(Application, [default(pengine_sandbox)])
 2806                    ]),
 2807    allowed(Request, Application),
 2808    authenticate(Request, Application, _UserOptions),
 2809    findall(Term, listed_pengine(Application, Status, Term), Terms),
 2810    reply_json_dict(json{pengines: Terms}).
 2811
 2812listed_pengine(Application, detached, State) :-
 2813    State = pengine{id:Id,
 2814                    detached:Time,
 2815                    queued:Queued,
 2816                    stats:Stats},
 2817
 2818    pengine_property(Id, application(Application)),
 2819    pengine_property(Id, detached(Time)),
 2820    pengine_queue(Id, Queue, _TimeLimit, _Now),
 2821    message_queue_property(Queue, size(Queued)),
 2822    (   pengine_thread(Id, Thread),
 2823        catch(thread_statistics(Thread, Stats), _, fail)
 2824    ->  true
 2825    ;   Stats = thread{status:died}
 2826    ).
 2827
 2828
 2829%!  output_result(+Pengine, +Format, +EventTerm) is det.
 2830%!  output_result(+Pengine, +Format, +EventTerm, +OptionsDict) is det.
 2831%
 2832%   Formulate an HTTP response from a pengine event term. Format is
 2833%   one of =prolog=, =json= or =json-s=.
 2834%
 2835%   @arg EventTerm is either a single event or a list of events.
 2836
 2837:- dynamic
 2838    pengine_replying/2.             % +Pengine, +Thread
 2839
 2840output_result(Pengine, Format, Event) :-
 2841    thread_self(Thread),
 2842    cors_enable,            % contingent on http:cors setting
 2843    disable_client_cache,
 2844    setup_call_cleanup(
 2845        asserta(pengine_replying(Pengine, Thread), Ref),
 2846        catch(output_result_2(Format, Event, _{}),
 2847              pengine_abort_output,
 2848              true),
 2849        erase(Ref)),
 2850    destroy_pengine_after_output(Pengine, Event).
 2851
 2852output_result_2(Lang, Event, Dict) :-
 2853    write_result(Lang, Event, Dict),
 2854    !.
 2855output_result_2(prolog, Event, _) :-
 2856    !,
 2857    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2858    write_term(Event,
 2859               [ quoted(true),
 2860                 ignore_ops(true),
 2861                 fullstop(true),
 2862                 blobs(portray),
 2863                 portray_goal(portray_blob),
 2864                 nl(true)
 2865               ]).
 2866output_result_2(Lang, Event, _) :-
 2867    json_lang(Lang),
 2868    !,
 2869    (   event_term_to_json_data(Event, JSON, Lang)
 2870    ->  reply_json_dict(JSON)
 2871    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2872    ).
 2873output_result_2(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2874    domain_error(pengine_format, Lang).
 2875
 2876%!  portray_blob(+Blob, +Options) is det.
 2877%
 2878%   Portray non-text blobs that may  appear   in  output  terms. Not
 2879%   really sure about that. Basically such  terms need to be avoided
 2880%   as they are meaningless outside the process. The generated error
 2881%   is hard to debug though, so now we send them as `'$BLOB'(Type)`.
 2882%   Future versions may include more info, depending on `Type`.
 2883
 2884:- public portray_blob/2.               % called from write-term
 2885portray_blob(Blob, _Options) :-
 2886    blob(Blob, Type),
 2887    writeq('$BLOB'(Type)).
 2888
 2889%!  abort_pending_output(+Pengine) is det.
 2890%
 2891%   If we get an abort, it is possible that output is being produced
 2892%   for the client.  This predicate aborts these threads.
 2893
 2894abort_pending_output(Pengine) :-
 2895    forall(pengine_replying(Pengine, Thread),
 2896           abort_output_thread(Thread)).
 2897
 2898abort_output_thread(Thread) :-
 2899    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2900          error(existence_error(thread, _), _),
 2901          true).
 2902
 2903%!  write_result(+Lang, +Event, +Dict) is semidet.
 2904%
 2905%   Hook that allows for different output formats. The core Pengines
 2906%   library supports `prolog` and various   JSON  dialects. The hook
 2907%   event_to_json/3 can be used to refine   the  JSON dialects. This
 2908%   hook must be used if  a   completely  different output format is
 2909%   desired.
 2910
%!  disable_client_cache
%
%   Make sure the client will not cache our page by emitting the HTTP
%   header fields `Cache-Control`, `Pragma` and `Expires` on the
%   current output.
%
%   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers

disable_client_cache :-
    % The `\c` escape continues the quoted atom on the next line,
    % skipping the line break and the leading white space, so each
    % header field is terminated by exactly `\r\n`.
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
            Pragma: no-cache\r\n\c
            Expires: 0\r\n').
 2921
 2922event_term_to_json_data(Events, JSON, Lang) :-
 2923    is_list(Events),
 2924    !,
 2925    events_to_json_data(Events, JSON, Lang).
 2926event_term_to_json_data(Event, JSON, Lang) :-
 2927    event_to_json(Event, JSON, Lang),
 2928    !.
 2929event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2930                        json{event:success, id:ID, time:Time,
 2931                             data:Bindings, more:More, projection:Projection},
 2932                        json) :-
 2933    !,
 2934    term_to_json(Bindings0, Bindings).
 2935event_term_to_json_data(destroy(ID, Event),
 2936                        json{event:destroy, id:ID, data:JSON},
 2937                        Style) :-
 2938    !,
 2939    event_term_to_json_data(Event, JSON, Style).
 2940event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2941    !,
 2942    (   select(answer(First0), Features0, Features1)
 2943    ->  event_term_to_json_data(First0, First, Style),
 2944        Features = [answer(First)|Features1]
 2945    ;   Features = Features0
 2946    ),
 2947    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2948event_term_to_json_data(destroy(ID, Event),
 2949                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2950    !,
 2951    event_term_to_json_data(Event, JSON, Style).
 2952event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2953    !,
 2954    Error0 = json{event:error, id:ID, data:Message},
 2955    add_error_details(ErrorTerm, Error0, Error),
 2956    message_to_string(ErrorTerm, Message).
 2957event_term_to_json_data(failure(ID, Time),
 2958                        json{event:failure, id:ID, time:Time}, _) :-
 2959    !.
 2960event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2961    functor(EventTerm, F, 1),
 2962    !,
 2963    arg(1, EventTerm, ID).
 2964event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2965    functor(EventTerm, F, 2),
 2966    arg(1, EventTerm, ID),
 2967    arg(2, EventTerm, Data),
 2968    term_to_json(Data, JSON).
 2969
 2970events_to_json_data([], [], _).
 2971events_to_json_data([E|T0], [J|T], Lang) :-
 2972    event_term_to_json_data(E, J, Lang),
 2973    events_to_json_data(T0, T, Lang).
 2974
 2975:- public add_error_details/3. 2976
 2977%%  add_error_details(+Error, +JSON0, -JSON)
 2978%
 2979%   Add format error code and  location   information  to an error. Also
 2980%   used by pengines_io.pl.
 2981
 2982add_error_details(Error, JSON0, JSON) :-
 2983    add_error_code(Error, JSON0, JSON1),
 2984    add_error_location(Error, JSON1, JSON).
 2985
 2986%%  add_error_code(+Error, +JSON0, -JSON) is det.
 2987%
 2988%   Add a =code= field to JSON0 of Error is an ISO error term. The error
 2989%   code is the functor name of  the   formal  part  of the error, e.g.,
 2990%   =syntax_error=,  =type_error=,  etc.   Some    errors   carry   more
 2991%   information:
 2992%
 2993%     - existence_error(Type, Obj)
 2994%     {arg1:Type, arg2:Obj}, where Obj is stringified of it is not
 2995%     atomic.
 2996
 2997add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 2998    atom(Type),
 2999    !,
 3000    to_atomic(Obj, Value),
 3001    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 3002add_error_code(error(Formal, _), Error0, Error) :-
 3003    callable(Formal),
 3004    !,
 3005    functor(Formal, Code, _),
 3006    Error = Error0.put(code, Code).
 3007add_error_code(_, Error, Error).
 3008
 3009% What to do with large integers?
 3010to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 3011to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 3012to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 3013to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 3014
 3015
 3016%%  add_error_location(+Error, +JSON0, -JSON) is det.
 3017%
 3018%   Add a =location= property if the  error   can  be  associated with a
 3019%   source location. The location is an   object  with properties =file=
 3020%   and =line= and, if available, the character location in the line.
 3021
 3022add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 3023    atom(Path), integer(Line),
 3024    !,
 3025    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 3026add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 3027    atom(Path), integer(Line), integer(Ch),
 3028    !,
 3029    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 3030add_error_location(_, Term, Term).
 3031
 3032
 3033%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
 3034%
 3035%   Hook that translates a Pengine event  structure into a term suitable
 3036%   for reply_json_dict/1, according to the language specification Lang.
 3037%   This can be used to massage general Prolog terms, notably associated
 3038%   with `success(ID, Bindings, Projection, Time, More)` and `output(ID,
 3039%   Term)` into a format suitable for processing at the client side.
 3040
 3041%:- multifile pengines:event_to_json/3.
 3042
 3043
 3044                 /*******************************
 3045                 *        ACCESS CONTROL        *
 3046                 *******************************/
 3047
 3048%!  allowed(+Request, +Application) is det.
 3049%
 3050%   Check whether the peer is allowed to connect.  Returns a
 3051%   =forbidden= header if contact is not allowed.
 3052
 3053allowed(Request, Application) :-
 3054    setting(Application:allow_from, Allow),
 3055    match_peer(Request, Allow),
 3056    setting(Application:deny_from, Deny),
 3057    \+ match_peer(Request, Deny),
 3058    !.
 3059allowed(Request, _Application) :-
 3060    memberchk(request_uri(Here), Request),
 3061    throw(http_reply(forbidden(Here))).
 3062
 3063match_peer(_, Allowed) :-
 3064    memberchk(*, Allowed),
 3065    !.
 3066match_peer(_, []) :- !, fail.
 3067match_peer(Request, Allowed) :-
 3068    http_peer(Request, Peer),
 3069    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 3070    (   memberchk(Peer, Allowed)
 3071    ->  true
 3072    ;   member(Pattern, Allowed),
 3073        match_peer_pattern(Pattern, Peer)
 3074    ).
 3075
 3076match_peer_pattern(Pattern, Peer) :-
 3077    ip_term(Pattern, IP),
 3078    ip_term(Peer, IP),
 3079    !.
 3080
 3081ip_term(Peer, Pattern) :-
 3082    split_string(Peer, ".", "", PartStrings),
 3083    ip_pattern(PartStrings, Pattern).
 3084
 3085ip_pattern([], []).
 3086ip_pattern([*], _) :- !.
 3087ip_pattern([S|T0], [N|T]) :-
 3088    number_string(N, S),
 3089    ip_pattern(T0, T).
 3090
 3091
 3092%%  authenticate(+Request, +Application, -UserOptions:list) is det.
 3093%
 3094%   Call authentication_hook/3, returning either `[user(User)]`, `[]` or
 3095%   an exception.
 3096
 3097authenticate(Request, Application, UserOptions) :-
 3098    authentication_hook(Request, Application, User),
 3099    !,
 3100    must_be(ground, User),
 3101    UserOptions = [user(User)].
 3102authenticate(_, _, []).
 3103
 3104%%  authentication_hook(+Request, +Application, -User) is semidet.
 3105%
 3106%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
 3107%   discover whether the server is accessed   by  an authorized user. It
 3108%   can react in three ways:
 3109%
%     - Succeed, binding User to a ground term.  The authenticity of the
%       user is available through pengine_user/1.
%     - Fail.  The =/create= succeeds, but the pengine is not associated
%       with a user.
%     - Throw an exception to prevent creation of the pengine.  Two
%       meaningful exceptions are:
%         - throw(http_reply(authorise(basic(Realm))))
%         Start a normal HTTP login challenge (reply 401)
%         - throw(http_reply(forbidden(Path)))
%         Reject the request using a 403 reply.
 3120%
 3121%   @see http_authenticate/3 can be used to implement this hook using
 3122%        default HTTP authentication data.
 3123
 3124pengine_register_user(Options) :-
 3125    option(user(User), Options),
 3126    !,
 3127    pengine_self(Me),
 3128    asserta(pengine_user(Me, User)).
 3129pengine_register_user(_).
 3130
 3131
 3132%%  pengine_user(-User) is semidet.
 3133%
 3134%   True when the pengine was create by  an HTTP request that authorized
 3135%   User.
 3136%
 3137%   @see authentication_hook/3 can be used to extract authorization from
 3138%        the HTTP header.
 3139
 3140pengine_user(User) :-
 3141    pengine_self(Me),
 3142    pengine_user(Me, User).
 3143
 3144%!  reply_options(+Request, +Methods) is semidet.
 3145%
 3146%   Reply the HTTP OPTIONS request
 3147
 3148reply_options(Request, Allowed) :-
 3149    option(method(options), Request),
 3150    !,
 3151    cors_enable(Request,
 3152                [ methods(Allowed)
 3153                ]),
 3154    format('Content-type: text/plain\r\n'),
 3155    format('~n').                   % empty body
 3156
 3157
 3158                 /*******************************
 3159                 *        COMPILE SOURCE        *
 3160                 *******************************/
 3161
 3162/** pengine_src_text(+SrcText, +Module) is det
 3163
 3164Asserts the clauses defined in SrcText in   the  private database of the
 3165current Pengine. This  predicate  processes   the  `src_text'  option of
 3166pengine_create/1.
 3167*/
 3168
 3169pengine_src_text(Src, Module) :-
 3170    pengine_self(Self),
 3171    format(atom(ID), 'pengine://~w/src', [Self]),
 3172    extra_load_options(Self, Options),
 3173    setup_call_cleanup(
 3174        open_chars_stream(Src, Stream),
 3175        load_files(Module:ID,
 3176                   [ stream(Stream),
 3177                     module(Module),
 3178                     silent(true)
 3179                   | Options
 3180                   ]),
 3181        close(Stream)),
 3182    keep_source(Self, ID, Src).
 3183
 3184system:'#file'(File, _Line) :-
 3185    prolog_load_context(stream, Stream),
 3186    set_stream(Stream, file_name(File)),
 3187    set_stream(Stream, record_position(false)),
 3188    set_stream(Stream, record_position(true)).
 3189
 3190%%   pengine_src_url(+URL, +Module) is det
 3191%
 3192%    Asserts the clauses defined in URL in   the private database of the
 3193%    current Pengine. This predicate processes   the `src_url' option of
 3194%    pengine_create/1.
 3195%
 3196%    @tbd: make a sensible guess at the encoding.
 3197
 3198pengine_src_url(URL, Module) :-
 3199    pengine_self(Self),
 3200    uri_encoded(path, URL, Path),
 3201    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 3202    extra_load_options(Self, Options),
 3203    (   get_pengine_application(Self, Application),
 3204        setting(Application:debug_info, false)
 3205    ->  setup_call_cleanup(
 3206            http_open(URL, Stream, []),
 3207            ( set_stream(Stream, encoding(utf8)),
 3208              load_files(Module:ID,
 3209                         [ stream(Stream),
 3210                           module(Module)
 3211                         | Options
 3212                         ])
 3213            ),
 3214            close(Stream))
 3215    ;   setup_call_cleanup(
 3216            http_open(URL, TempStream, []),
 3217            ( set_stream(TempStream, encoding(utf8)),
 3218              read_string(TempStream, _, Src)
 3219            ),
 3220            close(TempStream)),
 3221        setup_call_cleanup(
 3222            open_chars_stream(Src, Stream),
 3223            load_files(Module:ID,
 3224                       [ stream(Stream),
 3225                         module(Module)
 3226                       | Options
 3227                       ]),
 3228            close(Stream)),
 3229        keep_source(Self, ID, Src)
 3230    ).
 3231
 3232
 3233extra_load_options(Pengine, Options) :-
 3234    pengine_not_sandboxed(Pengine),
 3235    !,
 3236    Options = [].
 3237extra_load_options(_, [sandboxed(true)]).
 3238
 3239
 3240keep_source(Pengine, ID, SrcText) :-
 3241    get_pengine_application(Pengine, Application),
 3242    setting(Application:debug_info, true),
 3243    !,
 3244    to_string(SrcText, SrcString),
 3245    assertz(pengine_data(Pengine, source(ID, SrcString))).
 3246keep_source(_, _, _).
 3247
 3248to_string(String, String) :-
 3249    string(String),
 3250    !.
 3251to_string(Atom, String) :-
 3252    atom_string(Atom, String),
 3253    !.
 3254
 3255		 /*******************************
 3256		 *            SANDBOX		*
 3257		 *******************************/
 3258
 3259:- multifile
 3260    sandbox:safe_primitive/1. 3261
 3262sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3263sandbox:safe_primitive(pengines:pengine_output(_)).
 3264sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3265
 3266
 3267                 /*******************************
 3268                 *            MESSAGES          *
 3269                 *******************************/
 3270
 3271prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3272    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3273      'This is normally caused by an insufficiently instantiated'-[], nl,
 3274      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3275      'find all possible instantations of Var.'-[]
 3276    ]