(utf8). 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2025, Torbjörn Lager, 8 VU University Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(pengines, 39 [ pengine_create/1, % +Options 40 pengine_ask/3, % +Pengine, :Query, +Options 41 pengine_next/2, % +Pengine. +Options 42 pengine_stop/2, % +Pengine. +Options 43 pengine_event/2, % -Event, +Options 44 pengine_input/2, % +Prompt, -Term 45 pengine_output/1, % +Term 46 pengine_respond/3, % +Pengine, +Input, +Options 47 pengine_debug/2, % +Format, +Args 48 pengine_self/1, % -Pengine 49 pengine_pull_response/2, % +Pengine, +Options 50 pengine_destroy/1, % +Pengine 51 pengine_destroy/2, % +Pengine, +Options 52 pengine_abort/1, % +Pengine 53 pengine_application/1, % +Application 54 current_pengine_application/1, % ?Application 55 pengine_property/2, % ?Pengine, ?Property 56 pengine_user/1, % -User 57 pengine_event_loop/2, % :Closure, +Options 58 pengine_rpc/2, % +Server, :Goal 59 pengine_rpc/3 % +Server, :Goal, +Options 60 ]).
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error), 77 [ must_be/2, 78 existence_error/2, 79 permission_error/3, 80 domain_error/2 81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option), 88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(statistics),[thread_statistics/2]). 91:- autoload(library(term_to_json),[term_to_json/2]). 92:- autoload(library(thread_pool), 93 [thread_pool_create/3,thread_create_in_pool/4]). 94:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 95:- autoload(library(uri), 96 [ uri_components/2, 97 uri_query_components/2, 98 uri_data/3, 99 uri_data/4, 100 uri_edit/3, 101 uri_encoded/3 102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- use_module(library(http/http_dispatch), 106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111 112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json), 114 [http_read_json_dict/2,reply_json_dict/1]). 115:- use_module(library(sandbox),[safe_goal/1]). 116 117:- if(exists_source(library(uuid))). 118:- autoload(library(uuid), [uuid/2]). 119:- endif. 120 121 122:- meta_predicate 123 pengine_create(), 124 pengine_rpc(, , ), 125 pengine_event_loop(, ). 126 127:- multifile 128 write_result/3, % +Format, +Event, +Dict 129 event_to_json/3, % +Event, -JSON, +Format 130 prepare_module/3, % +Module, +Application, +Options 131 prepare_goal/3, % +GoalIn, -GoalOut, +Options 132 authentication_hook/3, % +Request, +Application, -User 133 not_sandboxed/2, % +User, +App 134 pengine_flush_output_hook/0. 135 136:- predicate_options(pengine_create/1, 1, 137 [ id(-atom), 138 alias(atom), 139 application(atom), 140 destroy(boolean), 141 server(atom), 142 ask(compound), 143 template(compound), 144 chunk(integer;oneof([false])), 145 bindings(list), 146 src_list(list), 147 src_text(any), % text 148 src_url(atom), 149 src_predicates(list) 150 ]). 151:- predicate_options(pengine_ask/3, 3, 152 [ template(any), 153 chunk(integer;oneof([false])), 154 bindings(list) 155 ]). 156:- predicate_options(pengine_next/2, 2, 157 [ chunk(integer), 158 pass_to(pengine_send/3, 3) 159 ]). 160:- predicate_options(pengine_stop/2, 2, 161 [ pass_to(pengine_send/3, 3) 162 ]). 163:- predicate_options(pengine_respond/3, 2, 164 [ pass_to(pengine_send/3, 3) 165 ]). 166:- predicate_options(pengine_rpc/3, 3, 167 [ chunk(integer;oneof([false])), 168 pass_to(pengine_create/1, 1) 169 ]). 170:- predicate_options(pengine_send/3, 3, 171 [ delay(number) 172 ]). 173:- predicate_options(pengine_event/2, 2, 174 [ listen(atom), 175 pass_to(system:thread_get_message/3, 3) 176 ]). 177:- predicate_options(pengine_pull_response/2, 2, 178 [ pass_to(http_open/3, 3) 179 ]). 180:- predicate_options(pengine_event_loop/2, 2, 181 []). % not yet implemented 182 183% :- debug(pengine(transition)). 184:- debug(pengine(debug)). % handle pengine_debug in pengine_rpc/3. 185 186goal_expansion(random_delay, Expanded) :- 187 ( debugging(pengine(delay)) 188 -> Expanded = do_random_delay 189 ; Expanded = true 190 ). 191 192do_random_delay :- 193 Delay is random(20)/1000, 194 sleep(Delay). 195 196:- meta_predicate % internal meta predicates 197 solve(, , , ), 198 findnsols_no_empty(, , , ), 199 pengine_event_loop(, , ).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options..
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
254pengine_create(M:Options0) :-
255 translate_local_sources(Options0, Options, M),
256 ( select_option(server(BaseURL), Options, RestOptions)
257 -> remote_pengine_create(BaseURL, RestOptions)
258 ; local_pengine_create(Options)
259 ).src_predicates and src_list options into
src_text. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
273translate_local_sources(OptionsIn, Options, Module) :- 274 translate_local_sources(OptionsIn, Sources, Options2, Module), 275 ( Sources == [] 276 -> Options = Options2 277 ; Sources = [Source] 278 -> Options = [src_text(Source)|Options2] 279 ; atomics_to_string(Sources, Source) 280 -> Options = [src_text(Source)|Options2] 281 ). 282 283translate_local_sources([], [], [], _). 284translate_local_sources([H0|T], [S0|S], Options, M) :- 285 nonvar(H0), 286 translate_local_source(H0, S0, M), 287 !, 288 translate_local_sources(T, S, Options, M). 289translate_local_sources([H|T0], S, [H|T], M) :- 290 translate_local_sources(T0, S, T, M). 291 292translate_local_source(src_predicates(PIs), Source, M) :- 293 must_be(list, PIs), 294 with_output_to(string(Source), 295 maplist(list_in_module(M), PIs)). 296translate_local_source(src_list(Terms), Source, _) :- 297 must_be(list, Terms), 298 with_output_to(string(Source), 299 forall(member(Term, Terms), 300 format('~k .~n', [Term]))). 301translate_local_source(src_text(Source), Source, _). 302 303list_in_module(M, PI) :- 304 listing(M:PI).
pengine_send(NameOrID, Term, []).
*/
311pengine_send(Target, Event) :-
312 pengine_send(Target, Event, []).Any remaining options are passed to http_open/3. */
327pengine_send(Target, Event, Options) :- 328 must_be(atom, Target), 329 pengine_send2(Target, Event, Options). 330 331pengine_send2(self, Event, Options) :- 332 !, 333 thread_self(Queue), 334 delay_message(queue(Queue), Event, Options). 335pengine_send2(Name, Event, Options) :- 336 child(Name, Target), 337 !, 338 delay_message(pengine(Target), Event, Options). 339pengine_send2(Target, Event, Options) :- 340 delay_message(pengine(Target), Event, Options). 341 342delay_message(Target, Event, Options) :- 343 option(delay(Delay), Options), 344 !, 345 alarm(Delay, 346 send_message(Target, Event, Options), 347 _AlarmID, 348 [remove(true)]). 349delay_message(Target, Event, Options) :- 350 random_delay, 351 send_message(Target, Event, Options). 352 353send_message(queue(Queue), Event, _) :- 354 thread_send_message(Queue, pengine_request(Event)). 355send_message(pengine(Pengine), Event, Options) :- 356 ( pengine_remote(Pengine, Server) 357 -> remote_pengine_send(Server, Pengine, Event, Options) 358 ; pengine_thread(Pengine, Thread) 359 -> thread_send_message(Thread, pengine_request(Event)) 360 ; existence_error(pengine, Pengine) 361 ).
idle_limit setting while using thread_idle/2 to minimis
resources.371pengine_request(Request) :- 372 thread_self(Me), 373 thread_get_message(Me, pengine_request(Request), [timeout(1)]), 374 !. 375pengine_request(Request) :- 376 pengine_self(Self), 377 get_pengine_application(Self, Application), 378 setting(Application:idle_limit, IdleLimit0), 379 IdleLimit is IdleLimit0-1, 380 thread_self(Me), 381 ( thread_idle(thread_get_message(Me, pengine_request(Request), 382 [timeout(IdleLimit)]), 383 long) 384 -> true 385 ; Request = destroy 386 ).
If the message cannot be sent within the idle_limit setting of
the pengine, abort the pengine.
399pengine_reply(Event) :- 400 pengine_parent(Queue), 401 pengine_reply(Queue, Event). 402 403pengine_reply(_Queue, _Event0) :- 404 nb_current(pengine_idle_limit_exceeded, true), 405 !. 406pengine_reply(Queue, Event0) :- 407 arg(1, Event0, ID), 408 wrap_first_answer(ID, Event0, Event), 409 random_delay, 410 debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]), 411 ( pengine_self(ID), 412 \+ pengine_detached(ID, _) 413 -> get_pengine_application(ID, Application), 414 setting(Application:idle_limit, IdleLimit), 415 debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]), 416 ( thread_send_message(Queue, pengine_event(ID, Event), 417 [ timeout(IdleLimit) 418 ]) 419 -> true 420 ; thread_self(Me), 421 debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)', 422 [ID, Me]), 423 nb_setval(pengine_idle_limit_exceeded, true), 424 thread_detach(Me), 425 abort 426 ) 427 ; thread_send_message(Queue, pengine_event(ID, Event)) 428 ). 429 430wrap_first_answer(ID, Event0, CreateEvent) :- 431 wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]), 432 arg(1, CreateEvent, ID), 433 !, 434 retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])). 435wrap_first_answer(_ID, Event, Event). 436 437 438empty_queue :- 439 pengine_parent(Queue), 440 empty_queue(Queue, 0, Discarded), 441 debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]). 442 443empty_queue(Queue, C0, C) :- 444 thread_get_message(Queue, _Term, [timeout(0)]), 445 !, 446 C1 is C0+1, 447 empty_queue(Queue, C1, C). 448empty_queue(_, C, C).
Options is a list of options:
false, the
Pengine goal is not executed using findall/3 and friends and
we do not backtrack immediately over the goal. As a result,
changes to backtrackable global state are retained. This is
similar that using set_prolog_flag(toplevel_mode, recursive).Name = Var terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true or false, indicating whether we can expect the
pengine to be able to return more solutions or not, would we call
pengine_next/2.Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :-
partition(pengine_ask_option, Options, AskOptions, SendOptions),
pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
517pengine_ask(ID, Query, Options) :- 518 partition(pengine_ask_option, Options, AskOptions, SendOptions), 519 pengine_send(ID, ask(Query, AskOptions), SendOptions). 520 521 522pengine_ask_option(template(_)). 523pengine_ask_option(chunk(_)). 524pengine_ask_option(bindings(_)). 525pengine_ask_option(breakpoints(_)).
chunk(false).Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :-
pengine_send(ID, next, Options).
*/
570pengine_next(ID, Options) :- 571 select_option(chunk(Count), Options, Options1), 572 !, 573 pengine_send(ID, next(Count), Options1). 574pengine_next(ID, Options) :- 575 pengine_send(ID, next, Options).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :-
pengine_send(ID, stop, Options).
*/
591pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
602pengine_abort(Name) :-
603 ( child(Name, Pengine)
604 -> true
605 ; Pengine = Name
606 ),
607 ( pengine_remote(Pengine, Server)
608 -> remote_pengine_abort(Server, Pengine, [])
609 ; pengine_thread(Pengine, Thread),
610 debug(pengine(abort), 'Signalling thread ~p', [Thread]),
611 catch(thread_signal(Thread, throw(abort_query)), _, true)
612 ).force(true), the pengine
is killed using abort/0 and pengine_destroy/2 succeeds.
*/622pengine_destroy(ID) :- 623 pengine_destroy(ID, []). 624 625pengine_destroy(Name, Options) :- 626 ( child(Name, ID) 627 -> true 628 ; ID = Name 629 ), 630 option(force(true), Options), 631 !, 632 ( pengine_thread(ID, Thread) 633 -> catch(thread_signal(Thread, abort), 634 error(existence_error(thread, _), _), true) 635 ; true 636 ). 637pengine_destroy(ID, Options) :- 638 catch(pengine_send(ID, destroy, Options), 639 error(existence_error(pengine, ID), _), 640 retractall(child(_,ID))). 641 642 643/*================= pengines administration ======================= 644*/
thread(ThreadId)remote(URL)655:- dynamic 656 current_pengine/6, % Id, ParentId, Thread, URL, App, Destroy 657 pengine_queue/4, % Id, Queue, TimeOut, Time 658 output_queue/3, % Id, Queue, Time 659 pengine_user/2, % Id, User 660 pengine_data/2, % Id, Data 661 pengine_detached/2. % Id, Data 662:- volatile 663 current_pengine/6, 664 pengine_queue/4, 665 output_queue/3, 666 pengine_user/2, 667 pengine_data/2, 668 pengine_detached/2. 669 670:- thread_local 671 child/2. % ?Name, ?Child
677pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :- 678 asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)). 679 680pengine_register_remote(Id, URL, Application, Destroy) :- 681 thread_self(Queue), 682 asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
http and the queue is the
message queue used to send events to the HTTP workers.690pengine_unregister(Id) :- 691 thread_self(Me), 692 ( current_pengine(Id, Queue, Me, http, _, _) 693 -> with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)) 694 ; true 695 ), 696 retractall(current_pengine(Id, _, Me, _, _, _)), 697 retractall(pengine_user(Id, _)), 698 retractall(pengine_data(Id, _)). 699 700pengine_unregister_remote(Id) :- 701 retractall(current_pengine(Id, _Parent, 0, _, _, _)).
707pengine_self(Id) :- 708 thread_self(Thread), 709 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy). 710 711pengine_parent(Parent) :- 712 nb_getval(pengine_parent, Parent). 713 714pengine_thread(Pengine, Thread) :- 715 current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy), 716 Thread \== 0, 717 !. 718 719pengine_remote(Pengine, URL) :- 720 current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy). 721 722get_pengine_application(Pengine, Application) :- 723 current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy), 724 !. 725 726get_pengine_module(Pengine, Pengine). 727 728:- if(current_predicate(uuid/2)). 729pengine_uuid(Id) :- 730 uuid(Id, [version(4)]). % Version 4 is random. 731:- else. 732pengine_uuid(Id) :- 733 ( current_prolog_flag(max_integer, Max1) 734 -> Max is Max1-1 735 ; Max is 1<<128 736 ), 737 random_between(0, Max, Num), 738 atom_number(Id, Num). 739:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
756:- meta_predicate protect_pengine(, ). 757 758protect_pengine(Id, Goal) :- 759 term_hash(Id, Hash), 760 LockN is Hash mod 64, 761 atom_concat(pengine_done_, LockN, Lock), 762 with_mutex(Lock, 763 ( pengine_thread(Id, _) 764 -> Goal 765 ; Goal 766 )).
pengine_sandbox. The example below creates a new application
address_book and imports the API defined in the module file
adress_book_api.pl into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
783pengine_application(Application) :- 784 throw(error(context_error(nodirective, 785 pengine_application(Application)), _)). 786 787:- multifile 788 system:term_expansion/2, 789 current_application/1.
797current_pengine_application(Application) :- 798 current_application(Application). 799 800 801% Default settings for all applications 802 803:- setting(thread_pool_size, integer, 100, 804 'Maximum number of pengines this application can run.'). 805:- setting(thread_pool_stacks, list(compound), [], 806 'Maximum stack sizes for pengines this application can run.'). 807:- setting(slave_limit, integer, 3, 808 'Maximum number of slave pengines a master pengine can create.'). 809:- setting(time_limit, number, 300, 810 'Maximum time to wait for output'). 811:- setting(idle_limit, number, 300, 812 'Pengine auto-destroys when idle for this time'). 813:- setting(safe_goal_limit, number, 10, 814 'Maximum time to try proving safety of the goal'). 815:- setting(program_space, integer, 100_000_000, 816 'Maximum memory used by predicates'). 817:- setting(allow_from, list(atom), [*], 818 'IP addresses from which remotes are allowed to connect'). 819:- setting(deny_from, list(atom), [], 820 'IP addresses from which remotes are NOT allowed to connect'). 821:- setting(debug_info, boolean, false, 822 'Keep information to support source-level debugging'). 823 824 825systemterm_expansion((:- pengine_application(Application)), Expanded) :- 826 must_be(atom, Application), 827 ( module_property(Application, file(_)) 828 -> permission_error(create, pengine_application, Application) 829 ; true 830 ), 831 expand_term((:- setting(Application:thread_pool_size, integer, 832 setting(pengines:thread_pool_size), 833 'Maximum number of pengines this \c 834 application can run.')), 835 ThreadPoolSizeSetting), 836 expand_term((:- setting(Application:thread_pool_stacks, list(compound), 837 setting(pengines:thread_pool_stacks), 838 'Maximum stack sizes for pengines \c 839 this application can run.')), 840 ThreadPoolStacksSetting), 841 expand_term((:- setting(Application:slave_limit, integer, 842 setting(pengines:slave_limit), 843 'Maximum number of local slave pengines \c 844 a master pengine can create.')), 845 SlaveLimitSetting), 846 expand_term((:- setting(Application:time_limit, number, 847 setting(pengines:time_limit), 848 'Maximum time to wait for output')), 849 TimeLimitSetting), 850 expand_term((:- setting(Application:idle_limit, number, 851 setting(pengines:idle_limit), 852 'Pengine auto-destroys when idle for this time')), 853 IdleLimitSetting), 854 expand_term((:- setting(Application:safe_goal_limit, number, 855 setting(pengines:safe_goal_limit), 856 'Maximum time to try proving safety of the goal')), 857 SafeGoalLimitSetting), 858 expand_term((:- setting(Application:program_space, integer, 859 setting(pengines:program_space), 860 'Maximum memory used by predicates')), 861 ProgramSpaceSetting), 862 expand_term((:- setting(Application:allow_from, list(atom), 863 setting(pengines:allow_from), 864 'IP addresses from which remotes are allowed \c 865 to connect')), 866 AllowFromSetting), 867 expand_term((:- setting(Application:deny_from, list(atom), 868 setting(pengines:deny_from), 869 'IP addresses from which remotes are NOT \c 870 allowed to connect')), 871 DenyFromSetting), 872 expand_term((:- setting(Application:debug_info, boolean, 873 setting(pengines:debug_info), 874 'Keep information to support source-level \c 875 debugging')), 876 DebugInfoSetting), 877 flatten([ pengines:current_application(Application), 878 ThreadPoolSizeSetting, 879 ThreadPoolStacksSetting, 880 SlaveLimitSetting, 881 TimeLimitSetting, 882 IdleLimitSetting, 883 SafeGoalLimitSetting, 884 ProgramSpaceSetting, 885 AllowFromSetting, 886 DenyFromSetting, 887 DebugInfoSetting 888 ], Expanded). 889 890% Register default application 891 892:- pengine_application(pengine_sandbox).
alias option when creating the pengine.true if the pengines is destroyed automatically
after completing the query.debug_info is present.929pengine_property(Id, Prop) :- 930 nonvar(Id), nonvar(Prop), 931 pengine_property2(Prop, Id), 932 !. 933pengine_property(Id, Prop) :- 934 pengine_property2(Prop, Id). 935 936pengine_property2(self(Id), Id) :- 937 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 938pengine_property2(module(Id), Id) :- 939 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 940pengine_property2(alias(Alias), Id) :- 941 child(Alias, Id), 942 Alias \== Id. 943pengine_property2(thread(Thread), Id) :- 944 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy), 945 Thread \== 0. 946pengine_property2(remote(Server), Id) :- 947 current_pengine(Id, _Parent, 0, Server, _Application, _Destroy). 948pengine_property2(application(Application), Id) :- 949 current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy). 950pengine_property2(destroy(Destroy), Id) :- 951 current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy). 952pengine_property2(parent(Parent), Id) :- 953 current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy). 954pengine_property2(source(SourceID, Source), Id) :- 955 pengine_data(Id, source(SourceID, Source)). 956pengine_property2(detached(When), Id) :- 957 pengine_detached(Id, When).
964pengine_output(Term) :-
965 pengine_self(Me),
966 pengine_reply(output(Me, Term)).console.log(Message) if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message]). The debug
topic pengine(debug) is enabled by default.
981pengine_debug(Format, Args) :- 982 pengine_parent(Queue), 983 pengine_self(Self), 984 catch(safe_goal(format(atom(_), Format, Args)), E, true), 985 ( var(E) 986 -> format(atom(Message), Format, Args) 987 ; message_to_string(E, Message) 988 ), 989 pengine_reply(Queue, debug(Self, Message)). 990 991 992/*================= Local pengine ======================= 993*/
1004local_pengine_create(Options) :-
1005 thread_self(Self),
1006 option(application(Application), Options, pengine_sandbox),
1007 create(Self, Child, Options, local, Application),
1008 option(alias(Name), Options, Child),
1009 assert(child(Name, Child)).1016:- multifile thread_pool:create_pool/1. 1017 1018thread_poolcreate_pool(Application) :- 1019 current_application(Application), 1020 setting(Application:thread_pool_size, Size), 1021 setting(Application:thread_pool_stacks, Stacks), 1022 thread_pool_create(Application, Size, Stacks).
1032create(Queue, Child, Options, local, Application) :- 1033 !, 1034 pengine_child_id(Child), 1035 create0(Queue, Child, Options, local, Application). 1036create(Queue, Child, Options, URL, Application) :- 1037 pengine_child_id(Child), 1038 catch(create0(Queue, Child, Options, URL, Application), 1039 Error, 1040 create_error(Queue, Child, Error)). 1041 1042pengine_child_id(Child) :- 1043 ( nonvar(Child) 1044 -> true 1045 ; pengine_uuid(Child) 1046 ). 1047 1048create_error(Queue, Child, Error) :- 1049 pengine_reply(Queue, error(Child, Error)). 1050 1051create0(Queue, Child, Options, URL, Application) :- 1052 ( current_application(Application) 1053 -> true 1054 ; existence_error(pengine_application, Application) 1055 ), 1056 ( URL \== http % pengine is _not_ a child of the 1057 % HTTP server thread 1058 -> aggregate_all(count, child(_,_), Count), 1059 setting(Application:slave_limit, Max), 1060 ( Count >= Max 1061 -> throw(error(resource_error(max_pengines), _)) 1062 ; true 1063 ) 1064 ; true 1065 ), 1066 partition(pengine_create_option, Options, PengineOptions, RestOptions), 1067 thread_create_in_pool( 1068 Application, 1069 pengine_main(Queue, PengineOptions, Application), ChildThread, 1070 [ at_exit(pengine_done) 1071 | RestOptions 1072 ]), 1073 option(destroy(Destroy), PengineOptions, true), 1074 pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy), 1075 thread_send_message(ChildThread, pengine_registered(Child)), 1076 ( option(id(Id), Options) 1077 -> Id = Child 1078 ; true 1079 ). 1080 1081pengine_create_option(src_text(_)). 1082pengine_create_option(src_url(_)). 1083pengine_create_option(application(_)). 1084pengine_create_option(destroy(_)). 1085pengine_create_option(ask(_)). 1086pengine_create_option(template(_)). 1087pengine_create_option(bindings(_)). 1088pengine_create_option(chunk(_)). 1089pengine_create_option(alias(_)). 1090pengine_create_option(user(_)).
at_exit option. Destroys child
pengines using pengine_destroy/1. Cleaning up the Pengine is
synchronised by the pengine_done mutex. See read_event/6.1099:- public 1100 pengine_done/0. 1101 1102pengine_done :- 1103 thread_self(Me), 1104 ( thread_property(Me, status(exception(Ex))), 1105 abort_exception(Ex), 1106 thread_detach(Me), 1107 pengine_self(Pengine) 1108 -> catch(pengine_reply(destroy(Pengine, abort(Pengine))), 1109 error(_,_), true) 1110 ; true 1111 ), 1112 forall(child(_Name, Child), 1113 pengine_destroy(Child)), 1114 pengine_self(Id), 1115 protect_pengine(Id, pengine_unregister(Id)). 1116 1117abort_exception('$aborted'). 1118abort_exception(unwind(abort)).
1125:- thread_local wrap_first_answer_in_create_event/2. 1126 1127:- meta_predicate 1128 pengine_prepare_source(, ). 1129 1130pengine_main(Parent, Options, Application) :- 1131 fix_streams, 1132 thread_get_message(pengine_registered(Self)), 1133 nb_setval(pengine_parent, Parent), 1134 pengine_register_user(Options), 1135 set_prolog_flag(mitigate_spectre, true), 1136 catch(in_temporary_module( 1137 Self, 1138 pengine_prepare_source(Application, Options), 1139 pengine_create_and_loop(Self, Application, Options)), 1140 prepare_source_failed, 1141 pengine_terminate(Self)). 1142 1143pengine_create_and_loop(Self, Application, Options) :- 1144 setting(Application:slave_limit, SlaveLimit), 1145 CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]), 1146 ( option(ask(Query0), Options) 1147 -> asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)), 1148 ( string(Query0) % string is not callable 1149 -> ( option(template(TemplateS), Options) 1150 -> Ask2 = Query0-TemplateS 1151 ; Ask2 = Query0 1152 ), 1153 catch(ask_to_term(Ask2, Self, Query, Template, Bindings), 1154 Error, true), 1155 ( var(Error) 1156 -> true 1157 ; send_error(Error), 1158 throw(prepare_source_failed) 1159 ) 1160 ; Query = Query0, 1161 option(template(Template), Options, Query), 1162 option(bindings(Bindings), Options, []) 1163 ), 1164 option(chunk(Chunk), Options, 1), 1165 pengine_ask(Self, Query, 1166 [ template(Template), 1167 chunk(Chunk), 1168 bindings(Bindings) 1169 ]) 1170 ; Extra = [], 1171 pengine_reply(CreateEvent) 1172 ), 1173 pengine_main_loop(Self).
1183ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :- 1184 !, 1185 format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]), 1186 term_string(t(Template1,Ask1), AskTemplate, 1187 [ variable_names(Bindings0), 1188 module(Module) 1189 ]), 1190 phrase(template_bindings(Template1, Bindings0), Bindings). 1191ask_to_term(Ask, Module, Ask1, Template, Bindings1) :- 1192 term_string(Ask1, Ask, 1193 [ variable_names(Bindings), 1194 module(Module) 1195 ]), 1196 exclude(anon, Bindings, Bindings1), 1197 dict_create(Template, swish_default_template, Bindings1). 1198 1199template_bindings(Var, Bindings) --> 1200 { var(Var) }, !, 1201 ( { var_binding(Bindings, Var, Binding) 1202 } 1203 -> [Binding] 1204 ; [] 1205 ). 1206template_bindings([H|T], Bindings) --> 1207 !, 1208 template_bindings(H, Bindings), 1209 template_bindings(T, Bindings). 1210template_bindings(Compoound, Bindings) --> 1211 { compound(Compoound), !, 1212 compound_name_arguments(Compoound, _, Args) 1213 }, 1214 template_bindings(Args, Bindings). 1215template_bindings(_, _) --> []. 1216 1217var_binding(Bindings, Var, Binding) :- 1218 member(Binding, Bindings), 1219 arg(2, Binding, V), 1220 V == Var, !.
1227fix_streams :- 1228 fix_stream(current_output). 1229 1230fix_stream(Name) :- 1231 is_cgi_stream(Name), 1232 !, 1233 debug(pengine(stream), '~w is a CGI stream!', [Name]), 1234 set_stream(user_output, alias(Name)). 1235fix_stream(_).
1244pengine_prepare_source(Module:Application, Options) :- 1245 setting(Application:program_space, SpaceLimit), 1246 set_module(Module:program_space(SpaceLimit)), 1247 delete_import_module(Module, user), 1248 add_import_module(Module, Application, start), 1249 catch(prep_module(Module, Application, Options), Error, true), 1250 ( var(Error) 1251 -> true 1252 ; send_error(Error), 1253 throw(prepare_source_failed) 1254 ). 1255 1256prep_module(Module, Application, Options) :- 1257 maplist(copy_flag(Module, Application), [var_prefix]), 1258 forall(prepare_module(Module, Application, Options), true), 1259 setup_call_cleanup( 1260 '$set_source_module'(OldModule, Module), 1261 maplist(process_create_option(Module), Options), 1262 '$set_source_module'(OldModule)). 1263 1264copy_flag(Module, Application, Flag) :- 1265 current_prolog_flag(ApplicationFlag, Value), 1266 !, 1267 set_prolog_flag(ModuleFlag, Value). 1268copy_flag(_, _, _). 1269 1270process_create_option(Application, src_text(Text)) :- 1271 !, 1272 pengine_src_text(Text, Application). 1273process_create_option(Application, src_url(URL)) :- 1274 !, 1275 pengine_src_url(URL, Application). 1276process_create_option(_, _).
src_text and
src_url options1299pengine_main_loop(ID) :- 1300 catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)). 1301 1302pengine_aborted(ID) :- 1303 thread_self(Self), 1304 debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]), 1305 empty_queue, 1306 destroy_or_continue(abort(ID)).
1319guarded_main_loop(ID) :- 1320 pengine_request(Request), 1321 ( Request = destroy 1322 -> debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]), 1323 pengine_terminate(ID) 1324 ; Request = ask(Goal, Options) 1325 -> debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]), 1326 ask(ID, Goal, Options) 1327 ; debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]), 1328 pengine_reply(error(ID, error(protocol_error, _))), 1329 guarded_main_loop(ID) 1330 ). 1331 1332 1333pengine_terminate(ID) :- 1334 pengine_reply(destroy(ID)), 1335 thread_self(Me), % Make the thread silently disappear 1336 thread_detach(Me).
1347solve(Chunk, Template, Goal, ID) :- 1348 prolog_current_choice(Choice), 1349 ( integer(Chunk) 1350 -> State = count(Chunk) 1351 ; Chunk == false 1352 -> State = no_chunk 1353 ; domain_error(chunk, Chunk) 1354 ), 1355 statistics(cputime, Epoch), 1356 Time = time(Epoch), 1357 nb_current('$variable_names', Bindings), 1358 filter_template(Template, Bindings, Template2), 1359 '$current_typein_module'(CurrTypeIn), 1360 ( '$set_typein_module'(ID), 1361 call_cleanup(catch(findnsols_no_empty(State, Template2, 1362 set_projection(Goal, Bindings), 1363 Result), 1364 Error, true), 1365 query_done(Det, CurrTypeIn)), 1366 arg(1, Time, T0), 1367 statistics(cputime, T1), 1368 CPUTime is T1-T0, 1369 forall(pengine_flush_output_hook, true), 1370 ( var(Error) 1371 -> projection(Projection), 1372 ( var(Det) 1373 -> pengine_reply(success(ID, Result, Projection, 1374 CPUTime, true)), 1375 more_solutions(ID, Choice, State, Time) 1376 ; !, % commit 1377 destroy_or_continue(success(ID, Result, Projection, 1378 CPUTime, false)) 1379 ) 1380 ; !, % commit 1381 ( Error == abort_query 1382 -> throw(Error) 1383 ; destroy_or_continue(error(ID, Error)) 1384 ) 1385 ) 1386 ; !, % commit 1387 arg(1, Time, T0), 1388 statistics(cputime, T1), 1389 CPUTime is T1-T0, 1390 destroy_or_continue(failure(ID, CPUTime)) 1391 ). 1392solve(_, _, _, _). % leave a choice point 1393 1394query_done(true, CurrTypeIn) :- 1395 '$set_typein_module'(CurrTypeIn).
1404set_projection(Goal, Bindings) :- 1405 b_setval('$variable_names', Bindings), 1406 call(Goal). 1407 1408projection(Projection) :- 1409 nb_current('$variable_names', Bindings), 1410 !, 1411 maplist(var_name, Bindings, Projection). 1412projection([]).
1422filter_template(Template0, Bindings, Template) :- 1423 is_dict(Template0, swish_default_template), 1424 !, 1425 dict_create(Template, swish_default_template, Bindings). 1426filter_template(Template, _Bindings, Template). 1427 1428findnsols_no_empty(no_chunk, Template, Goal, List) => 1429 List = [Template], 1430 call(Goal). 1431findnsols_no_empty(State, Template, Goal, List) => 1432 findnsols(State, Template, Goal, List), 1433 List \== []. 1434 1435destroy_or_continue(Event) :- 1436 arg(1, Event, ID), 1437 ( pengine_property(ID, destroy(true)) 1438 -> thread_self(Me), 1439 thread_detach(Me), 1440 pengine_reply(destroy(ID, Event)) 1441 ; pengine_reply(Event), 1442 guarded_main_loop(ID) 1443 ).
chunk solutions.next, but sets the new chunk-size to Count.1461more_solutions(ID, Choice, State, Time) :- 1462 pengine_request(Event), 1463 more_solutions(Event, ID, Choice, State, Time). 1464 1465more_solutions(stop, ID, _Choice, _State, _Time) :- 1466 !, 1467 debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]), 1468 destroy_or_continue(stop(ID)). 1469more_solutions(next, ID, _Choice, _State, Time) :- 1470 !, 1471 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]), 1472 statistics(cputime, T0), 1473 nb_setarg(1, Time, T0), 1474 fail. 1475more_solutions(next(Count), ID, _Choice, State, Time) :- 1476 Count > 0, 1477 State = count(_), % else fallthrough to protocol error 1478 !, 1479 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]), 1480 nb_setarg(1, State, Count), 1481 statistics(cputime, T0), 1482 nb_setarg(1, Time, T0), 1483 fail. 1484more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :- 1485 !, 1486 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]), 1487 prolog_cut_to(Choice), 1488 ask(ID, Goal, Options). 1489more_solutions(destroy, ID, _Choice, _State, _Time) :- 1490 !, 1491 debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]), 1492 pengine_terminate(ID). 1493more_solutions(Event, ID, Choice, State, Time) :- 1494 debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]), 1495 pengine_reply(error(ID, error(protocol_error, _))), 1496 more_solutions(ID, Choice, State, Time).
chunk(N) option.
1504ask(ID, Goal, Options) :-
1505 catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
1506 !,
1507 ( var(Error)
1508 -> option(template(Template), Options, Goal),
1509 option(chunk(N), Options, 1),
1510 solve(N, Template, Goal1, ID)
1511 ; pengine_reply(error(ID, Error)),
1512 guarded_main_loop(ID)
1513 ).
Note that expand_goal(Module:GoalIn, GoalOut) is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
1527prepare_goal(ID, Goal0, Module:Goal, Options) :-
1528 option(bindings(Bindings), Options, []),
1529 b_setval('$variable_names', Bindings),
1530 ( prepare_goal(Goal0, Goal1, Options)
1531 -> true
1532 ; Goal1 = Goal0
1533 ),
1534 get_pengine_module(ID, Module),
1535 setup_call_cleanup(
1536 '$set_source_module'(Old, Module),
1537 expand_goal(Goal1, Goal),
1538 '$set_source_module'(_, Old)),
1539 ( pengine_not_sandboxed(ID)
1540 -> true
1541 ; get_pengine_application(ID, App),
1542 setting(App:safe_goal_limit, Limit),
1543 catch(call_with_time_limit(
1544 Limit,
1545 safe_goal(Module:Goal)), E, true)
1546 -> ( var(E)
1547 -> true
1548 ; E = time_limit_exceeded
1549 -> throw(error(sandbox(time_limit_exceeded, Limit),_))
1550 ; throw(E)
1551 )
1552 ).not_sandboxed(User, Application) must succeed.
1572pengine_not_sandboxed(ID) :-
1573 pengine_user(ID, User),
1574 pengine_property(ID, application(App)),
1575 not_sandboxed(User, App),
1576 !.1598pengine_pull_response(Pengine, Options) :- 1599 pengine_remote(Pengine, Server), 1600 !, 1601 remote_pengine_pull_response(Server, Pengine, Options). 1602pengine_pull_response(_ID, _Options).
1611pengine_input(Prompt, Term) :-
1612 pengine_self(Self),
1613 pengine_parent(Parent),
1614 pengine_reply(Parent, prompt(Self, Prompt)),
1615 pengine_request(Request),
1616 ( Request = input(Input)
1617 -> Term = Input
1618 ; Request == destroy
1619 -> abort
1620 ; throw(error(protocol_error,_))
1621 ).Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :-
pengine_send(Pengine, input(Input), Options).
*/
1638pengine_respond(Pengine, Input, Options) :-
1639 pengine_send(Pengine, input(Input), Options).1648send_error(error(Formal, context(prolog_stack(Frames), Message))) :- 1649 is_list(Frames), 1650 !, 1651 with_output_to(string(Stack), 1652 print_prolog_backtrace(current_output, Frames)), 1653 pengine_self(Self), 1654 replace_blobs(Formal, Formal1), 1655 replace_blobs(Message, Message1), 1656 pengine_reply(error(Self, error(Formal1, 1657 context(prolog_stack(Stack), Message1)))). 1658send_error(Error) :- 1659 pengine_self(Self), 1660 replace_blobs(Error, Error1), 1661 pengine_reply(error(Self, Error1)).
1669replace_blobs(Blob, Atom) :- 1670 blob(Blob, Type), Type \== text, 1671 !, 1672 format(atom(Atom), '~p', [Blob]). 1673replace_blobs(Term0, Term) :- 1674 compound(Term0), 1675 !, 1676 compound_name_arguments(Term0, Name, Args0), 1677 maplist(replace_blobs, Args0, Args), 1678 compound_name_arguments(Term, Name, Args). 1679replace_blobs(Term, Term). 1680 1681 1682/*================= Remote pengines ======================= 1683*/ 1684 1685 1686remote_pengine_create(BaseURL, Options) :- 1687 partition(pengine_create_option, Options, PengineOptions0, RestOptions), 1688 ( option(ask(Query), PengineOptions0), 1689 \+ option(template(_Template), PengineOptions0) 1690 -> PengineOptions = [template(Query)|PengineOptions0] 1691 ; PengineOptions = PengineOptions0 1692 ), 1693 options_to_dict(PengineOptions, PostData), 1694 remote_post_rec(BaseURL, create, PostData, Reply, RestOptions), 1695 arg(1, Reply, ID), 1696 ( option(id(ID2), Options) 1697 -> ID = ID2 1698 ; true 1699 ), 1700 option(alias(Name), Options, ID), 1701 assert(child(Name, ID)), 1702 ( ( functor(Reply, create, _) % actually created 1703 ; functor(Reply, output, _) % compiler messages 1704 ) 1705 -> option(application(Application), PengineOptions, pengine_sandbox), 1706 option(destroy(Destroy), PengineOptions, true), 1707 pengine_register_remote(ID, BaseURL, Application, Destroy) 1708 ; true 1709 ), 1710 thread_self(Queue), 1711 pengine_reply(Queue, Reply). 1712 1713options_to_dict(Options, Dict) :- 1714 select_option(ask(Ask), Options, Options1), 1715 select_option(template(Template), Options1, Options2), 1716 !, 1717 no_numbered_var_in(Ask+Template), 1718 findall(AskString-TemplateString, 1719 ask_template_to_strings(Ask, Template, AskString, TemplateString), 1720 [ AskString-TemplateString ]), 1721 options_to_dict(Options2, Dict0), 1722 Dict = Dict0.put(_{ask:AskString,template:TemplateString}). 1723options_to_dict(Options, Dict) :- 1724 maplist(prolog_option, Options, Options1), 1725 dict_create(Dict, _, Options1). 1726 1727no_numbered_var_in(Term) :- 1728 sub_term(Sub, Term), 1729 subsumes_term('$VAR'(_), Sub), 1730 !, 1731 domain_error(numbered_vars_free_term, Term). 1732no_numbered_var_in(_). 1733 1734ask_template_to_strings(Ask, Template, AskString, TemplateString) :- 1735 numbervars(Ask+Template, 0, _), 1736 WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ], 1737 format(string(AskTemplate), '~W\n~W', [ Ask, WOpts, 1738 Template, WOpts 1739 ]), 1740 split_string(AskTemplate, "\n", "", [AskString, TemplateString]). 1741 1742prolog_option(Option0, Option) :- 1743 create_option_type(Option0, term), 1744 !, 1745 Option0 =.. [Name,Value], 1746 format(string(String), '~k', [Value]), 1747 Option =.. [Name,String]. 1748prolog_option(Option, Option). 1749 1750create_option_type(ask(_), term). 1751create_option_type(template(_), term). 1752create_option_type(application(_), atom). 1753 1754remote_pengine_send(BaseURL, ID, Event, Options) :- 1755 remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options), 1756 thread_self(Queue), 1757 pengine_reply(Queue, Reply). 1758 1759remote_pengine_pull_response(BaseURL, ID, Options) :- 1760 remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options), 1761 thread_self(Queue), 1762 pengine_reply(Queue, Reply). 1763 1764remote_pengine_abort(BaseURL, ID, Options) :- 1765 remote_send_rec(BaseURL, abort, ID, [], Reply, Options), 1766 thread_self(Queue), 1767 pengine_reply(Queue, Reply).
1774remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :- 1775 !, 1776 server_url(Server, Action, [id=ID], URL), 1777 http_open(URL, Stream, % putting this in setup_call_cleanup/3 1778 [ post(prolog(Event)) % makes it impossible to interrupt. 1779 | Options 1780 ]), 1781 call_cleanup( 1782 read_prolog_reply(Stream, Reply), 1783 close(Stream)). 1784remote_send_rec(Server, Action, ID, Params, Reply, Options) :- 1785 server_url(Server, Action, [id=ID|Params], URL), 1786 http_open(URL, Stream, Options), 1787 call_cleanup( 1788 read_prolog_reply(Stream, Reply), 1789 close(Stream)). 1790 1791remote_post_rec(Server, Action, Data, Reply, Options) :- 1792 server_url(Server, Action, [], URL), 1793 probe(Action, URL, Options), 1794 http_open(URL, Stream, 1795 [ post(json(Data)) 1796 | Options 1797 ]), 1798 call_cleanup( 1799 read_prolog_reply(Stream, Reply), 1800 close(Stream)).
1808probe(create, URL, Options) :- 1809 !, 1810 http_open(URL, Stream, [method(options)|Options]), 1811 close(Stream). 1812probe(_, _, _). 1813 1814read_prolog_reply(In, Reply) :- 1815 set_stream(In, encoding(utf8)), 1816 read(In, Reply0), 1817 rebind_cycles(Reply0, Reply). 1818 1819rebind_cycles(@(Reply, Bindings), Reply) :- 1820 is_list(Bindings), 1821 !, 1822 maplist(bind, Bindings). 1823rebind_cycles(Reply, Reply). 1824 1825bind(Var = Value) :- 1826 Var = Value. 1827 1828server_url(Server, Action, Params, URL) :- 1829 atom_concat('pengine/', Action, PAction), 1830 uri_edit([ path(PAction), 1831 search(Params) 1832 ], Server, URL).
Valid options are:
timeout.1853pengine_event(Event) :- 1854 pengine_event(Event, []). 1855 1856pengine_event(Event, Options) :- 1857 thread_self(Self), 1858 option(listen(Id), Options, _), 1859 ( thread_get_message(Self, pengine_event(Id, Event), Options) 1860 -> true 1861 ; Event = timeout 1862 ), 1863 update_remote_destroy(Event). 1864 1865update_remote_destroy(Event) :- 1866 destroy_event(Event), 1867 arg(1, Event, Id), 1868 pengine_remote(Id, _Server), 1869 !, 1870 pengine_unregister_remote(Id). 1871update_remote_destroy(_). 1872 1873destroy_event(destroy(_)). 1874destroy_event(destroy(_,_)). 1875destroy_event(create(_,Features)) :- 1876 memberchk(answer(Answer), Features), 1877 !, 1878 nonvar(Answer), 1879 destroy_event(Answer).
ignore(call(Closure, E)). A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all,
all_but_sender or a Prolog list of NameOrIDs. [not yet
implemented]*/
1908pengine_event_loop(Closure, Options) :- 1909 child(_,_), 1910 !, 1911 pengine_event(Event), 1912 ( option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs 1913 -> forall(child(_,ID), pengine_send(ID, Event)) 1914 ; true 1915 ), 1916 pengine_event_loop(Event, Closure, Options). 1917pengine_event_loop(_, _). 1918 1919:- meta_predicate 1920 pengine_process_event(, , , ). 1921 1922pengine_event_loop(Event, Closure, Options) :- 1923 pengine_process_event(Event, Closure, Continue, Options), 1924 ( Continue == true 1925 -> pengine_event_loop(Closure, Options) 1926 ; true 1927 ). 1928 1929pengine_process_event(create(ID, T), Closure, Continue, Options) :- 1930 debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]), 1931 ( select(answer(First), T, T1) 1932 -> ignore(call(Closure, create(ID, T1))), 1933 pengine_process_event(First, Closure, Continue, Options) 1934 ; ignore(call(Closure, create(ID, T))), 1935 Continue = true 1936 ). 1937pengine_process_event(output(ID, Msg), Closure, true, _Options) :- 1938 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]), 1939 ignore(call(Closure, output(ID, Msg))), 1940 pengine_pull_response(ID, []). 1941pengine_process_event(debug(ID, Msg), Closure, true, _Options) :- 1942 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]), 1943 ignore(call(Closure, debug(ID, Msg))), 1944 pengine_pull_response(ID, []). 1945pengine_process_event(prompt(ID, Term), Closure, true, _Options) :- 1946 debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]), 1947 ignore(call(Closure, prompt(ID, Term))). 1948pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :- 1949 debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]), 1950 ignore(call(Closure, success(ID, Sol, More))). 1951pengine_process_event(failure(ID, _Time), Closure, true, _Options) :- 1952 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]), 1953 ignore(call(Closure, failure(ID))). 1954pengine_process_event(error(ID, Error), Closure, Continue, _Options) :- 1955 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]), 1956 ( call(Closure, error(ID, Error)) 1957 -> Continue = true 1958 ; forall(child(_,Child), pengine_destroy(Child)), 1959 throw(Error) 1960 ). 1961pengine_process_event(stop(ID), Closure, true, _Options) :- 1962 debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]), 1963 ignore(call(Closure, stop(ID))). 1964pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :- 1965 pengine_process_event(Event, Closure, _, Options), 1966 pengine_process_event(destroy(ID), Closure, Continue, Options). 1967pengine_process_event(destroy(ID), Closure, true, _Options) :- 1968 retractall(child(_,ID)), 1969 debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]), 1970 ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit.Remaining options (except the server option) are passed to pengine_create/1. */
1999pengine_rpc(URL, Query) :- 2000 pengine_rpc(URL, Query, []). 2001 2002pengine_rpc(URL, Query, M:Options0) :- 2003 translate_local_sources(Options0, Options1, M), 2004 ( option(timeout(_), Options1) 2005 -> Options = Options1 2006 ; setting(time_limit, Limit), 2007 Options = [timeout(Limit)|Options1] 2008 ), 2009 term_variables(Query, Vars), 2010 Template =.. [v|Vars], 2011 State = destroy(true), % modified by process_event/4 2012 setup_call_catcher_cleanup( 2013 pengine_create([ ask(Query), 2014 template(Template), 2015 server(URL), 2016 id(Id) 2017 | Options 2018 ]), 2019 wait_event(Template, State, [listen(Id)|Options]), 2020 Why, 2021 pengine_destroy_and_wait(State, Id, Why, Options)). 2022 2023pengine_destroy_and_wait(destroy(true), Id, Why, Options) :- 2024 !, 2025 debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]), 2026 pengine_destroy(Id, Options), 2027 wait_destroy(Id, 10). 2028pengine_destroy_and_wait(_, _, Why, _) :- 2029 debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]). 2030 2031wait_destroy(Id, _) :- 2032 \+ child(_, Id), 2033 !. 2034wait_destroy(Id, N) :- 2035 pengine_event(Event, [listen(Id),timeout(10)]), 2036 !, 2037 ( destroy_event(Event) 2038 -> retractall(child(_,Id)) 2039 ; succ(N1, N) 2040 -> wait_destroy(Id, N1) 2041 ; debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]), 2042 pengine_unregister_remote(Id), 2043 retractall(child(_,Id)) 2044 ). 2045 2046wait_event(Template, State, Options) :- 2047 pengine_event(Event, Options), 2048 debug(pengine(event), 'Received ~p', [Event]), 2049 process_event(Event, Template, State, Options). 2050 2051process_event(create(_ID, Features), Template, State, Options) :- 2052 memberchk(answer(First), Features), 2053 process_event(First, Template, State, Options). 2054process_event(error(_ID, Error), _Template, _, _Options) :- 2055 throw(Error). 2056process_event(failure(_ID, _Time), _Template, _, _Options) :- 2057 fail. 2058process_event(prompt(ID, Prompt), Template, State, Options) :- 2059 pengine_rpc_prompt(ID, Prompt, Reply), 2060 pengine_send(ID, input(Reply)), 2061 wait_event(Template, State, Options). 2062process_event(output(ID, Term), Template, State, Options) :- 2063 pengine_rpc_output(ID, Term), 2064 pengine_pull_response(ID, Options), 2065 wait_event(Template, State, Options). 2066process_event(debug(ID, Message), Template, State, Options) :- 2067 debug(pengine(debug), '~w', [Message]), 2068 pengine_pull_response(ID, Options), 2069 wait_event(Template, State, Options). 2070process_event(success(_ID, Solutions, _Proj, _Time, false), 2071 Template, _, _Options) :- 2072 !, 2073 member(Template, Solutions). 2074process_event(success(ID, Solutions, _Proj, _Time, true), 2075 Template, State, Options) :- 2076 ( member(Template, Solutions) 2077 ; pengine_next(ID, Options), 2078 wait_event(Template, State, Options) 2079 ). 2080process_event(destroy(ID, Event), Template, State, Options) :- 2081 !, 2082 retractall(child(_,ID)), 2083 nb_setarg(1, State, false), 2084 debug(pengine(destroy), 'State: ~p~n', [State]), 2085 process_event(Event, Template, State, Options). 2086% compatibility with older versions of the protocol. 2087process_event(success(ID, Solutions, Time, More), 2088 Template, State, Options) :- 2089 process_event(success(ID, Solutions, _Proj, Time, More), 2090 Template, State, Options). 2091 2092 2093pengine_rpc_prompt(ID, Prompt, Term) :- 2094 prompt(ID, Prompt, Term0), 2095 !, 2096 Term = Term0. 2097pengine_rpc_prompt(_ID, Prompt, Term) :- 2098 setup_call_cleanup( 2099 prompt(Old, Prompt), 2100 read(Term), 2101 prompt(_, Old)). 2102 2103pengine_rpc_output(ID, Term) :- 2104 output(ID, Term), 2105 !. 2106pengine_rpc_output(_ID, Term) :- 2107 print(Term).
2114:- multifile prompt/3.2121:- multifile output/2. 2122 2123 2124/*================= HTTP handlers ======================= 2125*/ 2126 2127% Declare HTTP locations we serve and how. Note that we use 2128% time_limit(inifinite) because pengines have their own timeout. Also 2129% note that we use spawn. This is needed because we can easily get 2130% many clients waiting for some action on a pengine to complete. 2131% Without spawning, we would quickly exhaust the worker pool of the 2132% HTTP server. 2133% 2134% FIXME: probably we should wait for a short time for the pengine on 2135% the default worker thread. Only if that time has expired, we can 2136% call http_spawn/2 to continue waiting on a new thread. That would 2137% improve the performance and reduce the usage of threads. 2138 2139:- multifile http:location/3. 2140httplocation(pengine, root(pengine), [-100]). 2141 2142:- http_handler(pengine(.), http_404([]), 2143 [ id(pengines) ]). 2144:- http_handler(pengine(create), http_pengine_create, 2145 [ time_limit(infinite), spawn([]) ]). 2146:- http_handler(pengine(send), http_pengine_send, 2147 [ time_limit(infinite), spawn([]) ]). 2148:- http_handler(pengine(pull_response), http_pengine_pull_response, 2149 [ time_limit(infinite), spawn([]) ]). 2150:- http_handler(pengine(abort), http_pengine_abort, []). 2151:- http_handler(pengine(detach), http_pengine_detach, []). 2152:- http_handler(pengine(list), http_pengine_list, []). 2153:- http_handler(pengine(ping), http_pengine_ping, []). 2154:- http_handler(pengine(destroy_all), http_pengine_destroy_all, []). 2155 2156:- http_handler(pengine('pengines.js'), 2157 http_reply_file(library('http/web/js/pengines.js'), []), []). 2158:- http_handler(pengine('plterm.css'), 2159 http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json and as
www-form-encoded. Accepted parameters:
| Parameter | Default | Comment |
|---|---|---|
| format | prolog | Output format |
| application | pengine_sandbox | Pengine application |
| chunk | 1 | Chunk-size for results |
| collate | 0 (off) | Join output events |
| solutions | chunked | If all, emit all results |
| ask | - | The query |
| template | - | Output template |
| src_text | "" | Program |
| src_url | - | Program to download |
| disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
Using chunk=false simulates the recursive toplevel. See
pengine_ask/3.
2190http_pengine_create(Request) :- 2191 reply_options(Request, [post]), 2192 !. 2193http_pengine_create(Request) :- 2194 memberchk(content_type(CT), Request), 2195 sub_atom(CT, 0, _, _, 'application/json'), 2196 !, 2197 http_read_json_dict(Request, Dict), 2198 dict_atom_option(format, Dict, Format, prolog), 2199 dict_atom_option(application, Dict, Application, pengine_sandbox), 2200 http_pengine_create(Request, Application, Format, Dict). 2201http_pengine_create(Request) :- 2202 Optional = [optional(true)], 2203 OptString = [string|Optional], 2204 Form = [ format(Format, [default(prolog)]), 2205 application(Application, [default(pengine_sandbox)]), 2206 chunk(_, [nonneg;oneof([false]), default(1)]), 2207 collate(_, [number, default(0)]), 2208 solutions(_, [oneof([all,chunked]), default(chunked)]), 2209 ask(_, OptString), 2210 template(_, OptString), 2211 src_text(_, OptString), 2212 disposition(_, OptString), 2213 src_url(_, Optional) 2214 ], 2215 http_parameters(Request, Form), 2216 form_dict(Form, Dict), 2217 http_pengine_create(Request, Application, Format, Dict). 2218 2219dict_atom_option(Key, Dict, Atom, Default) :- 2220 ( get_dict(Key, Dict, String) 2221 -> atom_string(Atom, String) 2222 ; Atom = Default 2223 ). 2224 2225form_dict(Form, Dict) :- 2226 form_values(Form, Pairs), 2227 dict_pairs(Dict, _, Pairs). 2228 2229form_values([], []). 2230form_values([H|T], Pairs) :- 2231 arg(1, H, Value), 2232 nonvar(Value), 2233 !, 2234 functor(H, Name, _), 2235 Pairs = [Name-Value|PairsT], 2236 form_values(T, PairsT). 2237form_values([_|T], Pairs) :- 2238 form_values(T, Pairs).
2243http_pengine_create(Request, Application, Format, Dict) :- 2244 current_application(Application), 2245 !, 2246 allowed(Request, Application), 2247 authenticate(Request, Application, UserOptions), 2248 dict_to_options(Dict, Application, CreateOptions0), 2249 append(UserOptions, CreateOptions0, CreateOptions), 2250 pengine_uuid(Pengine), 2251 message_queue_create(Queue, [max_size(25)]), 2252 setting(Application:time_limit, TimeLimit), 2253 get_time(Now), 2254 asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)), 2255 broadcast(pengine(create(Pengine, Application, CreateOptions))), 2256 create(Queue, Pengine, CreateOptions, http, Application), 2257 create_wait_and_output_result(Pengine, Queue, Format, 2258 TimeLimit, Dict), 2259 gc_abandoned_queues. 2260http_pengine_create(_Request, Application, Format, _Dict) :- 2261 Error = existence_error(pengine_application, Application), 2262 pengine_uuid(ID), 2263 output_result(ID, Format, error(ID, error(Error, _))). 2264 2265 2266dict_to_options(Dict, Application, CreateOptions) :- 2267 dict_pairs(Dict, _, Pairs), 2268 pairs_create_options(Pairs, Application, CreateOptions). 2269 2270pairs_create_options([], _, []) :- !. 2271pairs_create_options([N-V0|T0], App, [Opt|T]) :- 2272 Opt =.. [N,V], 2273 pengine_create_option(Opt), N \== user, 2274 !, 2275 ( create_option_type(Opt, atom) 2276 -> atom_string(V, V0) % term creation must be done if 2277 ; V = V0 % we created the source and know 2278 ), % the operators. 2279 pairs_create_options(T0, App, T). 2280pairs_create_options([_|T0], App, T) :- 2281 pairs_create_options(T0, App, T).
time_limit,
Pengine is aborted and the result is error(time_limit_exceeded,
_).
2292wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
2293 Collate is min(Collate0, TimeLimit/10),
2294 get_time(Epoch),
2295 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2296 [ timeout(TimeLimit)
2297 ]),
2298 Error, true)
2299 -> ( var(Error)
2300 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
2301 ( collating_event(Collate, Event)
2302 -> Deadline is Epoch+TimeLimit,
2303 collect_events(Pengine, Collate, Queue, Deadline, 100, More),
2304 Events = [Event|More],
2305 ignore(destroy_queue_from_http(Pengine, Events, Queue)),
2306 protect_pengine(Pengine, output_result(Pengine, Format, Events))
2307 ; ignore(destroy_queue_from_http(Pengine, Event, Queue)),
2308 protect_pengine(Pengine, output_result(Pengine, Format, Event))
2309 )
2310 ; output_result(Pengine, Format, died(Pengine))
2311 )
2312 ; time_limit_exceeded(Pengine, Format)
2313 ).2320collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :- 2321 !. 2322collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :- 2323 debug(pengine(wait), 'Waiting to collate events', []), 2324 ( catch(thread_get_message(Queue, pengine_event(_, Event), 2325 [ timeout(Collate) 2326 ]), 2327 Error, true) 2328 -> ( var(Error) 2329 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]), 2330 Events = [Event|More], 2331 ( collating_event(Collate, Event) 2332 -> Max2 is Max - 1, 2333 collect_events(Pengine, Collate, Queue, Deadline, Max2, More) 2334 ; More = [] 2335 ) 2336 ; Events = [died(Pengine)] 2337 ) 2338 ; get_time(Now), 2339 Now > Deadline 2340 -> time_limit_event(Pengine, TimeLimitEvent), 2341 Events = [TimeLimitEvent] 2342 ; Events = [] 2343 ). 2344 2345collating_event(0, _) :- 2346 !, 2347 fail. 2348collating_event(_, output(_,_)).
disposition key to denote the
download location.2357create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :- 2358 get_dict(solutions, Dict, all), 2359 !, 2360 between(1, infinite, Page), 2361 ( catch(thread_get_message(Queue, pengine_event(_, Event), 2362 [ timeout(TimeLimit) 2363 ]), 2364 Error, true) 2365 -> ( var(Error) 2366 -> debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]), 2367 ( destroy_queue_from_http(Pengine, Event, Queue) 2368 -> !, 2369 protect_pengine(Pengine, 2370 output_result_2(Format, page(Page, Event), Dict)) 2371 ; is_more_event(Event) 2372 -> pengine_thread(Pengine, Thread), 2373 thread_send_message(Thread, pengine_request(next)), 2374 protect_pengine(Pengine, 2375 output_result_2(Format, page(Page, Event), Dict)), 2376 fail 2377 ; !, 2378 protect_pengine(Pengine, 2379 output_result_2(Format, page(Page, Event), Dict)) 2380 ) 2381 ; !, output_result(Pengine, Format, died(Pengine)) 2382 ) 2383 ; !, time_limit_exceeded(Pengine, Format) 2384 ), 2385 !. 2386create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :- 2387 wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)). 2388 2389is_more_event(success(_Id, _Answers, _Projection, _Time, true)). 2390is_more_event(create(_, Options)) :- 2391 memberchk(answer(Event), Options), 2392 is_more_event(Event).
2406time_limit_exceeded(Pengine, Format) :- 2407 time_limit_event(Pengine, Event), 2408 call_cleanup( 2409 pengine_destroy(Pengine, [force(true)]), 2410 output_result(Pengine, Format, Event)). 2411 2412time_limit_event(Pengine, 2413 destroy(Pengine, error(Pengine, time_limit_exceeded))). 2414 2415destroy_pengine_after_output(Pengine, Events) :- 2416 is_list(Events), 2417 last(Events, Last), 2418 time_limit_event(Pengine, Last), 2419 !, 2420 catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true). 2421destroy_pengine_after_output(_, _).
2436destroy_queue_from_http(ID, _, Queue) :- 2437 output_queue(ID, Queue, _), 2438 !, 2439 destroy_queue_if_empty(Queue). 2440destroy_queue_from_http(ID, Event, Queue) :- 2441 debug(pengine(destroy), 'DESTROY? ~p', [Event]), 2442 is_destroy_event(Event), 2443 !, 2444 message_queue_property(Queue, size(Waiting)), 2445 debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]), 2446 with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)). 2447 2448is_destroy_event(destroy(_)). 2449is_destroy_event(destroy(_,_)). 2450is_destroy_event(create(_, Options)) :- 2451 memberchk(answer(Event), Options), 2452 is_destroy_event(Event). 2453 2454destroy_queue_if_empty(Queue) :- 2455 thread_peek_message(Queue, _), 2456 !. 2457destroy_queue_if_empty(Queue) :- 2458 retractall(output_queue(_, Queue, _)), 2459 message_queue_destroy(Queue).
2467:- dynamic 2468 last_gc/1. 2469 2470gc_abandoned_queues :- 2471 consider_queue_gc, 2472 !, 2473 get_time(Now), 2474 ( output_queue(_, Queue, Time), 2475 Now-Time > 15*60, 2476 retract(output_queue(_, Queue, Time)), 2477 message_queue_destroy(Queue), 2478 fail 2479 ; retractall(last_gc(_)), 2480 asserta(last_gc(Now)) 2481 ). 2482gc_abandoned_queues. 2483 2484consider_queue_gc :- 2485 predicate_property(output_queue(_,_,_), number_of_clauses(N)), 2486 N > 100, 2487 ( last_gc(Time), 2488 get_time(Now), 2489 Now-Time > 5*60 2490 -> true 2491 ; \+ last_gc(_) 2492 ).
2510:- dynamic output_queue_destroyed/1. 2511 2512sync_destroy_queue_from_http(ID, Queue) :- 2513 ( output_queue(ID, Queue, _) 2514 -> destroy_queue_if_empty(Queue) 2515 ; thread_peek_message(Queue, pengine_event(_, output(_,_))) 2516 -> debug(pengine(destroy), 'Delay destruction of ~p because of output', 2517 [Queue]), 2518 get_time(Now), 2519 asserta(output_queue(ID, Queue, Now)) 2520 ; message_queue_destroy(Queue), 2521 asserta(output_queue_destroyed(Queue)) 2522 ).
pengine held.2529sync_destroy_queue_from_pengine(ID, Queue) :- 2530 ( retract(output_queue_destroyed(Queue)) 2531 -> true 2532 ; get_time(Now), 2533 asserta(output_queue(ID, Queue, Now)) 2534 ), 2535 retractall(pengine_queue(ID, Queue, _, _)). 2536 2537 2538http_pengine_send(Request) :- 2539 reply_options(Request, [get,post]), 2540 !. 2541http_pengine_send(Request) :- 2542 http_parameters(Request, 2543 [ id(ID, [ type(atom) ]), 2544 event(EventString, [optional(true)]), 2545 collate(Collate, [number, default(0)]), 2546 format(Format, [default(prolog)]) 2547 ]), 2548 catch(read_event(ID, Request, Format, EventString, Event), 2549 Error, 2550 true), 2551 ( var(Error) 2552 -> debug(pengine(event), 'HTTP send: ~p', [Event]), 2553 ( pengine_thread(ID, Thread) 2554 -> pengine_queue(ID, Queue, TimeLimit, _), 2555 random_delay, 2556 broadcast(pengine(send(ID, Event))), 2557 thread_send_message(Thread, pengine_request(Event)), 2558 wait_and_output_result(ID, Queue, Format, TimeLimit, Collate) 2559 ; atom(ID) 2560 -> pengine_died(Format, ID) 2561 ; http_404([], Request) 2562 ) 2563 ; Error = error(existence_error(pengine, ID), _) 2564 -> pengine_died(Format, ID) 2565 ; output_result(ID, Format, error(ID, Error)) 2566 ). 2567 2568pengine_died(Format, Pengine) :- 2569 output_result(Pengine, Format, 2570 error(Pengine, error(existence_error(pengine, Pengine),_))).
pengine_done mutex.
2581read_event(Pengine, Request, Format, EventString, Event) :- 2582 protect_pengine( 2583 Pengine, 2584 ( get_pengine_module(Pengine, Module), 2585 read_event_2(Request, EventString, Module, Event0, Bindings) 2586 )), 2587 !, 2588 fix_bindings(Format, Event0, Bindings, Event). 2589read_event(Pengine, Request, _Format, _EventString, _Event) :- 2590 debug(pengine(event), 'Pengine ~q vanished', [Pengine]), 2591 discard_post_data(Request), 2592 existence_error(pengine, Pengine).
event parameter or as a posted document.2600read_event_2(_Request, EventString, Module, Event, Bindings) :- 2601 nonvar(EventString), 2602 !, 2603 term_string(Event, EventString, 2604 [ variable_names(Bindings), 2605 module(Module) 2606 ]). 2607read_event_2(Request, _EventString, Module, Event, Bindings) :- 2608 option(method(post), Request), 2609 http_read_data(Request, Event, 2610 [ content_type('application/x-prolog'), 2611 module(Module), 2612 variable_names(Bindings) 2613 ]).
2619discard_post_data(Request) :- 2620 option(method(post), Request), 2621 !, 2622 setup_call_cleanup( 2623 open_null_stream(NULL), 2624 http_read_data(Request, _, [to(stream(NULL))]), 2625 close(NULL)). 2626discard_post_data(_).
json(-s) Format from the variables in
the asked Goal. Variables starting with an underscore, followed
by an capital letter are ignored from the template.2634fix_bindings(Format, 2635 ask(Goal, Options0), Bindings, 2636 ask(Goal, NewOptions)) :- 2637 json_lang(Format), 2638 !, 2639 exclude(anon, Bindings, NamedBindings), 2640 template(NamedBindings, Template, Options0, Options1), 2641 select_option(chunk(Paging), Options1, Options2, 1), 2642 NewOptions = [ template(Template), 2643 chunk(Paging), 2644 bindings(NamedBindings) 2645 | Options2 2646 ]. 2647fix_bindings(_, Command, _, Command). 2648 2649template(_, Template, Options0, Options) :- 2650 select_option(template(Template), Options0, Options), 2651 !. 2652template(Bindings, Template, Options, Options) :- 2653 dict_create(Template, swish_default_template, Bindings). 2654 2655anon(Name=_) :- 2656 sub_atom(Name, 0, _, _, '_'), 2657 sub_atom(Name, 1, 1, _, Next), 2658 char_type(Next, prolog_var_start). 2659 2660var_name(Name=_, Name).
2667json_lang(json) :- !. 2668json_lang(Format) :- 2669 sub_atom(Format, 0, _, _, 'json-').
2676http_pengine_pull_response(Request) :- 2677 reply_options(Request, [get]), 2678 !. 2679http_pengine_pull_response(Request) :- 2680 http_parameters(Request, 2681 [ id(ID, []), 2682 format(Format, [default(prolog)]), 2683 collate(Collate, [number, default(0)]) 2684 ]), 2685 reattach(ID), 2686 ( ( pengine_queue(ID, Queue, TimeLimit, _) 2687 -> true 2688 ; output_queue(ID, Queue, _), 2689 TimeLimit = 0 2690 ) 2691 -> wait_and_output_result(ID, Queue, Format, TimeLimit, Collate) 2692 ; http_404([], Request) 2693 ).
2702http_pengine_abort(Request) :- 2703 reply_options(Request, [get,post]), 2704 !. 2705http_pengine_abort(Request) :- 2706 http_parameters(Request, 2707 [ id(ID, []) 2708 ]), 2709 ( pengine_thread(ID, _Thread) 2710 -> broadcast(pengine(abort(ID))), 2711 abort_pending_output(ID), 2712 pengine_abort(ID), 2713 reply_json_dict(true) 2714 ; http_404([], Request) 2715 ).
2727http_pengine_detach(Request) :- 2728 reply_options(Request, [post]), 2729 !. 2730http_pengine_detach(Request) :- 2731 http_parameters(Request, 2732 [ id(ID, []) 2733 ]), 2734 http_read_json_dict(Request, ClientData), 2735 ( pengine_property(ID, application(Application)), 2736 allowed(Request, Application), 2737 authenticate(Request, Application, _UserOptions) 2738 -> broadcast(pengine(detach(ID))), 2739 get_time(Now), 2740 assertz(pengine_detached(ID, ClientData.put(time, Now))), 2741 pengine_queue(ID, Queue, _TimeLimit, _Now), 2742 message_queue_set(Queue, max_size(1000)), 2743 pengine_reply(Queue, detached(ID)), 2744 reply_json_dict(true) 2745 ; http_404([], Request) 2746 ). 2747 2748reattach(ID) :- 2749 ( retract(pengine_detached(ID, _Data)), 2750 pengine_queue(ID, Queue, _TimeLimit, _Now) 2751 -> message_queue_set(Queue, max_size(25)) 2752 ; true 2753 ).
2761http_pengine_destroy_all(Request) :- 2762 reply_options(Request, [get,post]), 2763 !. 2764http_pengine_destroy_all(Request) :- 2765 http_parameters(Request, 2766 [ ids(IDsAtom, []) 2767 ]), 2768 atomic_list_concat(IDs, ',', IDsAtom), 2769 forall(( member(ID, IDs), 2770 \+ pengine_detached(ID, _) 2771 ), 2772 pengine_destroy(ID, [force(true)])), 2773 reply_json_dict("ok").
status(Pengine, Stats) is created, where Stats
is the return of thread_statistics/2.2781http_pengine_ping(Request) :- 2782 reply_options(Request, [get]), 2783 !. 2784http_pengine_ping(Request) :- 2785 http_parameters(Request, 2786 [ id(Pengine, []), 2787 format(Format, [default(prolog)]) 2788 ]), 2789 ( pengine_thread(Pengine, Thread), 2790 Error = error(_,_), 2791 catch(thread_statistics(Thread, Stats), Error, fail) 2792 -> output_result(Pengine, Format, ping(Pengine, Stats)) 2793 ; output_result(Pengine, Format, died(Pengine)) 2794 ).
2803http_pengine_list(Request) :- 2804 reply_options(Request, [get]), 2805 !. 2806http_pengine_list(Request) :- 2807 http_parameters(Request, 2808 [ status(Status, [default(detached), oneof([detached])]), 2809 application(Application, [default(pengine_sandbox)]) 2810 ]), 2811 allowed(Request, Application), 2812 authenticate(Request, Application, _UserOptions), 2813 findall(Term, listed_pengine(Application, Status, Term), Terms), 2814 reply_json_dict(json{pengines: Terms}). 2815 2816listed_pengine(Application, detached, State) :- 2817 State = pengine{id:Id, 2818 detached:Time, 2819 queued:Queued, 2820 stats:Stats}, 2821 2822 pengine_property(Id, application(Application)), 2823 pengine_property(Id, detached(Time)), 2824 pengine_queue(Id, Queue, _TimeLimit, _Now), 2825 message_queue_property(Queue, size(Queued)), 2826 ( pengine_thread(Id, Thread), 2827 catch(thread_statistics(Thread, Stats), _, fail) 2828 -> true 2829 ; Stats = thread{status:died} 2830 ).
prolog, json or json-s.
2841:- dynamic 2842 pengine_replying/2. % +Pengine, +Thread 2843 2844output_result(Pengine, Format, Event) :- 2845 thread_self(Thread), 2846 cors_enable, % contingent on http:cors setting 2847 disable_client_cache, 2848 setup_call_cleanup( 2849 asserta(pengine_replying(Pengine, Thread), Ref), 2850 catch(output_result_2(Format, Event, _{}), 2851 pengine_abort_output, 2852 true), 2853 erase(Ref)), 2854 destroy_pengine_after_output(Pengine, Event). 2855 2856output_result_2(Lang, Event, Dict) :- 2857 write_result(Lang, Event, Dict), 2858 !. 2859output_result_2(prolog, Event, _) :- 2860 !, 2861 format('Content-type: text/x-prolog; charset=UTF-8~n~n'), 2862 write_term(Event, 2863 [ quoted(true), 2864 ignore_ops(true), 2865 fullstop(true), 2866 blobs(portray), 2867 portray_goal(portray_blob), 2868 nl(true) 2869 ]). 2870output_result_2(Lang, Event, _) :- 2871 json_lang(Lang), 2872 !, 2873 ( event_term_to_json_data(Event, JSON, Lang) 2874 -> reply_json_dict(JSON) 2875 ; assertion(event_term_to_json_data(Event, _, Lang)) 2876 ). 2877output_result_2(Lang, _Event, _) :- % FIXME: allow for non-JSON format 2878 domain_error(pengine_format, Lang).
'$BLOB'(Type).
Future versions may include more info, depending on Type.2888:- public portray_blob/2. % called from write-term 2889portray_blob(Blob, _Options) :- 2890 blob(Blob, Type), 2891 writeq('$BLOB'(Type)).
2898abort_pending_output(Pengine) :- 2899 forall(pengine_replying(Pengine, Thread), 2900 abort_output_thread(Thread)). 2901 2902abort_output_thread(Thread) :- 2903 catch(thread_signal(Thread, throw(pengine_abort_output)), 2904 error(existence_error(thread, _), _), 2905 true).
prolog and various JSON dialects. The hook
event_to_json/3 can be used to refine the JSON dialects. This
hook must be used if a completely different output format is
desired.2921disable_client_cache :- 2922 format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c 2923 Pragma: no-cache\r\n\c 2924 Expires: 0\r\n'). 2925 2926event_term_to_json_data(Events, JSON, Lang) :- 2927 is_list(Events), 2928 !, 2929 events_to_json_data(Events, JSON, Lang). 2930event_term_to_json_data(Event, JSON, Lang) :- 2931 event_to_json(Event, JSON, Lang), 2932 !. 2933event_term_to_json_data(success(ID, Bindings0, Projection, Time, More), 2934 json{event:success, id:ID, time:Time, 2935 data:Bindings, more:More, projection:Projection}, 2936 json) :- 2937 !, 2938 term_to_json(Bindings0, Bindings). 2939event_term_to_json_data(destroy(ID, Event), 2940 json{event:destroy, id:ID, data:JSON}, 2941 Style) :- 2942 !, 2943 event_term_to_json_data(Event, JSON, Style). 2944event_term_to_json_data(create(ID, Features0), JSON, Style) :- 2945 !, 2946 ( select(answer(First0), Features0, Features1) 2947 -> event_term_to_json_data(First0, First, Style), 2948 Features = [answer(First)|Features1] 2949 ; Features = Features0 2950 ), 2951 dict_create(JSON, json, [event(create), id(ID)|Features]). 2952event_term_to_json_data(destroy(ID, Event), 2953 json{event:destroy, id:ID, data:JSON}, Style) :- 2954 !, 2955 event_term_to_json_data(Event, JSON, Style). 2956event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :- 2957 !, 2958 Error0 = json{event:error, id:ID, data:Message}, 2959 add_error_details(ErrorTerm, Error0, Error), 2960 message_to_string(ErrorTerm, Message). 2961event_term_to_json_data(failure(ID, Time), 2962 json{event:failure, id:ID, time:Time}, _) :- 2963 !. 2964event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :- 2965 functor(EventTerm, F, 1), 2966 !, 2967 arg(1, EventTerm, ID). 2968event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :- 2969 functor(EventTerm, F, 2), 2970 arg(1, EventTerm, ID), 2971 arg(2, EventTerm, Data), 2972 term_to_json(Data, JSON). 2973 2974events_to_json_data([], [], _). 2975events_to_json_data([E|T0], [J|T], Lang) :- 2976 event_term_to_json_data(E, J, Lang), 2977 events_to_json_data(T0, T, Lang). 2978 2979:- public add_error_details/3.
pengines_io.pl.
2986add_error_details(Error, JSON0, JSON) :-
2987 add_error_code(Error, JSON0, JSON1),
2988 add_error_location(Error, JSON1, JSON).code field to JSON0 of Error is an ISO error term. The error
code is the functor name of the formal part of the error, e.g.,
syntax_error, type_error, etc. Some errors carry more
information:
3001add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :- 3002 atom(Type), 3003 !, 3004 to_atomic(Obj, Value), 3005 Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}). 3006add_error_code(error(Formal, _), Error0, Error) :- 3007 callable(Formal), 3008 !, 3009 functor(Formal, Code, _), 3010 Error = Error0.put(code, Code). 3011add_error_code(_, Error, Error). 3012 3013% What to do with large integers? 3014to_atomic(Obj, Atomic) :- atom(Obj), !, Atomic = Obj. 3015to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj. 3016to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj. 3017to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
location property if the error can be associated with a
source location. The location is an object with properties file
and line and, if available, the character location in the line.3026add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :- 3027 atom(Path), integer(Line), 3028 !, 3029 Term = Term0.put(_{location:_{file:Path, line:Line}}). 3030add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :- 3031 atom(Path), integer(Line), integer(Ch), 3032 !, 3033 Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}). 3034add_error_location(_, Term, Term).
success(ID, Bindings, Projection, Time, More) and output(ID,
Term) into a format suitable for processing at the client side.3045%:- multifile pengines:event_to_json/3. 3046 3047 3048 /******************************* 3049 * ACCESS CONTROL * 3050 *******************************/
forbidden header if contact is not allowed.3057allowed(Request, Application) :- 3058 setting(Application:allow_from, Allow), 3059 match_peer(Request, Allow), 3060 setting(Application:deny_from, Deny), 3061 \+ match_peer(Request, Deny), 3062 !. 3063allowed(Request, _Application) :- 3064 memberchk(request_uri(Here), Request), 3065 throw(http_reply(forbidden(Here))). 3066 3067match_peer(_, Allowed) :- 3068 memberchk(*, Allowed), 3069 !. 3070match_peer(_, []) :- !, fail. 3071match_peer(Request, Allowed) :- 3072 http_peer(Request, Peer), 3073 debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]), 3074 ( memberchk(Peer, Allowed) 3075 -> true 3076 ; member(Pattern, Allowed), 3077 match_peer_pattern(Pattern, Peer) 3078 ). 3079 3080match_peer_pattern(Pattern, Peer) :- 3081 ip_term(Pattern, IP), 3082 ip_term(Peer, IP), 3083 !. 3084 3085ip_term(Peer, Pattern) :- 3086 split_string(Peer, ".", "", PartStrings), 3087 ip_pattern(PartStrings, Pattern). 3088 3089ip_pattern([], []). 3090ip_pattern([*], _) :- !. 3091ip_pattern([S|T0], [N|T]) :- 3092 number_string(N, S), 3093 ip_pattern(T0, T).
[user(User)], [] or
an exception.3101authenticate(Request, Application, UserOptions) :- 3102 authentication_hook(Request, Application, User), 3103 !, 3104 must_be(ground, User), 3105 UserOptions = [user(User)]. 3106authenticate(_, _, []).
throw(http_reply(authorise(basic(Realm))))
Start a normal HTTP login challenge (reply 401)throw(http_reply(forbidden(Path))))
Reject the request using a 403 repply.3128pengine_register_user(Options) :- 3129 option(user(User), Options), 3130 !, 3131 pengine_self(Me), 3132 asserta(pengine_user(Me, User)). 3133pengine_register_user(_).
3144pengine_user(User) :-
3145 pengine_self(Me),
3146 pengine_user(Me, User).3152reply_options(Request, Allowed) :- 3153 option(method(options), Request), 3154 !, 3155 cors_enable(Request, 3156 [ methods(Allowed) 3157 ]), 3158 format('Content-type: text/plain\r\n'), 3159 format('~n'). % empty body 3160 3161 3162 /******************************* 3163 * COMPILE SOURCE * 3164 *******************************/
3173pengine_src_text(Src, Module) :- 3174 pengine_self(Self), 3175 format(atom(ID), 'pengine://~w/src', [Self]), 3176 extra_load_options(Self, Options), 3177 setup_call_cleanup( 3178 open_chars_stream(Src, Stream), 3179 load_files(Module:ID, 3180 [ stream(Stream), 3181 module(Module), 3182 silent(true) 3183 | Options 3184 ]), 3185 close(Stream)), 3186 keep_source(Self, ID, Src). 3187 3188system'#file'(File, _Line) :- 3189 prolog_load_context(stream, Stream), 3190 set_stream(Stream, file_name(File)), 3191 set_stream(Stream, record_position(false)), 3192 set_stream(Stream, record_position(true)).
3202pengine_src_url(URL, Module) :- 3203 pengine_self(Self), 3204 uri_encoded(path, URL, Path), 3205 format(atom(ID), 'pengine://~w/url/~w', [Self, Path]), 3206 extra_load_options(Self, Options), 3207 ( get_pengine_application(Self, Application), 3208 setting(Application:debug_info, false) 3209 -> setup_call_cleanup( 3210 http_open(URL, Stream, []), 3211 ( set_stream(Stream, encoding(utf8)), 3212 load_files(Module:ID, 3213 [ stream(Stream), 3214 module(Module) 3215 | Options 3216 ]) 3217 ), 3218 close(Stream)) 3219 ; setup_call_cleanup( 3220 http_open(URL, TempStream, []), 3221 ( set_stream(TempStream, encoding(utf8)), 3222 read_string(TempStream, _, Src) 3223 ), 3224 close(TempStream)), 3225 setup_call_cleanup( 3226 open_chars_stream(Src, Stream), 3227 load_files(Module:ID, 3228 [ stream(Stream), 3229 module(Module) 3230 | Options 3231 ]), 3232 close(Stream)), 3233 keep_source(Self, ID, Src) 3234 ). 3235 3236 3237extra_load_options(Pengine, Options) :- 3238 pengine_not_sandboxed(Pengine), 3239 !, 3240 Options = []. 3241extra_load_options(_, [sandboxed(true)]). 3242 3243 3244keep_source(Pengine, ID, SrcText) :- 3245 get_pengine_application(Pengine, Application), 3246 setting(Application:debug_info, true), 3247 !, 3248 to_string(SrcText, SrcString), 3249 assertz(pengine_data(Pengine, source(ID, SrcString))). 3250keep_source(_, _, _). 3251 3252to_string(String, String) :- 3253 string(String), 3254 !. 3255to_string(Atom, String) :- 3256 atom_string(Atom, String), 3257 !. 3258 3259 /******************************* 3260 * SANDBOX * 3261 *******************************/ 3262 3263:- multifile 3264 sandbox:safe_primitive/1. 3265 3266sandbox:safe_primitive(pengines:pengine_input(_, _)). 3267sandbox:safe_primitive(pengines:pengine_output(_)). 3268sandbox:safe_primitive(pengines:pengine_debug(_,_)). 3269 3270 3271 /******************************* 3272 * MESSAGES * 3273 *******************************/ 3274 3275prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3276 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3277 'This is normally caused by an insufficiently instantiated'-[], nl, 3278 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3279 'find all possible instantations of Var.'-[] 3280 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.