. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2024, Torbjörn Lager, 8 VU University Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(pengines, 39 [ pengine_create/1, % +Options 40 pengine_ask/3, % +Pengine, :Query, +Options 41 pengine_next/2, % +Pengine. +Options 42 pengine_stop/2, % +Pengine. +Options 43 pengine_event/2, % -Event, +Options 44 pengine_input/2, % +Prompt, -Term 45 pengine_output/1, % +Term 46 pengine_respond/3, % +Pengine, +Input, +Options 47 pengine_debug/2, % +Format, +Args 48 pengine_self/1, % -Pengine 49 pengine_pull_response/2, % +Pengine, +Options 50 pengine_destroy/1, % +Pengine 51 pengine_destroy/2, % +Pengine, +Options 52 pengine_abort/1, % +Pengine 53 pengine_application/1, % +Application 54 current_pengine_application/1, % ?Application 55 pengine_property/2, % ?Pengine, ?Property 56 pengine_user/1, % -User 57 pengine_event_loop/2, % :Closure, +Options 58 pengine_rpc/2, % +Server, :Goal 59 pengine_rpc/3 % +Server, :Goal, +Options 60 ]).
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error), 77 [ must_be/2, 78 existence_error/2, 79 permission_error/3, 80 domain_error/2 81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option), 88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(sandbox),[safe_goal/1]). 91:- autoload(library(statistics),[thread_statistics/2]). 92:- autoload(library(term_to_json),[term_to_json/2]). 93:- autoload(library(thread_pool), 94 [thread_pool_create/3,thread_create_in_pool/4]). 95:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 96:- autoload(library(uri), 97 [ uri_components/2, 98 uri_query_components/2, 99 uri_data/3, 100 uri_data/4, 101 uri_encoded/3 102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- autoload(library(http/http_dispatch), 106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111 112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json), 114 [http_read_json_dict/2,reply_json_dict/1]). 115 116:- if(exists_source(library(uuid))). 117:- autoload(library(uuid), [uuid/2]). 118:- endif. 119 120 121:- meta_predicate 122 pengine_create( ), 123 pengine_rpc( , , ), 124 pengine_event_loop( , ). 125 126:- multifile 127 write_result/3, % +Format, +Event, +Dict 128 event_to_json/3, % +Event, -JSON, +Format 129 prepare_module/3, % +Module, +Application, +Options 130 prepare_goal/3, % +GoalIn, -GoalOut, +Options 131 authentication_hook/3, % +Request, +Application, -User 132 not_sandboxed/2, % +User, +App 133 pengine_flush_output_hook/0. 134 135:- predicate_options(pengine_create/1, 1, 136 [ id(-atom), 137 alias(atom), 138 application(atom), 139 destroy(boolean), 140 server(atom), 141 ask(compound), 142 template(compound), 143 chunk(integer;oneof([false])), 144 bindings(list), 145 src_list(list), 146 src_text(any), % text 147 src_url(atom), 148 src_predicates(list) 149 ]). 150:- predicate_options(pengine_ask/3, 3, 151 [ template(any), 152 chunk(integer;oneof([false])), 153 bindings(list) 154 ]). 155:- predicate_options(pengine_next/2, 2, 156 [ chunk(integer), 157 pass_to(pengine_send/3, 3) 158 ]). 159:- predicate_options(pengine_stop/2, 2, 160 [ pass_to(pengine_send/3, 3) 161 ]). 162:- predicate_options(pengine_respond/3, 2, 163 [ pass_to(pengine_send/3, 3) 164 ]). 165:- predicate_options(pengine_rpc/3, 3, 166 [ chunk(integer;oneof([false])), 167 pass_to(pengine_create/1, 1) 168 ]). 169:- predicate_options(pengine_send/3, 3, 170 [ delay(number) 171 ]). 172:- predicate_options(pengine_event/2, 2, 173 [ listen(atom), 174 pass_to(system:thread_get_message/3, 3) 175 ]). 176:- predicate_options(pengine_pull_response/2, 2, 177 [ pass_to(http_open/3, 3) 178 ]). 179:- predicate_options(pengine_event_loop/2, 2, 180 []). % not yet implemented 181 182% :- debug(pengine(transition)). 183:- debug(pengine(debug)). % handle pengine_debug in pengine_rpc/3. 184 185goal_expansion(random_delay, Expanded) :- 186 ( debugging(pengine(delay)) 187 -> Expanded = do_random_delay 188 ; Expanded = true 189 ). 190 191do_random_delay :- 192 Delay is random(20)/1000, 193 sleep(Delay). 194 195:- meta_predicate % internal meta predicates 196 solve( , , , ), 197 findnsols_no_empty( , , , ), 198 pengine_event_loop( , , ).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options..
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
253pengine_create(M:Options0) :-
254 translate_local_sources(Options0, Options, M),
255 ( select_option(server(BaseURL), Options, RestOptions)
256 -> remote_pengine_create(BaseURL, RestOptions)
257 ; local_pengine_create(Options)
258 ).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
272translate_local_sources(OptionsIn, Options, Module) :- 273 translate_local_sources(OptionsIn, Sources, Options2, Module), 274 ( Sources == [] 275 -> Options = Options2 276 ; Sources = [Source] 277 -> Options = [src_text(Source)|Options2] 278 ; atomics_to_string(Sources, Source) 279 -> Options = [src_text(Source)|Options2] 280 ). 281 282translate_local_sources([], [], [], _). 283translate_local_sources([H0|T], [S0|S], Options, M) :- 284 nonvar(H0), 285 translate_local_source(H0, S0, M), 286 !, 287 translate_local_sources(T, S, Options, M). 288translate_local_sources([H|T0], S, [H|T], M) :- 289 translate_local_sources(T0, S, T, M). 290 291translate_local_source(src_predicates(PIs), Source, M) :- 292 must_be(list, PIs), 293 with_output_to(string(Source), 294 maplist(list_in_module(M), PIs)). 295translate_local_source(src_list(Terms), Source, _) :- 296 must_be(list, Terms), 297 with_output_to(string(Source), 298 forall(member(Term, Terms), 299 format('~k .~n', [Term]))). 300translate_local_source(src_text(Source), Source, _). 301 302list_in_module(M, PI) :- 303 listing(M:PI).
pengine_send(NameOrID, Term, [])
.
*/
310pengine_send(Target, Event) :-
311 pengine_send(Target, Event, []).
Any remaining options are passed to http_open/3. */
326pengine_send(Target, Event, Options) :- 327 must_be(atom, Target), 328 pengine_send2(Target, Event, Options). 329 330pengine_send2(self, Event, Options) :- 331 !, 332 thread_self(Queue), 333 delay_message(queue(Queue), Event, Options). 334pengine_send2(Name, Event, Options) :- 335 child(Name, Target), 336 !, 337 delay_message(pengine(Target), Event, Options). 338pengine_send2(Target, Event, Options) :- 339 delay_message(pengine(Target), Event, Options). 340 341delay_message(Target, Event, Options) :- 342 option(delay(Delay), Options), 343 !, 344 alarm(Delay, 345 send_message(Target, Event, Options), 346 _AlarmID, 347 [remove(true)]). 348delay_message(Target, Event, Options) :- 349 random_delay, 350 send_message(Target, Event, Options). 351 352send_message(queue(Queue), Event, _) :- 353 thread_send_message(Queue, pengine_request(Event)). 354send_message(pengine(Pengine), Event, Options) :- 355 ( pengine_remote(Pengine, Server) 356 -> remote_pengine_send(Server, Pengine, Event, Options) 357 ; pengine_thread(Pengine, Thread) 358 -> thread_send_message(Thread, pengine_request(Event)) 359 ; existence_error(pengine, Pengine) 360 ).
idle_limit
setting while using thread_idle/2 to minimis
resources.370pengine_request(Request) :- 371 thread_self(Me), 372 thread_get_message(Me, pengine_request(Request), [timeout(1)]), 373 !. 374pengine_request(Request) :- 375 pengine_self(Self), 376 get_pengine_application(Self, Application), 377 setting(Application:idle_limit, IdleLimit0), 378 IdleLimit is IdleLimit0-1, 379 thread_self(Me), 380 ( thread_idle(thread_get_message(Me, pengine_request(Request), 381 [timeout(IdleLimit)]), 382 long) 383 -> true 384 ; Request = destroy 385 ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
398pengine_reply(Event) :- 399 pengine_parent(Queue), 400 pengine_reply(Queue, Event). 401 402pengine_reply(_Queue, _Event0) :- 403 nb_current(pengine_idle_limit_exceeded, true), 404 !. 405pengine_reply(Queue, Event0) :- 406 arg(1, Event0, ID), 407 wrap_first_answer(ID, Event0, Event), 408 random_delay, 409 debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]), 410 ( pengine_self(ID), 411 \+ pengine_detached(ID, _) 412 -> get_pengine_application(ID, Application), 413 setting(Application:idle_limit, IdleLimit), 414 debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]), 415 ( thread_send_message(Queue, pengine_event(ID, Event), 416 [ timeout(IdleLimit) 417 ]) 418 -> true 419 ; thread_self(Me), 420 debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)', 421 [ID, Me]), 422 nb_setval(pengine_idle_limit_exceeded, true), 423 thread_detach(Me), 424 abort 425 ) 426 ; thread_send_message(Queue, pengine_event(ID, Event)) 427 ). 428 429wrap_first_answer(ID, Event0, CreateEvent) :- 430 wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]), 431 arg(1, CreateEvent, ID), 432 !, 433 retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])). 434wrap_first_answer(_ID, Event, Event). 435 436 437empty_queue :- 438 pengine_parent(Queue), 439 empty_queue(Queue, 0, Discarded), 440 debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]). 441 442empty_queue(Queue, C0, C) :- 443 thread_get_message(Queue, _Term, [timeout(0)]), 444 !, 445 C1 is C0+1, 446 empty_queue(Queue, C1, C). 447empty_queue(_, C, C).
Options is a list of options:
false
, the
Pengine goal is not executed using findall/3 and friends and
we do not backtrack immediately over the goal. As a result,
changes to backtrackable global state are retained. This is
similar that using set_prolog_flag(toplevel_mode, recursive)
.Name = Var
terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, would we call
pengine_next/2.Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
516pengine_ask(ID, Query, Options) :- 517 partition(pengine_ask_option, Options, AskOptions, SendOptions), 518 pengine_send(ID, ask(Query, AskOptions), SendOptions). 519 520 521pengine_ask_option(template(_)). 522pengine_ask_option(chunk(_)). 523pengine_ask_option(bindings(_)). 524pengine_ask_option(breakpoints(_)).
chunk(false)
.Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
569pengine_next(ID, Options) :- 570 select_option(chunk(Count), Options, Options1), 571 !, 572 pengine_send(ID, next(Count), Options1). 573pengine_next(ID, Options) :- 574 pengine_send(ID, next, Options).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
590pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
601pengine_abort(Name) :-
602 ( child(Name, Pengine)
603 -> true
604 ; Pengine = Name
605 ),
606 ( pengine_remote(Pengine, Server)
607 -> remote_pengine_abort(Server, Pengine, [])
608 ; pengine_thread(Pengine, Thread),
609 debug(pengine(abort), 'Signalling thread ~p', [Thread]),
610 catch(thread_signal(Thread, throw(abort_query)), _, true)
611 ).
force(true)
, the pengine
is killed using abort/0 and pengine_destroy/2 succeeds.
*/621pengine_destroy(ID) :- 622 pengine_destroy(ID, []). 623 624pengine_destroy(Name, Options) :- 625 ( child(Name, ID) 626 -> true 627 ; ID = Name 628 ), 629 option(force(true), Options), 630 !, 631 ( pengine_thread(ID, Thread) 632 -> catch(thread_signal(Thread, abort), 633 error(existence_error(thread, _), _), true) 634 ; true 635 ). 636pengine_destroy(ID, Options) :- 637 catch(pengine_send(ID, destroy, Options), 638 error(existence_error(pengine, ID), _), 639 retractall(child(_,ID))). 640 641 642/*================= pengines administration ======================= 643*/
thread(ThreadId)
remote(URL)
654:- dynamic 655 current_pengine/6, % Id, ParentId, Thread, URL, App, Destroy 656 pengine_queue/4, % Id, Queue, TimeOut, Time 657 output_queue/3, % Id, Queue, Time 658 pengine_user/2, % Id, User 659 pengine_data/2, % Id, Data 660 pengine_detached/2. % Id, Data 661:- volatile 662 current_pengine/6, 663 pengine_queue/4, 664 output_queue/3, 665 pengine_user/2, 666 pengine_data/2, 667 pengine_detached/2. 668 669:- thread_local 670 child/2. % ?Name, ?Child
676pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :- 677 asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)). 678 679pengine_register_remote(Id, URL, Application, Destroy) :- 680 thread_self(Queue), 681 asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
http
and the queue is the
message queue used to send events to the HTTP workers.689pengine_unregister(Id) :- 690 thread_self(Me), 691 ( current_pengine(Id, Queue, Me, http, _, _) 692 -> with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)) 693 ; true 694 ), 695 retractall(current_pengine(Id, _, Me, _, _, _)), 696 retractall(pengine_user(Id, _)), 697 retractall(pengine_data(Id, _)). 698 699pengine_unregister_remote(Id) :- 700 retractall(current_pengine(Id, _Parent, 0, _, _, _)).
706pengine_self(Id) :- 707 thread_self(Thread), 708 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy). 709 710pengine_parent(Parent) :- 711 nb_getval(pengine_parent, Parent). 712 713pengine_thread(Pengine, Thread) :- 714 current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy), 715 Thread \== 0, 716 !. 717 718pengine_remote(Pengine, URL) :- 719 current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy). 720 721get_pengine_application(Pengine, Application) :- 722 current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy), 723 !. 724 725get_pengine_module(Pengine, Pengine). 726 727:- if(current_predicate(uuid/2)). 728pengine_uuid(Id) :- 729 uuid(Id, [version(4)]). % Version 4 is random. 730:- else. 731pengine_uuid(Id) :- 732 ( current_prolog_flag(max_integer, Max1) 733 -> Max is Max1-1 734 ; Max is 1<<128 735 ), 736 random_between(0, Max, Num), 737 atom_number(Id, Num). 738:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
755:- meta_predicate protect_pengine( , ). 756 757protect_pengine(Id, Goal) :- 758 term_hash(Id, Hash), 759 LockN is Hash mod 64, 760 atom_concat(pengine_done_, LockN, Lock), 761 with_mutex(Lock, 762 ( pengine_thread(Id, _) 763 -> Goal 764 ; Goal 765 )).
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
782pengine_application(Application) :- 783 throw(error(context_error(nodirective, 784 pengine_application(Application)), _)). 785 786:- multifile 787 system:term_expansion/2, 788 current_application/1.
796current_pengine_application(Application) :- 797 current_application(Application). 798 799 800% Default settings for all applications 801 802:- setting(thread_pool_size, integer, 100, 803 'Maximum number of pengines this application can run.'). 804:- setting(thread_pool_stacks, list(compound), [], 805 'Maximum stack sizes for pengines this application can run.'). 806:- setting(slave_limit, integer, 3, 807 'Maximum number of slave pengines a master pengine can create.'). 808:- setting(time_limit, number, 300, 809 'Maximum time to wait for output'). 810:- setting(idle_limit, number, 300, 811 'Pengine auto-destroys when idle for this time'). 812:- setting(safe_goal_limit, number, 10, 813 'Maximum time to try proving safety of the goal'). 814:- setting(program_space, integer, 100_000_000, 815 'Maximum memory used by predicates'). 816:- setting(allow_from, list(atom), [*], 817 'IP addresses from which remotes are allowed to connect'). 818:- setting(deny_from, list(atom), [], 819 'IP addresses from which remotes are NOT allowed to connect'). 820:- setting(debug_info, boolean, false, 821 'Keep information to support source-level debugging'). 822 823 824systemterm_expansion((:- pengine_application(Application)), Expanded) :- 825 must_be(atom, Application), 826 ( module_property(Application, file(_)) 827 -> permission_error(create, pengine_application, Application) 828 ; true 829 ), 830 expand_term((:- setting(Application:thread_pool_size, integer, 831 setting(pengines:thread_pool_size), 832 'Maximum number of pengines this \c 833 application can run.')), 834 ThreadPoolSizeSetting), 835 expand_term((:- setting(Application:thread_pool_stacks, list(compound), 836 setting(pengines:thread_pool_stacks), 837 'Maximum stack sizes for pengines \c 838 this application can run.')), 839 ThreadPoolStacksSetting), 840 expand_term((:- setting(Application:slave_limit, integer, 841 setting(pengines:slave_limit), 842 'Maximum number of local slave pengines \c 843 a master pengine can create.')), 844 SlaveLimitSetting), 845 expand_term((:- setting(Application:time_limit, number, 846 setting(pengines:time_limit), 847 'Maximum time to wait for output')), 848 TimeLimitSetting), 849 expand_term((:- setting(Application:idle_limit, number, 850 setting(pengines:idle_limit), 851 'Pengine auto-destroys when idle for this time')), 852 IdleLimitSetting), 853 expand_term((:- setting(Application:safe_goal_limit, number, 854 setting(pengines:safe_goal_limit), 855 'Maximum time to try proving safety of the goal')), 856 SafeGoalLimitSetting), 857 expand_term((:- setting(Application:program_space, integer, 858 setting(pengines:program_space), 859 'Maximum memory used by predicates')), 860 ProgramSpaceSetting), 861 expand_term((:- setting(Application:allow_from, list(atom), 862 setting(pengines:allow_from), 863 'IP addresses from which remotes are allowed \c 864 to connect')), 865 AllowFromSetting), 866 expand_term((:- setting(Application:deny_from, list(atom), 867 setting(pengines:deny_from), 868 'IP addresses from which remotes are NOT \c 869 allowed to connect')), 870 DenyFromSetting), 871 expand_term((:- setting(Application:debug_info, boolean, 872 setting(pengines:debug_info), 873 'Keep information to support source-level \c 874 debugging')), 875 DebugInfoSetting), 876 flatten([ pengines:current_application(Application), 877 ThreadPoolSizeSetting, 878 ThreadPoolStacksSetting, 879 SlaveLimitSetting, 880 TimeLimitSetting, 881 IdleLimitSetting, 882 SafeGoalLimitSetting, 883 ProgramSpaceSetting, 884 AllowFromSetting, 885 DenyFromSetting, 886 DebugInfoSetting 887 ], Expanded). 888 889% Register default application 890 891:- pengine_application(pengine_sandbox).
alias
option when creating the pengine.true
if the pengines is destroyed automatically
after completing the query.debug_info
is present.928pengine_property(Id, Prop) :- 929 nonvar(Id), nonvar(Prop), 930 pengine_property2(Prop, Id), 931 !. 932pengine_property(Id, Prop) :- 933 pengine_property2(Prop, Id). 934 935pengine_property2(self(Id), Id) :- 936 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 937pengine_property2(module(Id), Id) :- 938 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy). 939pengine_property2(alias(Alias), Id) :- 940 child(Alias, Id), 941 Alias \== Id. 942pengine_property2(thread(Thread), Id) :- 943 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy), 944 Thread \== 0. 945pengine_property2(remote(Server), Id) :- 946 current_pengine(Id, _Parent, 0, Server, _Application, _Destroy). 947pengine_property2(application(Application), Id) :- 948 current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy). 949pengine_property2(destroy(Destroy), Id) :- 950 current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy). 951pengine_property2(parent(Parent), Id) :- 952 current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy). 953pengine_property2(source(SourceID, Source), Id) :- 954 pengine_data(Id, source(SourceID, Source)). 955pengine_property2(detached(When), Id) :- 956 pengine_detached(Id, When).
963pengine_output(Term) :-
964 pengine_self(Me),
965 pengine_reply(output(Me, Term)).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
980pengine_debug(Format, Args) :- 981 pengine_parent(Queue), 982 pengine_self(Self), 983 catch(safe_goal(format(atom(_), Format, Args)), E, true), 984 ( var(E) 985 -> format(atom(Message), Format, Args) 986 ; message_to_string(E, Message) 987 ), 988 pengine_reply(Queue, debug(Self, Message)). 989 990 991/*================= Local pengine ======================= 992*/
1003local_pengine_create(Options) :-
1004 thread_self(Self),
1005 option(application(Application), Options, pengine_sandbox),
1006 create(Self, Child, Options, local, Application),
1007 option(alias(Name), Options, Child),
1008 assert(child(Name, Child)).
1015:- multifile thread_pool:create_pool/1. 1016 1017thread_poolcreate_pool(Application) :- 1018 current_application(Application), 1019 setting(Application:thread_pool_size, Size), 1020 setting(Application:thread_pool_stacks, Stacks), 1021 thread_pool_create(Application, Size, Stacks).
1031create(Queue, Child, Options, local, Application) :- 1032 !, 1033 pengine_child_id(Child), 1034 create0(Queue, Child, Options, local, Application). 1035create(Queue, Child, Options, URL, Application) :- 1036 pengine_child_id(Child), 1037 catch(create0(Queue, Child, Options, URL, Application), 1038 Error, 1039 create_error(Queue, Child, Error)). 1040 1041pengine_child_id(Child) :- 1042 ( nonvar(Child) 1043 -> true 1044 ; pengine_uuid(Child) 1045 ). 1046 1047create_error(Queue, Child, Error) :- 1048 pengine_reply(Queue, error(Child, Error)). 1049 1050create0(Queue, Child, Options, URL, Application) :- 1051 ( current_application(Application) 1052 -> true 1053 ; existence_error(pengine_application, Application) 1054 ), 1055 ( URL \== http % pengine is _not_ a child of the 1056 % HTTP server thread 1057 -> aggregate_all(count, child(_,_), Count), 1058 setting(Application:slave_limit, Max), 1059 ( Count >= Max 1060 -> throw(error(resource_error(max_pengines), _)) 1061 ; true 1062 ) 1063 ; true 1064 ), 1065 partition(pengine_create_option, Options, PengineOptions, RestOptions), 1066 thread_create_in_pool( 1067 Application, 1068 pengine_main(Queue, PengineOptions, Application), ChildThread, 1069 [ at_exit(pengine_done) 1070 | RestOptions 1071 ]), 1072 option(destroy(Destroy), PengineOptions, true), 1073 pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy), 1074 thread_send_message(ChildThread, pengine_registered(Child)), 1075 ( option(id(Id), Options) 1076 -> Id = Child 1077 ; true 1078 ). 1079 1080pengine_create_option(src_text(_)). 1081pengine_create_option(src_url(_)). 1082pengine_create_option(application(_)). 1083pengine_create_option(destroy(_)). 1084pengine_create_option(ask(_)). 1085pengine_create_option(template(_)). 1086pengine_create_option(bindings(_)). 1087pengine_create_option(chunk(_)). 1088pengine_create_option(alias(_)). 1089pengine_create_option(user(_)).
at_exit
option. Destroys child
pengines using pengine_destroy/1. Cleaning up the Pengine is
synchronised by the pengine_done
mutex. See read_event/6.1098:- public 1099 pengine_done/0. 1100 1101pengine_done :- 1102 thread_self(Me), 1103 ( thread_property(Me, status(exception('$aborted'))), 1104 thread_detach(Me), 1105 pengine_self(Pengine) 1106 -> catch(pengine_reply(destroy(Pengine, abort(Pengine))), 1107 error(_,_), true) 1108 ; true 1109 ), 1110 forall(child(_Name, Child), 1111 pengine_destroy(Child)), 1112 pengine_self(Id), 1113 protect_pengine(Id, pengine_unregister(Id)).
1121:- thread_local wrap_first_answer_in_create_event/2. 1122 1123:- meta_predicate 1124 pengine_prepare_source( , ). 1125 1126pengine_main(Parent, Options, Application) :- 1127 fix_streams, 1128 thread_get_message(pengine_registered(Self)), 1129 nb_setval(pengine_parent, Parent), 1130 pengine_register_user(Options), 1131 set_prolog_flag(mitigate_spectre, true), 1132 catch(in_temporary_module( 1133 Self, 1134 pengine_prepare_source(Application, Options), 1135 pengine_create_and_loop(Self, Application, Options)), 1136 prepare_source_failed, 1137 pengine_terminate(Self)). 1138 1139pengine_create_and_loop(Self, Application, Options) :- 1140 setting(Application:slave_limit, SlaveLimit), 1141 CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]), 1142 ( option(ask(Query0), Options) 1143 -> asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)), 1144 ( string(Query0) % string is not callable 1145 -> ( option(template(TemplateS), Options) 1146 -> Ask2 = Query0-TemplateS 1147 ; Ask2 = Query0 1148 ), 1149 catch(ask_to_term(Ask2, Self, Query, Template, Bindings), 1150 Error, true), 1151 ( var(Error) 1152 -> true 1153 ; send_error(Error), 1154 throw(prepare_source_failed) 1155 ) 1156 ; Query = Query0, 1157 option(template(Template), Options, Query), 1158 option(bindings(Bindings), Options, []) 1159 ), 1160 option(chunk(Chunk), Options, 1), 1161 pengine_ask(Self, Query, 1162 [ template(Template), 1163 chunk(Chunk), 1164 bindings(Bindings) 1165 ]) 1166 ; Extra = [], 1167 pengine_reply(CreateEvent) 1168 ), 1169 pengine_main_loop(Self).
1179ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :- 1180 !, 1181 format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]), 1182 term_string(t(Template1,Ask1), AskTemplate, 1183 [ variable_names(Bindings0), 1184 module(Module) 1185 ]), 1186 phrase(template_bindings(Template1, Bindings0), Bindings). 1187ask_to_term(Ask, Module, Ask1, Template, Bindings1) :- 1188 term_string(Ask1, Ask, 1189 [ variable_names(Bindings), 1190 module(Module) 1191 ]), 1192 exclude(anon, Bindings, Bindings1), 1193 dict_create(Template, swish_default_template, Bindings1). 1194 1195template_bindings(Var, Bindings) --> 1196 { var(Var) }, !, 1197 ( { var_binding(Bindings, Var, Binding) 1198 } 1199 -> [Binding] 1200 ; [] 1201 ). 1202template_bindings([H|T], Bindings) --> 1203 !, 1204 template_bindings(H, Bindings), 1205 template_bindings(T, Bindings). 1206template_bindings(Compoound, Bindings) --> 1207 { compound(Compoound), !, 1208 compound_name_arguments(Compoound, _, Args) 1209 }, 1210 template_bindings(Args, Bindings). 1211template_bindings(_, _) --> []. 1212 1213var_binding(Bindings, Var, Binding) :- 1214 member(Binding, Bindings), 1215 arg(2, Binding, V), 1216 V == Var, !.
1223fix_streams :- 1224 fix_stream(current_output). 1225 1226fix_stream(Name) :- 1227 is_cgi_stream(Name), 1228 !, 1229 debug(pengine(stream), '~w is a CGI stream!', [Name]), 1230 set_stream(user_output, alias(Name)). 1231fix_stream(_).
1240pengine_prepare_source(Module:Application, Options) :- 1241 setting(Application:program_space, SpaceLimit), 1242 set_module(Module:program_space(SpaceLimit)), 1243 delete_import_module(Module, user), 1244 add_import_module(Module, Application, start), 1245 catch(prep_module(Module, Application, Options), Error, true), 1246 ( var(Error) 1247 -> true 1248 ; send_error(Error), 1249 throw(prepare_source_failed) 1250 ). 1251 1252prep_module(Module, Application, Options) :- 1253 maplist(copy_flag(Module, Application), [var_prefix]), 1254 forall(prepare_module(Module, Application, Options), true), 1255 setup_call_cleanup( 1256 '$set_source_module'(OldModule, Module), 1257 maplist(process_create_option(Module), Options), 1258 '$set_source_module'(OldModule)). 1259 1260copy_flag(Module, Application, Flag) :- 1261 current_prolog_flag(ApplicationFlag, Value), 1262 !, 1263 set_prolog_flag(ModuleFlag, Value). 1264copy_flag(_, _, _). 1265 1266process_create_option(Application, src_text(Text)) :- 1267 !, 1268 pengine_src_text(Text, Application). 1269process_create_option(Application, src_url(URL)) :- 1270 !, 1271 pengine_src_url(URL, Application). 1272process_create_option(_, _).
src_text
and
src_url
options1295pengine_main_loop(ID) :- 1296 catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)). 1297 1298pengine_aborted(ID) :- 1299 thread_self(Self), 1300 debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]), 1301 empty_queue, 1302 destroy_or_continue(abort(ID)).
1315guarded_main_loop(ID) :- 1316 pengine_request(Request), 1317 ( Request = destroy 1318 -> debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]), 1319 pengine_terminate(ID) 1320 ; Request = ask(Goal, Options) 1321 -> debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]), 1322 ask(ID, Goal, Options) 1323 ; debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]), 1324 pengine_reply(error(ID, error(protocol_error, _))), 1325 guarded_main_loop(ID) 1326 ). 1327 1328 1329pengine_terminate(ID) :- 1330 pengine_reply(destroy(ID)), 1331 thread_self(Me), % Make the thread silently disappear 1332 thread_detach(Me).
1343solve(Chunk, Template, Goal, ID) :- 1344 prolog_current_choice(Choice), 1345 ( integer(Chunk) 1346 -> State = count(Chunk) 1347 ; Chunk == false 1348 -> State = no_chunk 1349 ; domain_error(chunk, Chunk) 1350 ), 1351 statistics(cputime, Epoch), 1352 Time = time(Epoch), 1353 nb_current('$variable_names', Bindings), 1354 filter_template(Template, Bindings, Template2), 1355 '$current_typein_module'(CurrTypeIn), 1356 ( '$set_typein_module'(ID), 1357 call_cleanup(catch(findnsols_no_empty(State, Template2, 1358 set_projection(Goal, Bindings), 1359 Result), 1360 Error, true), 1361 query_done(Det, CurrTypeIn)), 1362 arg(1, Time, T0), 1363 statistics(cputime, T1), 1364 CPUTime is T1-T0, 1365 forall(pengine_flush_output_hook, true), 1366 ( var(Error) 1367 -> projection(Projection), 1368 ( var(Det) 1369 -> pengine_reply(success(ID, Result, Projection, 1370 CPUTime, true)), 1371 more_solutions(ID, Choice, State, Time) 1372 ; !, % commit 1373 destroy_or_continue(success(ID, Result, Projection, 1374 CPUTime, false)) 1375 ) 1376 ; !, % commit 1377 ( Error == abort_query 1378 -> throw(Error) 1379 ; destroy_or_continue(error(ID, Error)) 1380 ) 1381 ) 1382 ; !, % commit 1383 arg(1, Time, T0), 1384 statistics(cputime, T1), 1385 CPUTime is T1-T0, 1386 destroy_or_continue(failure(ID, CPUTime)) 1387 ). 1388solve(_, _, _, _). % leave a choice point 1389 1390query_done(true, CurrTypeIn) :- 1391 '$set_typein_module'(CurrTypeIn).
1400set_projection(Goal, Bindings) :- 1401 b_setval('$variable_names', Bindings), 1402 call(Goal). 1403 1404projection(Projection) :- 1405 nb_current('$variable_names', Bindings), 1406 !, 1407 maplist(var_name, Bindings, Projection). 1408projection([]).
1418filter_template(Template0, Bindings, Template) :- 1419 is_dict(Template0, swish_default_template), 1420 !, 1421 dict_create(Template, swish_default_template, Bindings). 1422filter_template(Template, _Bindings, Template). 1423 1424findnsols_no_empty(no_chunk, Template, Goal, List) => 1425 List = [Template], 1426 call(Goal). 1427findnsols_no_empty(State, Template, Goal, List) => 1428 findnsols(State, Template, Goal, List), 1429 List \== []. 1430 1431destroy_or_continue(Event) :- 1432 arg(1, Event, ID), 1433 ( pengine_property(ID, destroy(true)) 1434 -> thread_self(Me), 1435 thread_detach(Me), 1436 pengine_reply(destroy(ID, Event)) 1437 ; pengine_reply(Event), 1438 guarded_main_loop(ID) 1439 ).
chunk
solutions.next
, but sets the new chunk-size to Count.1457more_solutions(ID, Choice, State, Time) :- 1458 pengine_request(Event), 1459 more_solutions(Event, ID, Choice, State, Time). 1460 1461more_solutions(stop, ID, _Choice, _State, _Time) :- 1462 !, 1463 debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]), 1464 destroy_or_continue(stop(ID)). 1465more_solutions(next, ID, _Choice, _State, Time) :- 1466 !, 1467 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]), 1468 statistics(cputime, T0), 1469 nb_setarg(1, Time, T0), 1470 fail. 1471more_solutions(next(Count), ID, _Choice, State, Time) :- 1472 Count > 0, 1473 State = count(_), % else fallthrough to protocol error 1474 !, 1475 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]), 1476 nb_setarg(1, State, Count), 1477 statistics(cputime, T0), 1478 nb_setarg(1, Time, T0), 1479 fail. 1480more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :- 1481 !, 1482 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]), 1483 prolog_cut_to(Choice), 1484 ask(ID, Goal, Options). 1485more_solutions(destroy, ID, _Choice, _State, _Time) :- 1486 !, 1487 debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]), 1488 pengine_terminate(ID). 1489more_solutions(Event, ID, Choice, State, Time) :- 1490 debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]), 1491 pengine_reply(error(ID, error(protocol_error, _))), 1492 more_solutions(ID, Choice, State, Time).
chunk(N)
option.
1500ask(ID, Goal, Options) :-
1501 catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
1502 !,
1503 ( var(Error)
1504 -> option(template(Template), Options, Goal),
1505 option(chunk(N), Options, 1),
1506 solve(N, Template, Goal1, ID)
1507 ; pengine_reply(error(ID, Error)),
1508 guarded_main_loop(ID)
1509 ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
1523prepare_goal(ID, Goal0, Module:Goal, Options) :-
1524 option(bindings(Bindings), Options, []),
1525 b_setval('$variable_names', Bindings),
1526 ( prepare_goal(Goal0, Goal1, Options)
1527 -> true
1528 ; Goal1 = Goal0
1529 ),
1530 get_pengine_module(ID, Module),
1531 setup_call_cleanup(
1532 '$set_source_module'(Old, Module),
1533 expand_goal(Goal1, Goal),
1534 '$set_source_module'(_, Old)),
1535 ( pengine_not_sandboxed(ID)
1536 -> true
1537 ; get_pengine_application(ID, App),
1538 setting(App:safe_goal_limit, Limit),
1539 catch(call_with_time_limit(
1540 Limit,
1541 safe_goal(Module:Goal)), E, true)
1542 -> ( var(E)
1543 -> true
1544 ; E = time_limit_exceeded
1545 -> throw(error(sandbox(time_limit_exceeded, Limit),_))
1546 ; throw(E)
1547 )
1548 ).
not_sandboxed(User, Application)
must succeed.
1568pengine_not_sandboxed(ID) :-
1569 pengine_user(ID, User),
1570 pengine_property(ID, application(App)),
1571 not_sandboxed(User, App),
1572 !.
1594pengine_pull_response(Pengine, Options) :- 1595 pengine_remote(Pengine, Server), 1596 !, 1597 remote_pengine_pull_response(Server, Pengine, Options). 1598pengine_pull_response(_ID, _Options).
1607pengine_input(Prompt, Term) :-
1608 pengine_self(Self),
1609 pengine_parent(Parent),
1610 pengine_reply(Parent, prompt(Self, Prompt)),
1611 pengine_request(Request),
1612 ( Request = input(Input)
1613 -> Term = Input
1614 ; Request == destroy
1615 -> abort
1616 ; throw(error(protocol_error,_))
1617 ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
1634pengine_respond(Pengine, Input, Options) :-
1635 pengine_send(Pengine, input(Input), Options).
1644send_error(error(Formal, context(prolog_stack(Frames), Message))) :- 1645 is_list(Frames), 1646 !, 1647 with_output_to(string(Stack), 1648 print_prolog_backtrace(current_output, Frames)), 1649 pengine_self(Self), 1650 replace_blobs(Formal, Formal1), 1651 replace_blobs(Message, Message1), 1652 pengine_reply(error(Self, error(Formal1, 1653 context(prolog_stack(Stack), Message1)))). 1654send_error(Error) :- 1655 pengine_self(Self), 1656 replace_blobs(Error, Error1), 1657 pengine_reply(error(Self, Error1)).
1665replace_blobs(Blob, Atom) :- 1666 blob(Blob, Type), Type \== text, 1667 !, 1668 format(atom(Atom), '~p', [Blob]). 1669replace_blobs(Term0, Term) :- 1670 compound(Term0), 1671 !, 1672 compound_name_arguments(Term0, Name, Args0), 1673 maplist(replace_blobs, Args0, Args), 1674 compound_name_arguments(Term, Name, Args). 1675replace_blobs(Term, Term). 1676 1677 1678/*================= Remote pengines ======================= 1679*/ 1680 1681 1682remote_pengine_create(BaseURL, Options) :- 1683 partition(pengine_create_option, Options, PengineOptions0, RestOptions), 1684 ( option(ask(Query), PengineOptions0), 1685 \+ option(template(_Template), PengineOptions0) 1686 -> PengineOptions = [template(Query)|PengineOptions0] 1687 ; PengineOptions = PengineOptions0 1688 ), 1689 options_to_dict(PengineOptions, PostData), 1690 remote_post_rec(BaseURL, create, PostData, Reply, RestOptions), 1691 arg(1, Reply, ID), 1692 ( option(id(ID2), Options) 1693 -> ID = ID2 1694 ; true 1695 ), 1696 option(alias(Name), Options, ID), 1697 assert(child(Name, ID)), 1698 ( ( functor(Reply, create, _) % actually created 1699 ; functor(Reply, output, _) % compiler messages 1700 ) 1701 -> option(application(Application), PengineOptions, pengine_sandbox), 1702 option(destroy(Destroy), PengineOptions, true), 1703 pengine_register_remote(ID, BaseURL, Application, Destroy) 1704 ; true 1705 ), 1706 thread_self(Queue), 1707 pengine_reply(Queue, Reply). 1708 1709options_to_dict(Options, Dict) :- 1710 select_option(ask(Ask), Options, Options1), 1711 select_option(template(Template), Options1, Options2), 1712 !, 1713 no_numbered_var_in(Ask+Template), 1714 findall(AskString-TemplateString, 1715 ask_template_to_strings(Ask, Template, AskString, TemplateString), 1716 [ AskString-TemplateString ]), 1717 options_to_dict(Options2, Dict0), 1718 Dict = Dict0.put(_{ask:AskString,template:TemplateString}). 1719options_to_dict(Options, Dict) :- 1720 maplist(prolog_option, Options, Options1), 1721 dict_create(Dict, _, Options1). 1722 1723no_numbered_var_in(Term) :- 1724 sub_term(Sub, Term), 1725 subsumes_term('$VAR'(_), Sub), 1726 !, 1727 domain_error(numbered_vars_free_term, Term). 1728no_numbered_var_in(_). 1729 1730ask_template_to_strings(Ask, Template, AskString, TemplateString) :- 1731 numbervars(Ask+Template, 0, _), 1732 WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ], 1733 format(string(AskTemplate), '~W\n~W', [ Ask, WOpts, 1734 Template, WOpts 1735 ]), 1736 split_string(AskTemplate, "\n", "", [AskString, TemplateString]). 1737 1738prolog_option(Option0, Option) :- 1739 create_option_type(Option0, term), 1740 !, 1741 Option0 =.. [Name,Value], 1742 format(string(String), '~k', [Value]), 1743 Option =.. [Name,String]. 1744prolog_option(Option, Option). 1745 1746create_option_type(ask(_), term). 1747create_option_type(template(_), term). 1748create_option_type(application(_), atom). 1749 1750remote_pengine_send(BaseURL, ID, Event, Options) :- 1751 remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options), 1752 thread_self(Queue), 1753 pengine_reply(Queue, Reply). 1754 1755remote_pengine_pull_response(BaseURL, ID, Options) :- 1756 remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options), 1757 thread_self(Queue), 1758 pengine_reply(Queue, Reply). 1759 1760remote_pengine_abort(BaseURL, ID, Options) :- 1761 remote_send_rec(BaseURL, abort, ID, [], Reply, Options), 1762 thread_self(Queue), 1763 pengine_reply(Queue, Reply).
1770remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :- 1771 !, 1772 server_url(Server, Action, [id=ID], URL), 1773 http_open(URL, Stream, % putting this in setup_call_cleanup/3 1774 [ post(prolog(Event)) % makes it impossible to interrupt. 1775 | Options 1776 ]), 1777 call_cleanup( 1778 read_prolog_reply(Stream, Reply), 1779 close(Stream)). 1780remote_send_rec(Server, Action, ID, Params, Reply, Options) :- 1781 server_url(Server, Action, [id=ID|Params], URL), 1782 http_open(URL, Stream, Options), 1783 call_cleanup( 1784 read_prolog_reply(Stream, Reply), 1785 close(Stream)). 1786 1787remote_post_rec(Server, Action, Data, Reply, Options) :- 1788 server_url(Server, Action, [], URL), 1789 probe(Action, URL, Options), 1790 http_open(URL, Stream, 1791 [ post(json(Data)) 1792 | Options 1793 ]), 1794 call_cleanup( 1795 read_prolog_reply(Stream, Reply), 1796 close(Stream)).
1804probe(create, URL, Options) :- 1805 !, 1806 http_open(URL, Stream, [method(options)|Options]), 1807 close(Stream). 1808probe(_, _, _). 1809 1810read_prolog_reply(In, Reply) :- 1811 set_stream(In, encoding(utf8)), 1812 read(In, Reply0), 1813 rebind_cycles(Reply0, Reply). 1814 1815rebind_cycles(@(Reply, Bindings), Reply) :- 1816 is_list(Bindings), 1817 !, 1818 maplist(bind, Bindings). 1819rebind_cycles(Reply, Reply). 1820 1821bind(Var = Value) :- 1822 Var = Value. 1823 1824server_url(Server, Action, Params, URL) :- 1825 atom_concat('pengine/', Action, PAction), 1826 uri_edit([ path(PAction), 1827 search(Params) 1828 ], Server, URL).
Valid options are:
timeout
.1849pengine_event(Event) :- 1850 pengine_event(Event, []). 1851 1852pengine_event(Event, Options) :- 1853 thread_self(Self), 1854 option(listen(Id), Options, _), 1855 ( thread_get_message(Self, pengine_event(Id, Event), Options) 1856 -> true 1857 ; Event = timeout 1858 ), 1859 update_remote_destroy(Event). 1860 1861update_remote_destroy(Event) :- 1862 destroy_event(Event), 1863 arg(1, Event, Id), 1864 pengine_remote(Id, _Server), 1865 !, 1866 pengine_unregister_remote(Id). 1867update_remote_destroy(_). 1868 1869destroy_event(destroy(_)). 1870destroy_event(destroy(_,_)). 1871destroy_event(create(_,Features)) :- 1872 memberchk(answer(Answer), Features), 1873 !, 1874 nonvar(Answer), 1875 destroy_event(Answer).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
1904pengine_event_loop(Closure, Options) :- 1905 child(_,_), 1906 !, 1907 pengine_event(Event), 1908 ( option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs 1909 -> forall(child(_,ID), pengine_send(ID, Event)) 1910 ; true 1911 ), 1912 pengine_event_loop(Event, Closure, Options). 1913pengine_event_loop(_, _). 1914 1915:- meta_predicate 1916 pengine_process_event( , , , ). 1917 1918pengine_event_loop(Event, Closure, Options) :- 1919 pengine_process_event(Event, Closure, Continue, Options), 1920 ( Continue == true 1921 -> pengine_event_loop(Closure, Options) 1922 ; true 1923 ). 1924 1925pengine_process_event(create(ID, T), Closure, Continue, Options) :- 1926 debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]), 1927 ( select(answer(First), T, T1) 1928 -> ignore(call(Closure, create(ID, T1))), 1929 pengine_process_event(First, Closure, Continue, Options) 1930 ; ignore(call(Closure, create(ID, T))), 1931 Continue = true 1932 ). 1933pengine_process_event(output(ID, Msg), Closure, true, _Options) :- 1934 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]), 1935 ignore(call(Closure, output(ID, Msg))), 1936 pengine_pull_response(ID, []). 1937pengine_process_event(debug(ID, Msg), Closure, true, _Options) :- 1938 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]), 1939 ignore(call(Closure, debug(ID, Msg))), 1940 pengine_pull_response(ID, []). 1941pengine_process_event(prompt(ID, Term), Closure, true, _Options) :- 1942 debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]), 1943 ignore(call(Closure, prompt(ID, Term))). 1944pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :- 1945 debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]), 1946 ignore(call(Closure, success(ID, Sol, More))). 1947pengine_process_event(failure(ID, _Time), Closure, true, _Options) :- 1948 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]), 1949 ignore(call(Closure, failure(ID))). 1950pengine_process_event(error(ID, Error), Closure, Continue, _Options) :- 1951 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]), 1952 ( call(Closure, error(ID, Error)) 1953 -> Continue = true 1954 ; forall(child(_,Child), pengine_destroy(Child)), 1955 throw(Error) 1956 ). 1957pengine_process_event(stop(ID), Closure, true, _Options) :- 1958 debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]), 1959 ignore(call(Closure, stop(ID))). 1960pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :- 1961 pengine_process_event(Event, Closure, _, Options), 1962 pengine_process_event(destroy(ID), Closure, Continue, Options). 1963pengine_process_event(destroy(ID), Closure, true, _Options) :- 1964 retractall(child(_,ID)), 1965 debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]), 1966 ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
1995pengine_rpc(URL, Query) :- 1996 pengine_rpc(URL, Query, []). 1997 1998pengine_rpc(URL, Query, M:Options0) :- 1999 translate_local_sources(Options0, Options1, M), 2000 ( option(timeout(_), Options1) 2001 -> Options = Options1 2002 ; setting(time_limit, Limit), 2003 Options = [timeout(Limit)|Options1] 2004 ), 2005 term_variables(Query, Vars), 2006 Template =.. [v|Vars], 2007 State = destroy(true), % modified by process_event/4 2008 setup_call_catcher_cleanup( 2009 pengine_create([ ask(Query), 2010 template(Template), 2011 server(URL), 2012 id(Id) 2013 | Options 2014 ]), 2015 wait_event(Template, State, [listen(Id)|Options]), 2016 Why, 2017 pengine_destroy_and_wait(State, Id, Why, Options)). 2018 2019pengine_destroy_and_wait(destroy(true), Id, Why, Options) :- 2020 !, 2021 debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]), 2022 pengine_destroy(Id, Options), 2023 wait_destroy(Id, 10). 2024pengine_destroy_and_wait(_, _, Why, _) :- 2025 debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]). 2026 2027wait_destroy(Id, _) :- 2028 \+ child(_, Id), 2029 !. 2030wait_destroy(Id, N) :- 2031 pengine_event(Event, [listen(Id),timeout(10)]), 2032 !, 2033 ( destroy_event(Event) 2034 -> retractall(child(_,Id)) 2035 ; succ(N1, N) 2036 -> wait_destroy(Id, N1) 2037 ; debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]), 2038 pengine_unregister_remote(Id), 2039 retractall(child(_,Id)) 2040 ). 2041 2042wait_event(Template, State, Options) :- 2043 pengine_event(Event, Options), 2044 debug(pengine(event), 'Received ~p', [Event]), 2045 process_event(Event, Template, State, Options). 2046 2047process_event(create(_ID, Features), Template, State, Options) :- 2048 memberchk(answer(First), Features), 2049 process_event(First, Template, State, Options). 2050process_event(error(_ID, Error), _Template, _, _Options) :- 2051 throw(Error). 2052process_event(failure(_ID, _Time), _Template, _, _Options) :- 2053 fail. 2054process_event(prompt(ID, Prompt), Template, State, Options) :- 2055 pengine_rpc_prompt(ID, Prompt, Reply), 2056 pengine_send(ID, input(Reply)), 2057 wait_event(Template, State, Options). 2058process_event(output(ID, Term), Template, State, Options) :- 2059 pengine_rpc_output(ID, Term), 2060 pengine_pull_response(ID, Options), 2061 wait_event(Template, State, Options). 2062process_event(debug(ID, Message), Template, State, Options) :- 2063 debug(pengine(debug), '~w', [Message]), 2064 pengine_pull_response(ID, Options), 2065 wait_event(Template, State, Options). 2066process_event(success(_ID, Solutions, _Proj, _Time, false), 2067 Template, _, _Options) :- 2068 !, 2069 member(Template, Solutions). 2070process_event(success(ID, Solutions, _Proj, _Time, true), 2071 Template, State, Options) :- 2072 ( member(Template, Solutions) 2073 ; pengine_next(ID, Options), 2074 wait_event(Template, State, Options) 2075 ). 2076process_event(destroy(ID, Event), Template, State, Options) :- 2077 !, 2078 retractall(child(_,ID)), 2079 nb_setarg(1, State, false), 2080 debug(pengine(destroy), 'State: ~p~n', [State]), 2081 process_event(Event, Template, State, Options). 2082% compatibility with older versions of the protocol. 2083process_event(success(ID, Solutions, Time, More), 2084 Template, State, Options) :- 2085 process_event(success(ID, Solutions, _Proj, Time, More), 2086 Template, State, Options). 2087 2088 2089pengine_rpc_prompt(ID, Prompt, Term) :- 2090 prompt(ID, Prompt, Term0), 2091 !, 2092 Term = Term0. 2093pengine_rpc_prompt(_ID, Prompt, Term) :- 2094 setup_call_cleanup( 2095 prompt(Old, Prompt), 2096 read(Term), 2097 prompt(_, Old)). 2098 2099pengine_rpc_output(ID, Term) :- 2100 output(ID, Term), 2101 !. 2102pengine_rpc_output(_ID, Term) :- 2103 print(Term).
2110:- multifile prompt/3.
2117:- multifile output/2. 2118 2119 2120/*================= HTTP handlers ======================= 2121*/ 2122 2123% Declare HTTP locations we serve and how. Note that we use 2124% time_limit(inifinite) because pengines have their own timeout. Also 2125% note that we use spawn. This is needed because we can easily get 2126% many clients waiting for some action on a pengine to complete. 2127% Without spawning, we would quickly exhaust the worker pool of the 2128% HTTP server. 2129% 2130% FIXME: probably we should wait for a short time for the pengine on 2131% the default worker thread. Only if that time has expired, we can 2132% call http_spawn/2 to continue waiting on a new thread. That would 2133% improve the performance and reduce the usage of threads. 2134 2135:- http_handler(root(pengine), http_404([]), 2136 [ id(pengines) ]). 2137:- http_handler(root(pengine/create), http_pengine_create, 2138 [ time_limit(infinite), spawn([]) ]). 2139:- http_handler(root(pengine/send), http_pengine_send, 2140 [ time_limit(infinite), spawn([]) ]). 2141:- http_handler(root(pengine/pull_response), http_pengine_pull_response, 2142 [ time_limit(infinite), spawn([]) ]). 2143:- http_handler(root(pengine/abort), http_pengine_abort, []). 2144:- http_handler(root(pengine/detach), http_pengine_detach, []). 2145:- http_handler(root(pengine/list), http_pengine_list, []). 2146:- http_handler(root(pengine/ping), http_pengine_ping, []). 2147:- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []). 2148 2149:- http_handler(root(pengine/'pengines.js'), 2150 http_reply_file(library('http/web/js/pengines.js'), []), []). 2151:- http_handler(root(pengine/'plterm.css'), 2152 http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
---|---|---|
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
collate | 0 (off) | Join output events |
solutions | chunked | If all , emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
Using chunk=false
simulates the recursive toplevel. See
pengine_ask/3.
2183http_pengine_create(Request) :- 2184 reply_options(Request, [post]), 2185 !. 2186http_pengine_create(Request) :- 2187 memberchk(content_type(CT), Request), 2188 sub_atom(CT, 0, _, _, 'application/json'), 2189 !, 2190 http_read_json_dict(Request, Dict), 2191 dict_atom_option(format, Dict, Format, prolog), 2192 dict_atom_option(application, Dict, Application, pengine_sandbox), 2193 http_pengine_create(Request, Application, Format, Dict). 2194http_pengine_create(Request) :- 2195 Optional = [optional(true)], 2196 OptString = [string|Optional], 2197 Form = [ format(Format, [default(prolog)]), 2198 application(Application, [default(pengine_sandbox)]), 2199 chunk(_, [nonneg;oneof([false]), default(1)]), 2200 collate(_, [number, default(0)]), 2201 solutions(_, [oneof([all,chunked]), default(chunked)]), 2202 ask(_, OptString), 2203 template(_, OptString), 2204 src_text(_, OptString), 2205 disposition(_, OptString), 2206 src_url(_, Optional) 2207 ], 2208 http_parameters(Request, Form), 2209 form_dict(Form, Dict), 2210 http_pengine_create(Request, Application, Format, Dict). 2211 2212dict_atom_option(Key, Dict, Atom, Default) :- 2213 ( get_dict(Key, Dict, String) 2214 -> atom_string(Atom, String) 2215 ; Atom = Default 2216 ). 2217 2218form_dict(Form, Dict) :- 2219 form_values(Form, Pairs), 2220 dict_pairs(Dict, _, Pairs). 2221 2222form_values([], []). 2223form_values([H|T], Pairs) :- 2224 arg(1, H, Value), 2225 nonvar(Value), 2226 !, 2227 functor(H, Name, _), 2228 Pairs = [Name-Value|PairsT], 2229 form_values(T, PairsT). 2230form_values([_|T], Pairs) :- 2231 form_values(T, Pairs).
2236http_pengine_create(Request, Application, Format, Dict) :- 2237 current_application(Application), 2238 !, 2239 allowed(Request, Application), 2240 authenticate(Request, Application, UserOptions), 2241 dict_to_options(Dict, Application, CreateOptions0), 2242 append(UserOptions, CreateOptions0, CreateOptions), 2243 pengine_uuid(Pengine), 2244 message_queue_create(Queue, [max_size(25)]), 2245 setting(Application:time_limit, TimeLimit), 2246 get_time(Now), 2247 asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)), 2248 broadcast(pengine(create(Pengine, Application, CreateOptions))), 2249 create(Queue, Pengine, CreateOptions, http, Application), 2250 create_wait_and_output_result(Pengine, Queue, Format, 2251 TimeLimit, Dict), 2252 gc_abandoned_queues. 2253http_pengine_create(_Request, Application, Format, _Dict) :- 2254 Error = existence_error(pengine_application, Application), 2255 pengine_uuid(ID), 2256 output_result(ID, Format, error(ID, error(Error, _))). 2257 2258 2259dict_to_options(Dict, Application, CreateOptions) :- 2260 dict_pairs(Dict, _, Pairs), 2261 pairs_create_options(Pairs, Application, CreateOptions). 2262 2263pairs_create_options([], _, []) :- !. 2264pairs_create_options([N-V0|T0], App, [Opt|T]) :- 2265 Opt =.. [N,V], 2266 pengine_create_option(Opt), N \== user, 2267 !, 2268 ( create_option_type(Opt, atom) 2269 -> atom_string(V, V0) % term creation must be done if 2270 ; V = V0 % we created the source and know 2271 ), % the operators. 2272 pairs_create_options(T0, App, T). 2273pairs_create_options([_|T0], App, T) :- 2274 pairs_create_options(T0, App, T).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded,
_)
.
2285wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
2286 Collate is min(Collate0, TimeLimit/10),
2287 get_time(Epoch),
2288 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2289 [ timeout(TimeLimit)
2290 ]),
2291 Error, true)
2292 -> ( var(Error)
2293 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
2294 ( collating_event(Collate, Event)
2295 -> Deadline is Epoch+TimeLimit,
2296 collect_events(Pengine, Collate, Queue, Deadline, 100, More),
2297 Events = [Event|More],
2298 ignore(destroy_queue_from_http(Pengine, Events, Queue)),
2299 protect_pengine(Pengine, output_result(Pengine, Format, Events))
2300 ; ignore(destroy_queue_from_http(Pengine, Event, Queue)),
2301 protect_pengine(Pengine, output_result(Pengine, Format, Event))
2302 )
2303 ; output_result(Pengine, Format, died(Pengine))
2304 )
2305 ; time_limit_exceeded(Pengine, Format)
2306 ).
2313collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :- 2314 !. 2315collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :- 2316 debug(pengine(wait), 'Waiting to collate events', []), 2317 ( catch(thread_get_message(Queue, pengine_event(_, Event), 2318 [ timeout(Collate) 2319 ]), 2320 Error, true) 2321 -> ( var(Error) 2322 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]), 2323 Events = [Event|More], 2324 ( collating_event(Collate, Event) 2325 -> Max2 is Max - 1, 2326 collect_events(Pengine, Collate, Queue, Deadline, Max2, More) 2327 ; More = [] 2328 ) 2329 ; Events = [died(Pengine)] 2330 ) 2331 ; get_time(Now), 2332 Now > Deadline 2333 -> time_limit_event(Pengine, TimeLimitEvent), 2334 Events = [TimeLimitEvent] 2335 ; Events = [] 2336 ). 2337 2338collating_event(0, _) :- 2339 !, 2340 fail. 2341collating_event(_, output(_,_)).
disposition
key to denote the
download location.2350create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :- 2351 get_dict(solutions, Dict, all), 2352 !, 2353 between(1, infinite, Page), 2354 ( catch(thread_get_message(Queue, pengine_event(_, Event), 2355 [ timeout(TimeLimit) 2356 ]), 2357 Error, true) 2358 -> ( var(Error) 2359 -> debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]), 2360 ( destroy_queue_from_http(Pengine, Event, Queue) 2361 -> !, 2362 protect_pengine(Pengine, 2363 output_result_2(Format, page(Page, Event), Dict)) 2364 ; is_more_event(Event) 2365 -> pengine_thread(Pengine, Thread), 2366 thread_send_message(Thread, pengine_request(next)), 2367 protect_pengine(Pengine, 2368 output_result_2(Format, page(Page, Event), Dict)), 2369 fail 2370 ; !, 2371 protect_pengine(Pengine, 2372 output_result_2(Format, page(Page, Event), Dict)) 2373 ) 2374 ; !, output_result(Pengine, Format, died(Pengine)) 2375 ) 2376 ; !, time_limit_exceeded(Pengine, Format) 2377 ), 2378 !. 2379create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :- 2380 wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)). 2381 2382is_more_event(success(_Id, _Answers, _Projection, _Time, true)). 2383is_more_event(create(_, Options)) :- 2384 memberchk(answer(Event), Options), 2385 is_more_event(Event).
2399time_limit_exceeded(Pengine, Format) :- 2400 time_limit_event(Pengine, Event), 2401 call_cleanup( 2402 pengine_destroy(Pengine, [force(true)]), 2403 output_result(Pengine, Format, Event)). 2404 2405time_limit_event(Pengine, 2406 destroy(Pengine, error(Pengine, time_limit_exceeded))). 2407 2408destroy_pengine_after_output(Pengine, Events) :- 2409 is_list(Events), 2410 last(Events, Last), 2411 time_limit_event(Pengine, Last), 2412 !, 2413 catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true). 2414destroy_pengine_after_output(_, _).
2429destroy_queue_from_http(ID, _, Queue) :- 2430 output_queue(ID, Queue, _), 2431 !, 2432 destroy_queue_if_empty(Queue). 2433destroy_queue_from_http(ID, Event, Queue) :- 2434 debug(pengine(destroy), 'DESTROY? ~p', [Event]), 2435 is_destroy_event(Event), 2436 !, 2437 message_queue_property(Queue, size(Waiting)), 2438 debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]), 2439 with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)). 2440 2441is_destroy_event(destroy(_)). 2442is_destroy_event(destroy(_,_)). 2443is_destroy_event(create(_, Options)) :- 2444 memberchk(answer(Event), Options), 2445 is_destroy_event(Event). 2446 2447destroy_queue_if_empty(Queue) :- 2448 thread_peek_message(Queue, _), 2449 !. 2450destroy_queue_if_empty(Queue) :- 2451 retractall(output_queue(_, Queue, _)), 2452 message_queue_destroy(Queue).
2460:- dynamic 2461 last_gc/1. 2462 2463gc_abandoned_queues :- 2464 consider_queue_gc, 2465 !, 2466 get_time(Now), 2467 ( output_queue(_, Queue, Time), 2468 Now-Time > 15*60, 2469 retract(output_queue(_, Queue, Time)), 2470 message_queue_destroy(Queue), 2471 fail 2472 ; retractall(last_gc(_)), 2473 asserta(last_gc(Now)) 2474 ). 2475gc_abandoned_queues. 2476 2477consider_queue_gc :- 2478 predicate_property(output_queue(_,_,_), number_of_clauses(N)), 2479 N > 100, 2480 ( last_gc(Time), 2481 get_time(Now), 2482 Now-Time > 5*60 2483 -> true 2484 ; \+ last_gc(_) 2485 ).
2503:- dynamic output_queue_destroyed/1. 2504 2505sync_destroy_queue_from_http(ID, Queue) :- 2506 ( output_queue(ID, Queue, _) 2507 -> destroy_queue_if_empty(Queue) 2508 ; thread_peek_message(Queue, pengine_event(_, output(_,_))) 2509 -> debug(pengine(destroy), 'Delay destruction of ~p because of output', 2510 [Queue]), 2511 get_time(Now), 2512 asserta(output_queue(ID, Queue, Now)) 2513 ; message_queue_destroy(Queue), 2514 asserta(output_queue_destroyed(Queue)) 2515 ).
pengine
held.2522sync_destroy_queue_from_pengine(ID, Queue) :- 2523 ( retract(output_queue_destroyed(Queue)) 2524 -> true 2525 ; get_time(Now), 2526 asserta(output_queue(ID, Queue, Now)) 2527 ), 2528 retractall(pengine_queue(ID, Queue, _, _)). 2529 2530 2531http_pengine_send(Request) :- 2532 reply_options(Request, [get,post]), 2533 !. 2534http_pengine_send(Request) :- 2535 http_parameters(Request, 2536 [ id(ID, [ type(atom) ]), 2537 event(EventString, [optional(true)]), 2538 collate(Collate, [number, default(0)]), 2539 format(Format, [default(prolog)]) 2540 ]), 2541 catch(read_event(ID, Request, Format, EventString, Event), 2542 Error, 2543 true), 2544 ( var(Error) 2545 -> debug(pengine(event), 'HTTP send: ~p', [Event]), 2546 ( pengine_thread(ID, Thread) 2547 -> pengine_queue(ID, Queue, TimeLimit, _), 2548 random_delay, 2549 broadcast(pengine(send(ID, Event))), 2550 thread_send_message(Thread, pengine_request(Event)), 2551 wait_and_output_result(ID, Queue, Format, TimeLimit, Collate) 2552 ; atom(ID) 2553 -> pengine_died(Format, ID) 2554 ; http_404([], Request) 2555 ) 2556 ; Error = error(existence_error(pengine, ID), _) 2557 -> pengine_died(Format, ID) 2558 ; output_result(ID, Format, error(ID, Error)) 2559 ). 2560 2561pengine_died(Format, Pengine) :- 2562 output_result(Pengine, Format, 2563 error(Pengine, error(existence_error(pengine, Pengine),_))).
pengine_done
mutex.
2574read_event(Pengine, Request, Format, EventString, Event) :- 2575 protect_pengine( 2576 Pengine, 2577 ( get_pengine_module(Pengine, Module), 2578 read_event_2(Request, EventString, Module, Event0, Bindings) 2579 )), 2580 !, 2581 fix_bindings(Format, Event0, Bindings, Event). 2582read_event(Pengine, Request, _Format, _EventString, _Event) :- 2583 debug(pengine(event), 'Pengine ~q vanished', [Pengine]), 2584 discard_post_data(Request), 2585 existence_error(pengine, Pengine).
event
parameter or as a posted document.2593read_event_2(_Request, EventString, Module, Event, Bindings) :- 2594 nonvar(EventString), 2595 !, 2596 term_string(Event, EventString, 2597 [ variable_names(Bindings), 2598 module(Module) 2599 ]). 2600read_event_2(Request, _EventString, Module, Event, Bindings) :- 2601 option(method(post), Request), 2602 http_read_data(Request, Event, 2603 [ content_type('application/x-prolog'), 2604 module(Module), 2605 variable_names(Bindings) 2606 ]).
2612discard_post_data(Request) :- 2613 option(method(post), Request), 2614 !, 2615 setup_call_cleanup( 2616 open_null_stream(NULL), 2617 http_read_data(Request, _, [to(stream(NULL))]), 2618 close(NULL)). 2619discard_post_data(_).
json(-s)
Format from the variables in
the asked Goal. Variables starting with an underscore, followed
by an capital letter are ignored from the template.2627fix_bindings(Format, 2628 ask(Goal, Options0), Bindings, 2629 ask(Goal, NewOptions)) :- 2630 json_lang(Format), 2631 !, 2632 exclude(anon, Bindings, NamedBindings), 2633 template(NamedBindings, Template, Options0, Options1), 2634 select_option(chunk(Paging), Options1, Options2, 1), 2635 NewOptions = [ template(Template), 2636 chunk(Paging), 2637 bindings(NamedBindings) 2638 | Options2 2639 ]. 2640fix_bindings(_, Command, _, Command). 2641 2642template(_, Template, Options0, Options) :- 2643 select_option(template(Template), Options0, Options), 2644 !. 2645template(Bindings, Template, Options, Options) :- 2646 dict_create(Template, swish_default_template, Bindings). 2647 2648anon(Name=_) :- 2649 sub_atom(Name, 0, _, _, '_'), 2650 sub_atom(Name, 1, 1, _, Next), 2651 char_type(Next, prolog_var_start). 2652 2653var_name(Name=_, Name).
2660json_lang(json) :- !. 2661json_lang(Format) :- 2662 sub_atom(Format, 0, _, _, 'json-').
2669http_pengine_pull_response(Request) :- 2670 reply_options(Request, [get]), 2671 !. 2672http_pengine_pull_response(Request) :- 2673 http_parameters(Request, 2674 [ id(ID, []), 2675 format(Format, [default(prolog)]), 2676 collate(Collate, [number, default(0)]) 2677 ]), 2678 reattach(ID), 2679 ( ( pengine_queue(ID, Queue, TimeLimit, _) 2680 -> true 2681 ; output_queue(ID, Queue, _), 2682 TimeLimit = 0 2683 ) 2684 -> wait_and_output_result(ID, Queue, Format, TimeLimit, Collate) 2685 ; http_404([], Request) 2686 ).
2695http_pengine_abort(Request) :- 2696 reply_options(Request, [get,post]), 2697 !. 2698http_pengine_abort(Request) :- 2699 http_parameters(Request, 2700 [ id(ID, []) 2701 ]), 2702 ( pengine_thread(ID, _Thread) 2703 -> broadcast(pengine(abort(ID))), 2704 abort_pending_output(ID), 2705 pengine_abort(ID), 2706 reply_json_dict(true) 2707 ; http_404([], Request) 2708 ).
2720http_pengine_detach(Request) :- 2721 reply_options(Request, [post]), 2722 !. 2723http_pengine_detach(Request) :- 2724 http_parameters(Request, 2725 [ id(ID, []) 2726 ]), 2727 http_read_json_dict(Request, ClientData), 2728 ( pengine_property(ID, application(Application)), 2729 allowed(Request, Application), 2730 authenticate(Request, Application, _UserOptions) 2731 -> broadcast(pengine(detach(ID))), 2732 get_time(Now), 2733 assertz(pengine_detached(ID, ClientData.put(time, Now))), 2734 pengine_queue(ID, Queue, _TimeLimit, _Now), 2735 message_queue_set(Queue, max_size(1000)), 2736 pengine_reply(Queue, detached(ID)), 2737 reply_json_dict(true) 2738 ; http_404([], Request) 2739 ). 2740 2741reattach(ID) :- 2742 ( retract(pengine_detached(ID, _Data)), 2743 pengine_queue(ID, Queue, _TimeLimit, _Now) 2744 -> message_queue_set(Queue, max_size(25)) 2745 ; true 2746 ).
2754http_pengine_destroy_all(Request) :- 2755 reply_options(Request, [get,post]), 2756 !. 2757http_pengine_destroy_all(Request) :- 2758 http_parameters(Request, 2759 [ ids(IDsAtom, []) 2760 ]), 2761 atomic_list_concat(IDs, ',', IDsAtom), 2762 forall(( member(ID, IDs), 2763 \+ pengine_detached(ID, _) 2764 ), 2765 pengine_destroy(ID, [force(true)])), 2766 reply_json_dict("ok").
status(Pengine, Stats)
is created, where Stats
is the return of thread_statistics/2.2774http_pengine_ping(Request) :- 2775 reply_options(Request, [get]), 2776 !. 2777http_pengine_ping(Request) :- 2778 http_parameters(Request, 2779 [ id(Pengine, []), 2780 format(Format, [default(prolog)]) 2781 ]), 2782 ( pengine_thread(Pengine, Thread), 2783 Error = error(_,_), 2784 catch(thread_statistics(Thread, Stats), Error, fail) 2785 -> output_result(Pengine, Format, ping(Pengine, Stats)) 2786 ; output_result(Pengine, Format, died(Pengine)) 2787 ).
2796http_pengine_list(Request) :- 2797 reply_options(Request, [get]), 2798 !. 2799http_pengine_list(Request) :- 2800 http_parameters(Request, 2801 [ status(Status, [default(detached), oneof([detached])]), 2802 application(Application, [default(pengine_sandbox)]) 2803 ]), 2804 allowed(Request, Application), 2805 authenticate(Request, Application, _UserOptions), 2806 findall(Term, listed_pengine(Application, Status, Term), Terms), 2807 reply_json_dict(json{pengines: Terms}). 2808 2809listed_pengine(Application, detached, State) :- 2810 State = pengine{id:Id, 2811 detached:Time, 2812 queued:Queued, 2813 stats:Stats}, 2814 2815 pengine_property(Id, application(Application)), 2816 pengine_property(Id, detached(Time)), 2817 pengine_queue(Id, Queue, _TimeLimit, _Now), 2818 message_queue_property(Queue, size(Queued)), 2819 ( pengine_thread(Id, Thread), 2820 catch(thread_statistics(Thread, Stats), _, fail) 2821 -> true 2822 ; Stats = thread{status:died} 2823 ).
prolog
, json
or json-s
.
2834:- dynamic 2835 pengine_replying/2. % +Pengine, +Thread 2836 2837output_result(Pengine, Format, Event) :- 2838 thread_self(Thread), 2839 cors_enable, % contingent on http:cors setting 2840 disable_client_cache, 2841 setup_call_cleanup( 2842 asserta(pengine_replying(Pengine, Thread), Ref), 2843 catch(output_result_2(Format, Event, _{}), 2844 pengine_abort_output, 2845 true), 2846 erase(Ref)), 2847 destroy_pengine_after_output(Pengine, Event). 2848 2849output_result_2(Lang, Event, Dict) :- 2850 write_result(Lang, Event, Dict), 2851 !. 2852output_result_2(prolog, Event, _) :- 2853 !, 2854 format('Content-type: text/x-prolog; charset=UTF-8~n~n'), 2855 write_term(Event, 2856 [ quoted(true), 2857 ignore_ops(true), 2858 fullstop(true), 2859 blobs(portray), 2860 portray_goal(portray_blob), 2861 nl(true) 2862 ]). 2863output_result_2(Lang, Event, _) :- 2864 json_lang(Lang), 2865 !, 2866 ( event_term_to_json_data(Event, JSON, Lang) 2867 -> reply_json_dict(JSON) 2868 ; assertion(event_term_to_json_data(Event, _, Lang)) 2869 ). 2870output_result_2(Lang, _Event, _) :- % FIXME: allow for non-JSON format 2871 domain_error(pengine_format, Lang).
'$BLOB'(Type)
.
Future versions may include more info, depending on Type.2881:- public portray_blob/2. % called from write-term 2882portray_blob(Blob, _Options) :- 2883 blob(Blob, Type), 2884 writeq('$BLOB'(Type)).
2891abort_pending_output(Pengine) :- 2892 forall(pengine_replying(Pengine, Thread), 2893 abort_output_thread(Thread)). 2894 2895abort_output_thread(Thread) :- 2896 catch(thread_signal(Thread, throw(pengine_abort_output)), 2897 error(existence_error(thread, _), _), 2898 true).
prolog
and various JSON dialects. The hook
event_to_json/3 can be used to refine the JSON dialects. This
hook must be used if a completely different output format is
desired.2914disable_client_cache :- 2915 format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c 2916 Pragma: no-cache\r\n\c 2917 Expires: 0\r\n'). 2918 2919event_term_to_json_data(Events, JSON, Lang) :- 2920 is_list(Events), 2921 !, 2922 events_to_json_data(Events, JSON, Lang). 2923event_term_to_json_data(Event, JSON, Lang) :- 2924 event_to_json(Event, JSON, Lang), 2925 !. 2926event_term_to_json_data(success(ID, Bindings0, Projection, Time, More), 2927 json{event:success, id:ID, time:Time, 2928 data:Bindings, more:More, projection:Projection}, 2929 json) :- 2930 !, 2931 term_to_json(Bindings0, Bindings). 2932event_term_to_json_data(destroy(ID, Event), 2933 json{event:destroy, id:ID, data:JSON}, 2934 Style) :- 2935 !, 2936 event_term_to_json_data(Event, JSON, Style). 2937event_term_to_json_data(create(ID, Features0), JSON, Style) :- 2938 !, 2939 ( select(answer(First0), Features0, Features1) 2940 -> event_term_to_json_data(First0, First, Style), 2941 Features = [answer(First)|Features1] 2942 ; Features = Features0 2943 ), 2944 dict_create(JSON, json, [event(create), id(ID)|Features]). 2945event_term_to_json_data(destroy(ID, Event), 2946 json{event:destroy, id:ID, data:JSON}, Style) :- 2947 !, 2948 event_term_to_json_data(Event, JSON, Style). 2949event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :- 2950 !, 2951 Error0 = json{event:error, id:ID, data:Message}, 2952 add_error_details(ErrorTerm, Error0, Error), 2953 message_to_string(ErrorTerm, Message). 2954event_term_to_json_data(failure(ID, Time), 2955 json{event:failure, id:ID, time:Time}, _) :- 2956 !. 2957event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :- 2958 functor(EventTerm, F, 1), 2959 !, 2960 arg(1, EventTerm, ID). 2961event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :- 2962 functor(EventTerm, F, 2), 2963 arg(1, EventTerm, ID), 2964 arg(2, EventTerm, Data), 2965 term_to_json(Data, JSON). 2966 2967events_to_json_data([], [], _). 2968events_to_json_data([E|T0], [J|T], Lang) :- 2969 event_term_to_json_data(E, J, Lang), 2970 events_to_json_data(T0, T, Lang). 2971 2972:- public add_error_details/3.
pengines_io.pl
.
2979add_error_details(Error, JSON0, JSON) :-
2980 add_error_code(Error, JSON0, JSON1),
2981 add_error_location(Error, JSON1, JSON).
code
field to JSON0 of Error is an ISO error term. The error
code is the functor name of the formal part of the error, e.g.,
syntax_error
, type_error
, etc. Some errors carry more
information:
2994add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :- 2995 atom(Type), 2996 !, 2997 to_atomic(Obj, Value), 2998 Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}). 2999add_error_code(error(Formal, _), Error0, Error) :- 3000 callable(Formal), 3001 !, 3002 functor(Formal, Code, _), 3003 Error = Error0.put(code, Code). 3004add_error_code(_, Error, Error). 3005 3006% What to do with large integers? 3007to_atomic(Obj, Atomic) :- atom(Obj), !, Atomic = Obj. 3008to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj. 3009to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj. 3010to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
location
property if the error can be associated with a
source location. The location is an object with properties file
and line
and, if available, the character location in the line.3019add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :- 3020 atom(Path), integer(Line), 3021 !, 3022 Term = Term0.put(_{location:_{file:Path, line:Line}}). 3023add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :- 3024 atom(Path), integer(Line), integer(Ch), 3025 !, 3026 Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}). 3027add_error_location(_, Term, Term).
success(ID, Bindings, Projection, Time, More)
and output(ID,
Term)
into a format suitable for processing at the client side.3038%:- multifile pengines:event_to_json/3. 3039 3040 3041 /******************************* 3042 * ACCESS CONTROL * 3043 *******************************/
forbidden
header if contact is not allowed.3050allowed(Request, Application) :- 3051 setting(Application:allow_from, Allow), 3052 match_peer(Request, Allow), 3053 setting(Application:deny_from, Deny), 3054 \+ match_peer(Request, Deny), 3055 !. 3056allowed(Request, _Application) :- 3057 memberchk(request_uri(Here), Request), 3058 throw(http_reply(forbidden(Here))). 3059 3060match_peer(_, Allowed) :- 3061 memberchk(*, Allowed), 3062 !. 3063match_peer(_, []) :- !, fail. 3064match_peer(Request, Allowed) :- 3065 http_peer(Request, Peer), 3066 debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]), 3067 ( memberchk(Peer, Allowed) 3068 -> true 3069 ; member(Pattern, Allowed), 3070 match_peer_pattern(Pattern, Peer) 3071 ). 3072 3073match_peer_pattern(Pattern, Peer) :- 3074 ip_term(Pattern, IP), 3075 ip_term(Peer, IP), 3076 !. 3077 3078ip_term(Peer, Pattern) :- 3079 split_string(Peer, ".", "", PartStrings), 3080 ip_pattern(PartStrings, Pattern). 3081 3082ip_pattern([], []). 3083ip_pattern([*], _) :- !. 3084ip_pattern([S|T0], [N|T]) :- 3085 number_string(N, S), 3086 ip_pattern(T0, T).
[user(User)]
, []
or
an exception.3094authenticate(Request, Application, UserOptions) :- 3095 authentication_hook(Request, Application, User), 3096 !, 3097 must_be(ground, User), 3098 UserOptions = [user(User)]. 3099authenticate(_, _, []).
throw(http_reply(authorise(basic(Realm))))
Start a normal HTTP login challenge (reply 401)throw(http_reply(forbidden(Path)))
)
Reject the request using a 403 repply.3121pengine_register_user(Options) :- 3122 option(user(User), Options), 3123 !, 3124 pengine_self(Me), 3125 asserta(pengine_user(Me, User)). 3126pengine_register_user(_).
3137pengine_user(User) :-
3138 pengine_self(Me),
3139 pengine_user(Me, User).
3145reply_options(Request, Allowed) :- 3146 option(method(options), Request), 3147 !, 3148 cors_enable(Request, 3149 [ methods(Allowed) 3150 ]), 3151 format('Content-type: text/plain\r\n'), 3152 format('~n'). % empty body 3153 3154 3155 /******************************* 3156 * COMPILE SOURCE * 3157 *******************************/
3166pengine_src_text(Src, Module) :- 3167 pengine_self(Self), 3168 format(atom(ID), 'pengine://~w/src', [Self]), 3169 extra_load_options(Self, Options), 3170 setup_call_cleanup( 3171 open_chars_stream(Src, Stream), 3172 load_files(Module:ID, 3173 [ stream(Stream), 3174 module(Module), 3175 silent(true) 3176 | Options 3177 ]), 3178 close(Stream)), 3179 keep_source(Self, ID, Src). 3180 3181system'#file'(File, _Line) :- 3182 prolog_load_context(stream, Stream), 3183 set_stream(Stream, file_name(File)), 3184 set_stream(Stream, record_position(false)), 3185 set_stream(Stream, record_position(true)).
3195pengine_src_url(URL, Module) :- 3196 pengine_self(Self), 3197 uri_encoded(path, URL, Path), 3198 format(atom(ID), 'pengine://~w/url/~w', [Self, Path]), 3199 extra_load_options(Self, Options), 3200 ( get_pengine_application(Self, Application), 3201 setting(Application:debug_info, false) 3202 -> setup_call_cleanup( 3203 http_open(URL, Stream, []), 3204 ( set_stream(Stream, encoding(utf8)), 3205 load_files(Module:ID, 3206 [ stream(Stream), 3207 module(Module) 3208 | Options 3209 ]) 3210 ), 3211 close(Stream)) 3212 ; setup_call_cleanup( 3213 http_open(URL, TempStream, []), 3214 ( set_stream(TempStream, encoding(utf8)), 3215 read_string(TempStream, _, Src) 3216 ), 3217 close(TempStream)), 3218 setup_call_cleanup( 3219 open_chars_stream(Src, Stream), 3220 load_files(Module:ID, 3221 [ stream(Stream), 3222 module(Module) 3223 | Options 3224 ]), 3225 close(Stream)), 3226 keep_source(Self, ID, Src) 3227 ). 3228 3229 3230extra_load_options(Pengine, Options) :- 3231 pengine_not_sandboxed(Pengine), 3232 !, 3233 Options = []. 3234extra_load_options(_, [sandboxed(true)]). 3235 3236 3237keep_source(Pengine, ID, SrcText) :- 3238 get_pengine_application(Pengine, Application), 3239 setting(Application:debug_info, true), 3240 !, 3241 to_string(SrcText, SrcString), 3242 assertz(pengine_data(Pengine, source(ID, SrcString))). 3243keep_source(_, _, _). 3244 3245to_string(String, String) :- 3246 string(String), 3247 !. 3248to_string(Atom, String) :- 3249 atom_string(Atom, String), 3250 !. 3251 3252 /******************************* 3253 * SANDBOX * 3254 *******************************/ 3255 3256:- multifile 3257 sandbox:safe_primitive/1. 3258 3259sandbox:safe_primitive(pengines:pengine_input(_, _)). 3260sandbox:safe_primitive(pengines:pengine_output(_)). 3261sandbox:safe_primitive(pengines:pengine_debug(_,_)). 3262 3263 3264 /******************************* 3265 * MESSAGES * 3266 *******************************/ 3267 3268prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3269 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3270 'This is normally caused by an insufficiently instantiated'-[], nl, 3271 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3272 'find all possible instantations of Var.'-[] 3273 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.