1:- encoding(utf8).
37
38:- module(pengines,
39 [ pengine_create/1, 40 pengine_ask/3, 41 pengine_next/2, 42 pengine_stop/2, 43 pengine_event/2, 44 pengine_input/2, 45 pengine_output/1, 46 pengine_respond/3, 47 pengine_debug/2, 48 pengine_self/1, 49 pengine_pull_response/2, 50 pengine_destroy/1, 51 pengine_destroy/2, 52 pengine_abort/1, 53 pengine_application/1, 54 current_pengine_application/1, 55 pengine_property/2, 56 pengine_user/1, 57 pengine_event_loop/2, 58 pengine_rpc/2, 59 pengine_rpc/3 60 ]). 61
70
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error),
77 [ must_be/2,
78 existence_error/2,
79 permission_error/3,
80 domain_error/2
81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option),
88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(sandbox),[safe_goal/1]). 91:- autoload(library(statistics),[thread_statistics/2]). 92:- autoload(library(term_to_json),[term_to_json/2]). 93:- autoload(library(thread_pool),
94 [thread_pool_create/3,thread_create_in_pool/4]). 95:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 96:- autoload(library(uri),
97 [ uri_components/2,
98 uri_query_components/2,
99 uri_data/3,
100 uri_data/4,
101 uri_encoded/3
102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- autoload(library(http/http_dispatch),
106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111
112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json),
114 [http_read_json_dict/2,reply_json_dict/1]). 115
116:- if(exists_source(library(uuid))). 117:- autoload(library(uuid), [uuid/2]). 118:- endif. 119
120
121:- meta_predicate
122 pengine_create(:),
123 pengine_rpc(+, +, :),
124 pengine_event_loop(1, +). 125
126:- multifile
127 write_result/3, 128 event_to_json/3, 129 prepare_module/3, 130 prepare_goal/3, 131 authentication_hook/3, 132 not_sandboxed/2, 133 pengine_flush_output_hook/0. 134
135:- predicate_options(pengine_create/1, 1,
136 [ id(-atom),
137 alias(atom),
138 application(atom),
139 destroy(boolean),
140 server(atom),
141 ask(compound),
142 template(compound),
143 chunk(integer;oneof([false])),
144 bindings(list),
145 src_list(list),
146 src_text(any), 147 src_url(atom),
148 src_predicates(list)
149 ]). 150:- predicate_options(pengine_ask/3, 3,
151 [ template(any),
152 chunk(integer;oneof([false])),
153 bindings(list)
154 ]). 155:- predicate_options(pengine_next/2, 2,
156 [ chunk(integer),
157 pass_to(pengine_send/3, 3)
158 ]). 159:- predicate_options(pengine_stop/2, 2,
160 [ pass_to(pengine_send/3, 3)
161 ]). 162:- predicate_options(pengine_respond/3, 2,
163 [ pass_to(pengine_send/3, 3)
164 ]). 165:- predicate_options(pengine_rpc/3, 3,
166 [ chunk(integer;oneof([false])),
167 pass_to(pengine_create/1, 1)
168 ]). 169:- predicate_options(pengine_send/3, 3,
170 [ delay(number)
171 ]). 172:- predicate_options(pengine_event/2, 2,
173 [ listen(atom),
174 pass_to(system:thread_get_message/3, 3)
175 ]). 176:- predicate_options(pengine_pull_response/2, 2,
177 [ pass_to(http_open/3, 3)
178 ]). 179:- predicate_options(pengine_event_loop/2, 2,
180 []). 181
183:- debug(pengine(debug)). 184
185goal_expansion(random_delay, Expanded) :-
186 ( debugging(pengine(delay))
187 -> Expanded = do_random_delay
188 ; Expanded = true
189 ).
190
191do_random_delay :-
192 Delay is random(20)/1000,
193 sleep(Delay).
194
195:- meta_predicate 196 solve(+, ?, 0, +),
197 findnsols_no_empty(+, ?, 0, -),
198 pengine_event_loop(+, 1, +). 199
251
252
253pengine_create(M:Options0) :-
254 translate_local_sources(Options0, Options, M),
255 ( select_option(server(BaseURL), Options, RestOptions)
256 -> remote_pengine_create(BaseURL, RestOptions)
257 ; local_pengine_create(Options)
258 ).
259
271
272translate_local_sources(OptionsIn, Options, Module) :-
273 translate_local_sources(OptionsIn, Sources, Options2, Module),
274 ( Sources == []
275 -> Options = Options2
276 ; Sources = [Source]
277 -> Options = [src_text(Source)|Options2]
278 ; atomics_to_string(Sources, Source)
279 -> Options = [src_text(Source)|Options2]
280 ).
281
282translate_local_sources([], [], [], _).
283translate_local_sources([H0|T], [S0|S], Options, M) :-
284 nonvar(H0),
285 translate_local_source(H0, S0, M),
286 !,
287 translate_local_sources(T, S, Options, M).
288translate_local_sources([H|T0], S, [H|T], M) :-
289 translate_local_sources(T0, S, T, M).
290
291translate_local_source(src_predicates(PIs), Source, M) :-
292 must_be(list, PIs),
293 with_output_to(string(Source),
294 maplist(list_in_module(M), PIs)).
295translate_local_source(src_list(Terms), Source, _) :-
296 must_be(list, Terms),
297 with_output_to(string(Source),
298 forall(member(Term, Terms),
299 format('~k .~n', [Term]))).
300translate_local_source(src_text(Source), Source, _).
301
302list_in_module(M, PI) :-
303 listing(M:PI).
304
309
310pengine_send(Target, Event) :-
311 pengine_send(Target, Event, []).
312
313
325
326pengine_send(Target, Event, Options) :-
327 must_be(atom, Target),
328 pengine_send2(Target, Event, Options).
329
330pengine_send2(self, Event, Options) :-
331 !,
332 thread_self(Queue),
333 delay_message(queue(Queue), Event, Options).
334pengine_send2(Name, Event, Options) :-
335 child(Name, Target),
336 !,
337 delay_message(pengine(Target), Event, Options).
338pengine_send2(Target, Event, Options) :-
339 delay_message(pengine(Target), Event, Options).
340
341delay_message(Target, Event, Options) :-
342 option(delay(Delay), Options),
343 !,
344 alarm(Delay,
345 send_message(Target, Event, Options),
346 _AlarmID,
347 [remove(true)]).
348delay_message(Target, Event, Options) :-
349 random_delay,
350 send_message(Target, Event, Options).
351
352send_message(queue(Queue), Event, _) :-
353 thread_send_message(Queue, pengine_request(Event)).
354send_message(pengine(Pengine), Event, Options) :-
355 ( pengine_remote(Pengine, Server)
356 -> remote_pengine_send(Server, Pengine, Event, Options)
357 ; pengine_thread(Pengine, Thread)
358 -> thread_send_message(Thread, pengine_request(Event))
359 ; existence_error(pengine, Pengine)
360 ).
361
369
370pengine_request(Request) :-
371 thread_self(Me),
372 thread_get_message(Me, pengine_request(Request), [timeout(1)]),
373 !.
374pengine_request(Request) :-
375 pengine_self(Self),
376 get_pengine_application(Self, Application),
377 setting(Application:idle_limit, IdleLimit0),
378 IdleLimit is IdleLimit0-1,
379 thread_self(Me),
380 ( thread_idle(thread_get_message(Me, pengine_request(Request),
381 [timeout(IdleLimit)]),
382 long)
383 -> true
384 ; Request = destroy
385 ).
386
387
397
398pengine_reply(Event) :-
399 pengine_parent(Queue),
400 pengine_reply(Queue, Event).
401
402pengine_reply(_Queue, _Event0) :-
403 nb_current(pengine_idle_limit_exceeded, true),
404 !.
405pengine_reply(Queue, Event0) :-
406 arg(1, Event0, ID),
407 wrap_first_answer(ID, Event0, Event),
408 random_delay,
409 debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
410 ( pengine_self(ID),
411 \+ pengine_detached(ID, _)
412 -> get_pengine_application(ID, Application),
413 setting(Application:idle_limit, IdleLimit),
414 debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
415 ( thread_send_message(Queue, pengine_event(ID, Event),
416 [ timeout(IdleLimit)
417 ])
418 -> true
419 ; thread_self(Me),
420 debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
421 [ID, Me]),
422 nb_setval(pengine_idle_limit_exceeded, true),
423 thread_detach(Me),
424 abort
425 )
426 ; thread_send_message(Queue, pengine_event(ID, Event))
427 ).
428
429wrap_first_answer(ID, Event0, CreateEvent) :-
430 wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
431 arg(1, CreateEvent, ID),
432 !,
433 retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
434wrap_first_answer(_ID, Event, Event).
435
436
437empty_queue :-
438 pengine_parent(Queue),
439 empty_queue(Queue, 0, Discarded),
440 debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
441
442empty_queue(Queue, C0, C) :-
443 thread_get_message(Queue, _Term, [timeout(0)]),
444 !,
445 C1 is C0+1,
446 empty_queue(Queue, C1, C).
447empty_queue(_, C, C).
448
449
515
516pengine_ask(ID, Query, Options) :-
517 partition(pengine_ask_option, Options, AskOptions, SendOptions),
518 pengine_send(ID, ask(Query, AskOptions), SendOptions).
519
520
521pengine_ask_option(template(_)).
522pengine_ask_option(chunk(_)).
523pengine_ask_option(bindings(_)).
524pengine_ask_option(breakpoints(_)).
525
526
568
569pengine_next(ID, Options) :-
570 select_option(chunk(Count), Options, Options1),
571 !,
572 pengine_send(ID, next(Count), Options1).
573pengine_next(ID, Options) :-
574 pengine_send(ID, next, Options).
575
576
589
590pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
591
592
600
601pengine_abort(Name) :-
602 ( child(Name, Pengine)
603 -> true
604 ; Pengine = Name
605 ),
606 ( pengine_remote(Pengine, Server)
607 -> remote_pengine_abort(Server, Pengine, [])
608 ; pengine_thread(Pengine, Thread),
609 debug(pengine(abort), 'Signalling thread ~p', [Thread]),
610 catch(thread_signal(Thread, throw(abort_query)), _, true)
611 ).
612
613
620
621pengine_destroy(ID) :-
622 pengine_destroy(ID, []).
623
624pengine_destroy(Name, Options) :-
625 ( child(Name, ID)
626 -> true
627 ; ID = Name
628 ),
629 option(force(true), Options),
630 !,
631 ( pengine_thread(ID, Thread)
632 -> catch(thread_signal(Thread, abort),
633 error(existence_error(thread, _), _), true)
634 ; true
635 ).
636pengine_destroy(ID, Options) :-
637 catch(pengine_send(ID, destroy, Options),
638 error(existence_error(pengine, ID), _),
639 retractall(child(_,ID))).
640
641
644
653
654:- dynamic
655 current_pengine/6, 656 pengine_queue/4, 657 output_queue/3, 658 pengine_user/2, 659 pengine_data/2, 660 pengine_detached/2. 661:- volatile
662 current_pengine/6,
663 pengine_queue/4,
664 output_queue/3,
665 pengine_user/2,
666 pengine_data/2,
667 pengine_detached/2. 668
669:- thread_local
670 child/2. 671
675
676pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
677 asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
678
679pengine_register_remote(Id, URL, Application, Destroy) :-
680 thread_self(Queue),
681 asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
682
688
689pengine_unregister(Id) :-
690 thread_self(Me),
691 ( current_pengine(Id, Queue, Me, http, _, _)
692 -> with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
693 ; true
694 ),
695 retractall(current_pengine(Id, _, Me, _, _, _)),
696 retractall(pengine_user(Id, _)),
697 retractall(pengine_data(Id, _)).
698
699pengine_unregister_remote(Id) :-
700 retractall(current_pengine(Id, _Parent, 0, _, _, _)).
701
705
706pengine_self(Id) :-
707 thread_self(Thread),
708 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
709
710pengine_parent(Parent) :-
711 nb_getval(pengine_parent, Parent).
712
713pengine_thread(Pengine, Thread) :-
714 current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
715 Thread \== 0,
716 !.
717
718pengine_remote(Pengine, URL) :-
719 current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
720
721get_pengine_application(Pengine, Application) :-
722 current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
723 !.
724
725get_pengine_module(Pengine, Pengine).
726
727:- if(current_predicate(uuid/2)). 728pengine_uuid(Id) :-
729 uuid(Id, [version(4)]). 730:- else. 731pengine_uuid(Id) :-
732 ( current_prolog_flag(max_integer, Max1)
733 -> Max is Max1-1
734 ; Max is 1<<128
735 ),
736 random_between(0, Max, Num),
737 atom_number(Id, Num).
738:- endif. 739
754
755:- meta_predicate protect_pengine(+, 0). 756
757protect_pengine(Id, Goal) :-
758 term_hash(Id, Hash),
759 LockN is Hash mod 64,
760 atom_concat(pengine_done_, LockN, Lock),
761 with_mutex(Lock,
762 ( pengine_thread(Id, _)
763 -> Goal
764 ; Goal
765 )).
766
767
781
782pengine_application(Application) :-
783 throw(error(context_error(nodirective,
784 pengine_application(Application)), _)).
785
786:- multifile
787 system:term_expansion/2,
788 current_application/1. 789
795
796current_pengine_application(Application) :-
797 current_application(Application).
798
799
801
802:- setting(thread_pool_size, integer, 100,
803 'Maximum number of pengines this application can run.'). 804:- setting(thread_pool_stacks, list(compound), [],
805 'Maximum stack sizes for pengines this application can run.'). 806:- setting(slave_limit, integer, 3,
807 'Maximum number of slave pengines a master pengine can create.'). 808:- setting(time_limit, number, 300,
809 'Maximum time to wait for output'). 810:- setting(idle_limit, number, 300,
811 'Pengine auto-destroys when idle for this time'). 812:- setting(safe_goal_limit, number, 10,
813 'Maximum time to try proving safety of the goal'). 814:- setting(program_space, integer, 100_000_000,
815 'Maximum memory used by predicates'). 816:- setting(allow_from, list(atom), [*],
817 'IP addresses from which remotes are allowed to connect'). 818:- setting(deny_from, list(atom), [],
819 'IP addresses from which remotes are NOT allowed to connect'). 820:- setting(debug_info, boolean, false,
821 'Keep information to support source-level debugging'). 822
823
824system:term_expansion((:- pengine_application(Application)), Expanded) :-
825 must_be(atom, Application),
826 ( module_property(Application, file(_))
827 -> permission_error(create, pengine_application, Application)
828 ; true
829 ),
830 expand_term((:- setting(Application:thread_pool_size, integer,
831 setting(pengines:thread_pool_size),
832 'Maximum number of pengines this \c
833 application can run.')),
834 ThreadPoolSizeSetting),
835 expand_term((:- setting(Application:thread_pool_stacks, list(compound),
836 setting(pengines:thread_pool_stacks),
837 'Maximum stack sizes for pengines \c
838 this application can run.')),
839 ThreadPoolStacksSetting),
840 expand_term((:- setting(Application:slave_limit, integer,
841 setting(pengines:slave_limit),
842 'Maximum number of local slave pengines \c
843 a master pengine can create.')),
844 SlaveLimitSetting),
845 expand_term((:- setting(Application:time_limit, number,
846 setting(pengines:time_limit),
847 'Maximum time to wait for output')),
848 TimeLimitSetting),
849 expand_term((:- setting(Application:idle_limit, number,
850 setting(pengines:idle_limit),
851 'Pengine auto-destroys when idle for this time')),
852 IdleLimitSetting),
853 expand_term((:- setting(Application:safe_goal_limit, number,
854 setting(pengines:safe_goal_limit),
855 'Maximum time to try proving safety of the goal')),
856 SafeGoalLimitSetting),
857 expand_term((:- setting(Application:program_space, integer,
858 setting(pengines:program_space),
859 'Maximum memory used by predicates')),
860 ProgramSpaceSetting),
861 expand_term((:- setting(Application:allow_from, list(atom),
862 setting(pengines:allow_from),
863 'IP addresses from which remotes are allowed \c
864 to connect')),
865 AllowFromSetting),
866 expand_term((:- setting(Application:deny_from, list(atom),
867 setting(pengines:deny_from),
868 'IP addresses from which remotes are NOT \c
869 allowed to connect')),
870 DenyFromSetting),
871 expand_term((:- setting(Application:debug_info, boolean,
872 setting(pengines:debug_info),
873 'Keep information to support source-level \c
874 debugging')),
875 DebugInfoSetting),
876 flatten([ pengines:current_application(Application),
877 ThreadPoolSizeSetting,
878 ThreadPoolStacksSetting,
879 SlaveLimitSetting,
880 TimeLimitSetting,
881 IdleLimitSetting,
882 SafeGoalLimitSetting,
883 ProgramSpaceSetting,
884 AllowFromSetting,
885 DenyFromSetting,
886 DebugInfoSetting
887 ], Expanded).
888
890
891:- pengine_application(pengine_sandbox). 892
893
926
927
928pengine_property(Id, Prop) :-
929 nonvar(Id), nonvar(Prop),
930 pengine_property2(Prop, Id),
931 !.
932pengine_property(Id, Prop) :-
933 pengine_property2(Prop, Id).
934
935pengine_property2(self(Id), Id) :-
936 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
937pengine_property2(module(Id), Id) :-
938 current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
939pengine_property2(alias(Alias), Id) :-
940 child(Alias, Id),
941 Alias \== Id.
942pengine_property2(thread(Thread), Id) :-
943 current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
944 Thread \== 0.
945pengine_property2(remote(Server), Id) :-
946 current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
947pengine_property2(application(Application), Id) :-
948 current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
949pengine_property2(destroy(Destroy), Id) :-
950 current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
951pengine_property2(parent(Parent), Id) :-
952 current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
953pengine_property2(source(SourceID, Source), Id) :-
954 pengine_data(Id, source(SourceID, Source)).
955pengine_property2(detached(When), Id) :-
956 pengine_detached(Id, When).
957
962
963pengine_output(Term) :-
964 pengine_self(Me),
965 pengine_reply(output(Me, Term)).
966
967
979
980pengine_debug(Format, Args) :-
981 pengine_parent(Queue),
982 pengine_self(Self),
983 catch(safe_goal(format(atom(_), Format, Args)), E, true),
984 ( var(E)
985 -> format(atom(Message), Format, Args)
986 ; message_to_string(E, Message)
987 ),
988 pengine_reply(Queue, debug(Self, Message)).
989
990
993
1002
1003local_pengine_create(Options) :-
1004 thread_self(Self),
1005 option(application(Application), Options, pengine_sandbox),
1006 create(Self, Child, Options, local, Application),
1007 option(alias(Name), Options, Child),
1008 assert(child(Name, Child)).
1009
1010
1014
1015:- multifile thread_pool:create_pool/1. 1016
1017thread_pool:create_pool(Application) :-
1018 current_application(Application),
1019 setting(Application:thread_pool_size, Size),
1020 setting(Application:thread_pool_stacks, Stacks),
1021 thread_pool_create(Application, Size, Stacks).
1022
1030
1031create(Queue, Child, Options, local, Application) :-
1032 !,
1033 pengine_child_id(Child),
1034 create0(Queue, Child, Options, local, Application).
1035create(Queue, Child, Options, URL, Application) :-
1036 pengine_child_id(Child),
1037 catch(create0(Queue, Child, Options, URL, Application),
1038 Error,
1039 create_error(Queue, Child, Error)).
1040
1041pengine_child_id(Child) :-
1042 ( nonvar(Child)
1043 -> true
1044 ; pengine_uuid(Child)
1045 ).
1046
1047create_error(Queue, Child, Error) :-
1048 pengine_reply(Queue, error(Child, Error)).
1049
1050create0(Queue, Child, Options, URL, Application) :-
1051 ( current_application(Application)
1052 -> true
1053 ; existence_error(pengine_application, Application)
1054 ),
1055 ( URL \== http 1056 1057 -> aggregate_all(count, child(_,_), Count),
1058 setting(Application:slave_limit, Max),
1059 ( Count >= Max
1060 -> throw(error(resource_error(max_pengines), _))
1061 ; true
1062 )
1063 ; true
1064 ),
1065 partition(pengine_create_option, Options, PengineOptions, RestOptions),
1066 thread_create_in_pool(
1067 Application,
1068 pengine_main(Queue, PengineOptions, Application), ChildThread,
1069 [ at_exit(pengine_done)
1070 | RestOptions
1071 ]),
1072 option(destroy(Destroy), PengineOptions, true),
1073 pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
1074 thread_send_message(ChildThread, pengine_registered(Child)),
1075 ( option(id(Id), Options)
1076 -> Id = Child
1077 ; true
1078 ).
1079
1080pengine_create_option(src_text(_)).
1081pengine_create_option(src_url(_)).
1082pengine_create_option(application(_)).
1083pengine_create_option(destroy(_)).
1084pengine_create_option(ask(_)).
1085pengine_create_option(template(_)).
1086pengine_create_option(bindings(_)).
1087pengine_create_option(chunk(_)).
1088pengine_create_option(alias(_)).
1089pengine_create_option(user(_)).
1090
1091
1097
1098:- public
1099 pengine_done/0. 1100
1101pengine_done :-
1102 thread_self(Me),
1103 ( thread_property(Me, status(exception(Ex))),
1104 abort_exception(Ex),
1105 thread_detach(Me),
1106 pengine_self(Pengine)
1107 -> catch(pengine_reply(destroy(Pengine, abort(Pengine))),
1108 error(_,_), true)
1109 ; true
1110 ),
1111 forall(child(_Name, Child),
1112 pengine_destroy(Child)),
1113 pengine_self(Id),
1114 protect_pengine(Id, pengine_unregister(Id)).
1115
1116abort_exception('$aborted').
1117abort_exception(unwind(abort)).
1118
1123
1124:- thread_local wrap_first_answer_in_create_event/2. 1125
1126:- meta_predicate
1127 pengine_prepare_source(:, +). 1128
1129pengine_main(Parent, Options, Application) :-
1130 fix_streams,
1131 thread_get_message(pengine_registered(Self)),
1132 nb_setval(pengine_parent, Parent),
1133 pengine_register_user(Options),
1134 set_prolog_flag(mitigate_spectre, true),
1135 catch(in_temporary_module(
1136 Self,
1137 pengine_prepare_source(Application, Options),
1138 pengine_create_and_loop(Self, Application, Options)),
1139 prepare_source_failed,
1140 pengine_terminate(Self)).
1141
1142pengine_create_and_loop(Self, Application, Options) :-
1143 setting(Application:slave_limit, SlaveLimit),
1144 CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
1145 ( option(ask(Query0), Options)
1146 -> asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
1147 ( string(Query0) 1148 -> ( option(template(TemplateS), Options)
1149 -> Ask2 = Query0-TemplateS
1150 ; Ask2 = Query0
1151 ),
1152 catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
1153 Error, true),
1154 ( var(Error)
1155 -> true
1156 ; send_error(Error),
1157 throw(prepare_source_failed)
1158 )
1159 ; Query = Query0,
1160 option(template(Template), Options, Query),
1161 option(bindings(Bindings), Options, [])
1162 ),
1163 option(chunk(Chunk), Options, 1),
1164 pengine_ask(Self, Query,
1165 [ template(Template),
1166 chunk(Chunk),
1167 bindings(Bindings)
1168 ])
1169 ; Extra = [],
1170 pengine_reply(CreateEvent)
1171 ),
1172 pengine_main_loop(Self).
1173
1174
1181
1182ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
1183 !,
1184 format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
1185 term_string(t(Template1,Ask1), AskTemplate,
1186 [ variable_names(Bindings0),
1187 module(Module)
1188 ]),
1189 phrase(template_bindings(Template1, Bindings0), Bindings).
1190ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
1191 term_string(Ask1, Ask,
1192 [ variable_names(Bindings),
1193 module(Module)
1194 ]),
1195 exclude(anon, Bindings, Bindings1),
1196 dict_create(Template, swish_default_template, Bindings1).
1197
1198template_bindings(Var, Bindings) -->
1199 { var(Var) }, !,
1200 ( { var_binding(Bindings, Var, Binding)
1201 }
1202 -> [Binding]
1203 ; []
1204 ).
1205template_bindings([H|T], Bindings) -->
1206 !,
1207 template_bindings(H, Bindings),
1208 template_bindings(T, Bindings).
1209template_bindings(Compoound, Bindings) -->
1210 { compound(Compoound), !,
1211 compound_name_arguments(Compoound, _, Args)
1212 },
1213 template_bindings(Args, Bindings).
1214template_bindings(_, _) --> [].
1215
1216var_binding(Bindings, Var, Binding) :-
1217 member(Binding, Bindings),
1218 arg(2, Binding, V),
1219 V == Var, !.
1220
1225
1226fix_streams :-
1227 fix_stream(current_output).
1228
1229fix_stream(Name) :-
1230 is_cgi_stream(Name),
1231 !,
1232 debug(pengine(stream), '~w is a CGI stream!', [Name]),
1233 set_stream(user_output, alias(Name)).
1234fix_stream(_).
1235
1242
1243pengine_prepare_source(Module:Application, Options) :-
1244 setting(Application:program_space, SpaceLimit),
1245 set_module(Module:program_space(SpaceLimit)),
1246 delete_import_module(Module, user),
1247 add_import_module(Module, Application, start),
1248 catch(prep_module(Module, Application, Options), Error, true),
1249 ( var(Error)
1250 -> true
1251 ; send_error(Error),
1252 throw(prepare_source_failed)
1253 ).
1254
1255prep_module(Module, Application, Options) :-
1256 maplist(copy_flag(Module, Application), [var_prefix]),
1257 forall(prepare_module(Module, Application, Options), true),
1258 setup_call_cleanup(
1259 '$set_source_module'(OldModule, Module),
1260 maplist(process_create_option(Module), Options),
1261 '$set_source_module'(OldModule)).
1262
1263copy_flag(Module, Application, Flag) :-
1264 current_prolog_flag(Application:Flag, Value),
1265 !,
1266 set_prolog_flag(Module:Flag, Value).
1267copy_flag(_, _, _).
1268
1269process_create_option(Application, src_text(Text)) :-
1270 !,
1271 pengine_src_text(Text, Application).
1272process_create_option(Application, src_url(URL)) :-
1273 !,
1274 pengine_src_url(URL, Application).
1275process_create_option(_, _).
1276
1277
1296
1297
1298pengine_main_loop(ID) :-
1299 catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
1300
1301pengine_aborted(ID) :-
1302 thread_self(Self),
1303 debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
1304 empty_queue,
1305 destroy_or_continue(abort(ID)).
1306
1307
1317
1318guarded_main_loop(ID) :-
1319 pengine_request(Request),
1320 ( Request = destroy
1321 -> debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
1322 pengine_terminate(ID)
1323 ; Request = ask(Goal, Options)
1324 -> debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
1325 ask(ID, Goal, Options)
1326 ; debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
1327 pengine_reply(error(ID, error(protocol_error, _))),
1328 guarded_main_loop(ID)
1329 ).
1330
1331
1332pengine_terminate(ID) :-
1333 pengine_reply(destroy(ID)),
1334 thread_self(Me), 1335 thread_detach(Me).
1336
1337
1345
1346solve(Chunk, Template, Goal, ID) :-
1347 prolog_current_choice(Choice),
1348 ( integer(Chunk)
1349 -> State = count(Chunk)
1350 ; Chunk == false
1351 -> State = no_chunk
1352 ; domain_error(chunk, Chunk)
1353 ),
1354 statistics(cputime, Epoch),
1355 Time = time(Epoch),
1356 nb_current('$variable_names', Bindings),
1357 filter_template(Template, Bindings, Template2),
1358 '$current_typein_module'(CurrTypeIn),
1359 ( '$set_typein_module'(ID),
1360 call_cleanup(catch(findnsols_no_empty(State, Template2,
1361 set_projection(Goal, Bindings),
1362 Result),
1363 Error, true),
1364 query_done(Det, CurrTypeIn)),
1365 arg(1, Time, T0),
1366 statistics(cputime, T1),
1367 CPUTime is T1-T0,
1368 forall(pengine_flush_output_hook, true),
1369 ( var(Error)
1370 -> projection(Projection),
1371 ( var(Det)
1372 -> pengine_reply(success(ID, Result, Projection,
1373 CPUTime, true)),
1374 more_solutions(ID, Choice, State, Time)
1375 ; !, 1376 destroy_or_continue(success(ID, Result, Projection,
1377 CPUTime, false))
1378 )
1379 ; !, 1380 ( Error == abort_query
1381 -> throw(Error)
1382 ; destroy_or_continue(error(ID, Error))
1383 )
1384 )
1385 ; !, 1386 arg(1, Time, T0),
1387 statistics(cputime, T1),
1388 CPUTime is T1-T0,
1389 destroy_or_continue(failure(ID, CPUTime))
1390 ).
1391solve(_, _, _, _). 1392
1393query_done(true, CurrTypeIn) :-
1394 '$set_typein_module'(CurrTypeIn).
1395
1396
1402
1403set_projection(Goal, Bindings) :-
1404 b_setval('$variable_names', Bindings),
1405 call(Goal).
1406
1407projection(Projection) :-
1408 nb_current('$variable_names', Bindings),
1409 !,
1410 maplist(var_name, Bindings, Projection).
1411projection([]).
1412
1420
1421filter_template(Template0, Bindings, Template) :-
1422 is_dict(Template0, swish_default_template),
1423 !,
1424 dict_create(Template, swish_default_template, Bindings).
1425filter_template(Template, _Bindings, Template).
1426
1427findnsols_no_empty(no_chunk, Template, Goal, List) =>
1428 List = [Template],
1429 call(Goal).
1430findnsols_no_empty(State, Template, Goal, List) =>
1431 findnsols(State, Template, Goal, List),
1432 List \== [].
1433
1434destroy_or_continue(Event) :-
1435 arg(1, Event, ID),
1436 ( pengine_property(ID, destroy(true))
1437 -> thread_self(Me),
1438 thread_detach(Me),
1439 pengine_reply(destroy(ID, Event))
1440 ; pengine_reply(Event),
1441 guarded_main_loop(ID)
1442 ).
1443
1459
1460more_solutions(ID, Choice, State, Time) :-
1461 pengine_request(Event),
1462 more_solutions(Event, ID, Choice, State, Time).
1463
1464more_solutions(stop, ID, _Choice, _State, _Time) :-
1465 !,
1466 debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
1467 destroy_or_continue(stop(ID)).
1468more_solutions(next, ID, _Choice, _State, Time) :-
1469 !,
1470 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
1471 statistics(cputime, T0),
1472 nb_setarg(1, Time, T0),
1473 fail.
1474more_solutions(next(Count), ID, _Choice, State, Time) :-
1475 Count > 0,
1476 State = count(_), 1477 !,
1478 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
1479 nb_setarg(1, State, Count),
1480 statistics(cputime, T0),
1481 nb_setarg(1, Time, T0),
1482 fail.
1483more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
1484 !,
1485 debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
1486 prolog_cut_to(Choice),
1487 ask(ID, Goal, Options).
1488more_solutions(destroy, ID, _Choice, _State, _Time) :-
1489 !,
1490 debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
1491 pengine_terminate(ID).
1492more_solutions(Event, ID, Choice, State, Time) :-
1493 debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
1494 pengine_reply(error(ID, error(protocol_error, _))),
1495 more_solutions(ID, Choice, State, Time).
1496
1502
1503ask(ID, Goal, Options) :-
1504 catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
1505 !,
1506 ( var(Error)
1507 -> option(template(Template), Options, Goal),
1508 option(chunk(N), Options, 1),
1509 solve(N, Template, Goal1, ID)
1510 ; pengine_reply(error(ID, Error)),
1511 guarded_main_loop(ID)
1512 ).
1513
1525
1526prepare_goal(ID, Goal0, Module:Goal, Options) :-
1527 option(bindings(Bindings), Options, []),
1528 b_setval('$variable_names', Bindings),
1529 ( prepare_goal(Goal0, Goal1, Options)
1530 -> true
1531 ; Goal1 = Goal0
1532 ),
1533 get_pengine_module(ID, Module),
1534 setup_call_cleanup(
1535 '$set_source_module'(Old, Module),
1536 expand_goal(Goal1, Goal),
1537 '$set_source_module'(_, Old)),
1538 ( pengine_not_sandboxed(ID)
1539 -> true
1540 ; get_pengine_application(ID, App),
1541 setting(App:safe_goal_limit, Limit),
1542 catch(call_with_time_limit(
1543 Limit,
1544 safe_goal(Module:Goal)), E, true)
1545 -> ( var(E)
1546 -> true
1547 ; E = time_limit_exceeded
1548 -> throw(error(sandbox(time_limit_exceeded, Limit),_))
1549 ; throw(E)
1550 )
1551 ).
1552
1553
1563
1564
1570
1571pengine_not_sandboxed(ID) :-
1572 pengine_user(ID, User),
1573 pengine_property(ID, application(App)),
1574 not_sandboxed(User, App),
1575 !.
1576
1589
1590
1596
1597pengine_pull_response(Pengine, Options) :-
1598 pengine_remote(Pengine, Server),
1599 !,
1600 remote_pengine_pull_response(Server, Pengine, Options).
1601pengine_pull_response(_ID, _Options).
1602
1603
1609
1610pengine_input(Prompt, Term) :-
1611 pengine_self(Self),
1612 pengine_parent(Parent),
1613 pengine_reply(Parent, prompt(Self, Prompt)),
1614 pengine_request(Request),
1615 ( Request = input(Input)
1616 -> Term = Input
1617 ; Request == destroy
1618 -> abort
1619 ; throw(error(protocol_error,_))
1620 ).
1621
1622
1636
1637pengine_respond(Pengine, Input, Options) :-
1638 pengine_send(Pengine, input(Input), Options).
1639
1640
1646
1647send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
1648 is_list(Frames),
1649 !,
1650 with_output_to(string(Stack),
1651 print_prolog_backtrace(current_output, Frames)),
1652 pengine_self(Self),
1653 replace_blobs(Formal, Formal1),
1654 replace_blobs(Message, Message1),
1655 pengine_reply(error(Self, error(Formal1,
1656 context(prolog_stack(Stack), Message1)))).
1657send_error(Error) :-
1658 pengine_self(Self),
1659 replace_blobs(Error, Error1),
1660 pengine_reply(error(Self, Error1)).
1661
1667
1668replace_blobs(Blob, Atom) :-
1669 blob(Blob, Type), Type \== text,
1670 !,
1671 format(atom(Atom), '~p', [Blob]).
1672replace_blobs(Term0, Term) :-
1673 compound(Term0),
1674 !,
1675 compound_name_arguments(Term0, Name, Args0),
1676 maplist(replace_blobs, Args0, Args),
1677 compound_name_arguments(Term, Name, Args).
1678replace_blobs(Term, Term).
1679
1680
1683
1684
1685remote_pengine_create(BaseURL, Options) :-
1686 partition(pengine_create_option, Options, PengineOptions0, RestOptions),
1687 ( option(ask(Query), PengineOptions0),
1688 \+ option(template(_Template), PengineOptions0)
1689 -> PengineOptions = [template(Query)|PengineOptions0]
1690 ; PengineOptions = PengineOptions0
1691 ),
1692 options_to_dict(PengineOptions, PostData),
1693 remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
1694 arg(1, Reply, ID),
1695 ( option(id(ID2), Options)
1696 -> ID = ID2
1697 ; true
1698 ),
1699 option(alias(Name), Options, ID),
1700 assert(child(Name, ID)),
1701 ( ( functor(Reply, create, _) 1702 ; functor(Reply, output, _) 1703 )
1704 -> option(application(Application), PengineOptions, pengine_sandbox),
1705 option(destroy(Destroy), PengineOptions, true),
1706 pengine_register_remote(ID, BaseURL, Application, Destroy)
1707 ; true
1708 ),
1709 thread_self(Queue),
1710 pengine_reply(Queue, Reply).
1711
1712options_to_dict(Options, Dict) :-
1713 select_option(ask(Ask), Options, Options1),
1714 select_option(template(Template), Options1, Options2),
1715 !,
1716 no_numbered_var_in(Ask+Template),
1717 findall(AskString-TemplateString,
1718 ask_template_to_strings(Ask, Template, AskString, TemplateString),
1719 [ AskString-TemplateString ]),
1720 options_to_dict(Options2, Dict0),
1721 Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
1722options_to_dict(Options, Dict) :-
1723 maplist(prolog_option, Options, Options1),
1724 dict_create(Dict, _, Options1).
1725
1726no_numbered_var_in(Term) :-
1727 sub_term(Sub, Term),
1728 subsumes_term('$VAR'(_), Sub),
1729 !,
1730 domain_error(numbered_vars_free_term, Term).
1731no_numbered_var_in(_).
1732
1733ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
1734 numbervars(Ask+Template, 0, _),
1735 WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
1736 format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
1737 Template, WOpts
1738 ]),
1739 split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
1740
1741prolog_option(Option0, Option) :-
1742 create_option_type(Option0, term),
1743 !,
1744 Option0 =.. [Name,Value],
1745 format(string(String), '~k', [Value]),
1746 Option =.. [Name,String].
1747prolog_option(Option, Option).
1748
1749create_option_type(ask(_), term).
1750create_option_type(template(_), term).
1751create_option_type(application(_), atom).
1752
1753remote_pengine_send(BaseURL, ID, Event, Options) :-
1754 remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
1755 thread_self(Queue),
1756 pengine_reply(Queue, Reply).
1757
1758remote_pengine_pull_response(BaseURL, ID, Options) :-
1759 remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
1760 thread_self(Queue),
1761 pengine_reply(Queue, Reply).
1762
1763remote_pengine_abort(BaseURL, ID, Options) :-
1764 remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
1765 thread_self(Queue),
1766 pengine_reply(Queue, Reply).
1767
1772
1773remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
1774 !,
1775 server_url(Server, Action, [id=ID], URL),
1776 http_open(URL, Stream, 1777 [ post(prolog(Event)) 1778 | Options
1779 ]),
1780 call_cleanup(
1781 read_prolog_reply(Stream, Reply),
1782 close(Stream)).
1783remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
1784 server_url(Server, Action, [id=ID|Params], URL),
1785 http_open(URL, Stream, Options),
1786 call_cleanup(
1787 read_prolog_reply(Stream, Reply),
1788 close(Stream)).
1789
1790remote_post_rec(Server, Action, Data, Reply, Options) :-
1791 server_url(Server, Action, [], URL),
1792 probe(Action, URL, Options),
1793 http_open(URL, Stream,
1794 [ post(json(Data))
1795 | Options
1796 ]),
1797 call_cleanup(
1798 read_prolog_reply(Stream, Reply),
1799 close(Stream)).
1800
1806
1807probe(create, URL, Options) :-
1808 !,
1809 http_open(URL, Stream, [method(options)|Options]),
1810 close(Stream).
1811probe(_, _, _).
1812
1813read_prolog_reply(In, Reply) :-
1814 set_stream(In, encoding(utf8)),
1815 read(In, Reply0),
1816 rebind_cycles(Reply0, Reply).
1817
1818rebind_cycles(@(Reply, Bindings), Reply) :-
1819 is_list(Bindings),
1820 !,
1821 maplist(bind, Bindings).
1822rebind_cycles(Reply, Reply).
1823
1824bind(Var = Value) :-
1825 Var = Value.
1826
1827server_url(Server, Action, Params, URL) :-
1828 atom_concat('pengine/', Action, PAction),
1829 uri_edit([ path(PAction),
1830 search(Params)
1831 ], Server, URL).
1832
1833
1851
1852pengine_event(Event) :-
1853 pengine_event(Event, []).
1854
1855pengine_event(Event, Options) :-
1856 thread_self(Self),
1857 option(listen(Id), Options, _),
1858 ( thread_get_message(Self, pengine_event(Id, Event), Options)
1859 -> true
1860 ; Event = timeout
1861 ),
1862 update_remote_destroy(Event).
1863
1864update_remote_destroy(Event) :-
1865 destroy_event(Event),
1866 arg(1, Event, Id),
1867 pengine_remote(Id, _Server),
1868 !,
1869 pengine_unregister_remote(Id).
1870update_remote_destroy(_).
1871
1872destroy_event(destroy(_)).
1873destroy_event(destroy(_,_)).
1874destroy_event(create(_,Features)) :-
1875 memberchk(answer(Answer), Features),
1876 !,
1877 nonvar(Answer),
1878 destroy_event(Answer).
1879
1880
1906
1907pengine_event_loop(Closure, Options) :-
1908 child(_,_),
1909 !,
1910 pengine_event(Event),
1911 ( option(autoforward(all), Options) 1912 -> forall(child(_,ID), pengine_send(ID, Event))
1913 ; true
1914 ),
1915 pengine_event_loop(Event, Closure, Options).
1916pengine_event_loop(_, _).
1917
1918:- meta_predicate
1919 pengine_process_event(+, 1, -, +). 1920
1921pengine_event_loop(Event, Closure, Options) :-
1922 pengine_process_event(Event, Closure, Continue, Options),
1923 ( Continue == true
1924 -> pengine_event_loop(Closure, Options)
1925 ; true
1926 ).
1927
1928pengine_process_event(create(ID, T), Closure, Continue, Options) :-
1929 debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
1930 ( select(answer(First), T, T1)
1931 -> ignore(call(Closure, create(ID, T1))),
1932 pengine_process_event(First, Closure, Continue, Options)
1933 ; ignore(call(Closure, create(ID, T))),
1934 Continue = true
1935 ).
1936pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
1937 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
1938 ignore(call(Closure, output(ID, Msg))),
1939 pengine_pull_response(ID, []).
1940pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
1941 debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
1942 ignore(call(Closure, debug(ID, Msg))),
1943 pengine_pull_response(ID, []).
1944pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
1945 debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
1946 ignore(call(Closure, prompt(ID, Term))).
1947pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
1948 debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
1949 ignore(call(Closure, success(ID, Sol, More))).
1950pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
1951 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
1952 ignore(call(Closure, failure(ID))).
1953pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
1954 debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
1955 ( call(Closure, error(ID, Error))
1956 -> Continue = true
1957 ; forall(child(_,Child), pengine_destroy(Child)),
1958 throw(Error)
1959 ).
1960pengine_process_event(stop(ID), Closure, true, _Options) :-
1961 debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
1962 ignore(call(Closure, stop(ID))).
1963pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
1964 pengine_process_event(Event, Closure, _, Options),
1965 pengine_process_event(destroy(ID), Closure, Continue, Options).
1966pengine_process_event(destroy(ID), Closure, true, _Options) :-
1967 retractall(child(_,ID)),
1968 debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
1969 ignore(call(Closure, destroy(ID))).
1970
1971
1997
1998pengine_rpc(URL, Query) :-
1999 pengine_rpc(URL, Query, []).
2000
2001pengine_rpc(URL, Query, M:Options0) :-
2002 translate_local_sources(Options0, Options1, M),
2003 ( option(timeout(_), Options1)
2004 -> Options = Options1
2005 ; setting(time_limit, Limit),
2006 Options = [timeout(Limit)|Options1]
2007 ),
2008 term_variables(Query, Vars),
2009 Template =.. [v|Vars],
2010 State = destroy(true), 2011 setup_call_catcher_cleanup(
2012 pengine_create([ ask(Query),
2013 template(Template),
2014 server(URL),
2015 id(Id)
2016 | Options
2017 ]),
2018 wait_event(Template, State, [listen(Id)|Options]),
2019 Why,
2020 pengine_destroy_and_wait(State, Id, Why, Options)).
2021
2022pengine_destroy_and_wait(destroy(true), Id, Why, Options) :-
2023 !,
2024 debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
2025 pengine_destroy(Id, Options),
2026 wait_destroy(Id, 10).
2027pengine_destroy_and_wait(_, _, Why, _) :-
2028 debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
2029
2030wait_destroy(Id, _) :-
2031 \+ child(_, Id),
2032 !.
2033wait_destroy(Id, N) :-
2034 pengine_event(Event, [listen(Id),timeout(10)]),
2035 !,
2036 ( destroy_event(Event)
2037 -> retractall(child(_,Id))
2038 ; succ(N1, N)
2039 -> wait_destroy(Id, N1)
2040 ; debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
2041 pengine_unregister_remote(Id),
2042 retractall(child(_,Id))
2043 ).
2044
2045wait_event(Template, State, Options) :-
2046 pengine_event(Event, Options),
2047 debug(pengine(event), 'Received ~p', [Event]),
2048 process_event(Event, Template, State, Options).
2049
2050process_event(create(_ID, Features), Template, State, Options) :-
2051 memberchk(answer(First), Features),
2052 process_event(First, Template, State, Options).
2053process_event(error(_ID, Error), _Template, _, _Options) :-
2054 throw(Error).
2055process_event(failure(_ID, _Time), _Template, _, _Options) :-
2056 fail.
2057process_event(prompt(ID, Prompt), Template, State, Options) :-
2058 pengine_rpc_prompt(ID, Prompt, Reply),
2059 pengine_send(ID, input(Reply)),
2060 wait_event(Template, State, Options).
2061process_event(output(ID, Term), Template, State, Options) :-
2062 pengine_rpc_output(ID, Term),
2063 pengine_pull_response(ID, Options),
2064 wait_event(Template, State, Options).
2065process_event(debug(ID, Message), Template, State, Options) :-
2066 debug(pengine(debug), '~w', [Message]),
2067 pengine_pull_response(ID, Options),
2068 wait_event(Template, State, Options).
2069process_event(success(_ID, Solutions, _Proj, _Time, false),
2070 Template, _, _Options) :-
2071 !,
2072 member(Template, Solutions).
2073process_event(success(ID, Solutions, _Proj, _Time, true),
2074 Template, State, Options) :-
2075 ( member(Template, Solutions)
2076 ; pengine_next(ID, Options),
2077 wait_event(Template, State, Options)
2078 ).
2079process_event(destroy(ID, Event), Template, State, Options) :-
2080 !,
2081 retractall(child(_,ID)),
2082 nb_setarg(1, State, false),
2083 debug(pengine(destroy), 'State: ~p~n', [State]),
2084 process_event(Event, Template, State, Options).
2086process_event(success(ID, Solutions, Time, More),
2087 Template, State, Options) :-
2088 process_event(success(ID, Solutions, _Proj, Time, More),
2089 Template, State, Options).
2090
2091
2092pengine_rpc_prompt(ID, Prompt, Term) :-
2093 prompt(ID, Prompt, Term0),
2094 !,
2095 Term = Term0.
2096pengine_rpc_prompt(_ID, Prompt, Term) :-
2097 setup_call_cleanup(
2098 prompt(Old, Prompt),
2099 read(Term),
2100 prompt(_, Old)).
2101
2102pengine_rpc_output(ID, Term) :-
2103 output(ID, Term),
2104 !.
2105pengine_rpc_output(_ID, Term) :-
2106 print(Term).
2107
2112
2113:- multifile prompt/3. 2114
2119
2120:- multifile output/2. 2121
2122
2125
2137
2138:- http_handler(root(pengine), http_404([]),
2139 [ id(pengines) ]). 2140:- http_handler(root(pengine/create), http_pengine_create,
2141 [ time_limit(infinite), spawn([]) ]). 2142:- http_handler(root(pengine/send), http_pengine_send,
2143 [ time_limit(infinite), spawn([]) ]). 2144:- http_handler(root(pengine/pull_response), http_pengine_pull_response,
2145 [ time_limit(infinite), spawn([]) ]). 2146:- http_handler(root(pengine/abort), http_pengine_abort, []). 2147:- http_handler(root(pengine/detach), http_pengine_detach, []). 2148:- http_handler(root(pengine/list), http_pengine_list, []). 2149:- http_handler(root(pengine/ping), http_pengine_ping, []). 2150:- http_handler(root(pengine/destroy_all), http_pengine_destroy_all, []). 2151
2152:- http_handler(root(pengine/'pengines.js'),
2153 http_reply_file(library('http/web/js/pengines.js'), []), []). 2154:- http_handler(root(pengine/'plterm.css'),
2155 http_reply_file(library('http/web/css/plterm.css'), []), []). 2156
2157
2185
2186http_pengine_create(Request) :-
2187 reply_options(Request, [post]),
2188 !.
2189http_pengine_create(Request) :-
2190 memberchk(content_type(CT), Request),
2191 sub_atom(CT, 0, _, _, 'application/json'),
2192 !,
2193 http_read_json_dict(Request, Dict),
2194 dict_atom_option(format, Dict, Format, prolog),
2195 dict_atom_option(application, Dict, Application, pengine_sandbox),
2196 http_pengine_create(Request, Application, Format, Dict).
2197http_pengine_create(Request) :-
2198 Optional = [optional(true)],
2199 OptString = [string|Optional],
2200 Form = [ format(Format, [default(prolog)]),
2201 application(Application, [default(pengine_sandbox)]),
2202 chunk(_, [nonneg;oneof([false]), default(1)]),
2203 collate(_, [number, default(0)]),
2204 solutions(_, [oneof([all,chunked]), default(chunked)]),
2205 ask(_, OptString),
2206 template(_, OptString),
2207 src_text(_, OptString),
2208 disposition(_, OptString),
2209 src_url(_, Optional)
2210 ],
2211 http_parameters(Request, Form),
2212 form_dict(Form, Dict),
2213 http_pengine_create(Request, Application, Format, Dict).
2214
2215dict_atom_option(Key, Dict, Atom, Default) :-
2216 ( get_dict(Key, Dict, String)
2217 -> atom_string(Atom, String)
2218 ; Atom = Default
2219 ).
2220
2221form_dict(Form, Dict) :-
2222 form_values(Form, Pairs),
2223 dict_pairs(Dict, _, Pairs).
2224
2225form_values([], []).
2226form_values([H|T], Pairs) :-
2227 arg(1, H, Value),
2228 nonvar(Value),
2229 !,
2230 functor(H, Name, _),
2231 Pairs = [Name-Value|PairsT],
2232 form_values(T, PairsT).
2233form_values([_|T], Pairs) :-
2234 form_values(T, Pairs).
2235
2237
2238
2239http_pengine_create(Request, Application, Format, Dict) :-
2240 current_application(Application),
2241 !,
2242 allowed(Request, Application),
2243 authenticate(Request, Application, UserOptions),
2244 dict_to_options(Dict, Application, CreateOptions0),
2245 append(UserOptions, CreateOptions0, CreateOptions),
2246 pengine_uuid(Pengine),
2247 message_queue_create(Queue, [max_size(25)]),
2248 setting(Application:time_limit, TimeLimit),
2249 get_time(Now),
2250 asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
2251 broadcast(pengine(create(Pengine, Application, CreateOptions))),
2252 create(Queue, Pengine, CreateOptions, http, Application),
2253 create_wait_and_output_result(Pengine, Queue, Format,
2254 TimeLimit, Dict),
2255 gc_abandoned_queues.
2256http_pengine_create(_Request, Application, Format, _Dict) :-
2257 Error = existence_error(pengine_application, Application),
2258 pengine_uuid(ID),
2259 output_result(ID, Format, error(ID, error(Error, _))).
2260
2261
2262dict_to_options(Dict, Application, CreateOptions) :-
2263 dict_pairs(Dict, _, Pairs),
2264 pairs_create_options(Pairs, Application, CreateOptions).
2265
2266pairs_create_options([], _, []) :- !.
2267pairs_create_options([N-V0|T0], App, [Opt|T]) :-
2268 Opt =.. [N,V],
2269 pengine_create_option(Opt), N \== user,
2270 !,
2271 ( create_option_type(Opt, atom)
2272 -> atom_string(V, V0) 2273 ; V = V0 2274 ), 2275 pairs_create_options(T0, App, T).
2276pairs_create_options([_|T0], App, T) :-
2277 pairs_create_options(T0, App, T).
2278
2287
2288wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
2289 Collate is min(Collate0, TimeLimit/10),
2290 get_time(Epoch),
2291 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2292 [ timeout(TimeLimit)
2293 ]),
2294 Error, true)
2295 -> ( var(Error)
2296 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
2297 ( collating_event(Collate, Event)
2298 -> Deadline is Epoch+TimeLimit,
2299 collect_events(Pengine, Collate, Queue, Deadline, 100, More),
2300 Events = [Event|More],
2301 ignore(destroy_queue_from_http(Pengine, Events, Queue)),
2302 protect_pengine(Pengine, output_result(Pengine, Format, Events))
2303 ; ignore(destroy_queue_from_http(Pengine, Event, Queue)),
2304 protect_pengine(Pengine, output_result(Pengine, Format, Event))
2305 )
2306 ; output_result(Pengine, Format, died(Pengine))
2307 )
2308 ; time_limit_exceeded(Pengine, Format)
2309 ).
2310
2315
2316collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
2317 !.
2318collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
2319 debug(pengine(wait), 'Waiting to collate events', []),
2320 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2321 [ timeout(Collate)
2322 ]),
2323 Error, true)
2324 -> ( var(Error)
2325 -> debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
2326 Events = [Event|More],
2327 ( collating_event(Collate, Event)
2328 -> Max2 is Max - 1,
2329 collect_events(Pengine, Collate, Queue, Deadline, Max2, More)
2330 ; More = []
2331 )
2332 ; Events = [died(Pengine)]
2333 )
2334 ; get_time(Now),
2335 Now > Deadline
2336 -> time_limit_event(Pengine, TimeLimitEvent),
2337 Events = [TimeLimitEvent]
2338 ; Events = []
2339 ).
2340
2341collating_event(0, _) :-
2342 !,
2343 fail.
2344collating_event(_, output(_,_)).
2345
2352
2353create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
2354 get_dict(solutions, Dict, all),
2355 !,
2356 between(1, infinite, Page),
2357 ( catch(thread_get_message(Queue, pengine_event(_, Event),
2358 [ timeout(TimeLimit)
2359 ]),
2360 Error, true)
2361 -> ( var(Error)
2362 -> debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
2363 ( destroy_queue_from_http(Pengine, Event, Queue)
2364 -> !,
2365 protect_pengine(Pengine,
2366 output_result_2(Format, page(Page, Event), Dict))
2367 ; is_more_event(Event)
2368 -> pengine_thread(Pengine, Thread),
2369 thread_send_message(Thread, pengine_request(next)),
2370 protect_pengine(Pengine,
2371 output_result_2(Format, page(Page, Event), Dict)),
2372 fail
2373 ; !,
2374 protect_pengine(Pengine,
2375 output_result_2(Format, page(Page, Event), Dict))
2376 )
2377 ; !, output_result(Pengine, Format, died(Pengine))
2378 )
2379 ; !, time_limit_exceeded(Pengine, Format)
2380 ),
2381 !.
2382create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
2383 wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)).
2384
2385is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
2386is_more_event(create(_, Options)) :-
2387 memberchk(answer(Event), Options),
2388 is_more_event(Event).
2389
2390
2391
2401
2402time_limit_exceeded(Pengine, Format) :-
2403 time_limit_event(Pengine, Event),
2404 call_cleanup(
2405 pengine_destroy(Pengine, [force(true)]),
2406 output_result(Pengine, Format, Event)).
2407
2408time_limit_event(Pengine,
2409 destroy(Pengine, error(Pengine, time_limit_exceeded))).
2410
2411destroy_pengine_after_output(Pengine, Events) :-
2412 is_list(Events),
2413 last(Events, Last),
2414 time_limit_event(Pengine, Last),
2415 !,
2416 catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true).
2417destroy_pengine_after_output(_, _).
2418
2419
2431
2432destroy_queue_from_http(ID, _, Queue) :-
2433 output_queue(ID, Queue, _),
2434 !,
2435 destroy_queue_if_empty(Queue).
2436destroy_queue_from_http(ID, Event, Queue) :-
2437 debug(pengine(destroy), 'DESTROY? ~p', [Event]),
2438 is_destroy_event(Event),
2439 !,
2440 message_queue_property(Queue, size(Waiting)),
2441 debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
2442 with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
2443
2444is_destroy_event(destroy(_)).
2445is_destroy_event(destroy(_,_)).
2446is_destroy_event(create(_, Options)) :-
2447 memberchk(answer(Event), Options),
2448 is_destroy_event(Event).
2449
2450destroy_queue_if_empty(Queue) :-
2451 thread_peek_message(Queue, _),
2452 !.
2453destroy_queue_if_empty(Queue) :-
2454 retractall(output_queue(_, Queue, _)),
2455 message_queue_destroy(Queue).
2456
2462
2463:- dynamic
2464 last_gc/1. 2465
2466gc_abandoned_queues :-
2467 consider_queue_gc,
2468 !,
2469 get_time(Now),
2470 ( output_queue(_, Queue, Time),
2471 Now-Time > 15*60,
2472 retract(output_queue(_, Queue, Time)),
2473 message_queue_destroy(Queue),
2474 fail
2475 ; retractall(last_gc(_)),
2476 asserta(last_gc(Now))
2477 ).
2478gc_abandoned_queues.
2479
2480consider_queue_gc :-
2481 predicate_property(output_queue(_,_,_), number_of_clauses(N)),
2482 N > 100,
2483 ( last_gc(Time),
2484 get_time(Now),
2485 Now-Time > 5*60
2486 -> true
2487 ; \+ last_gc(_)
2488 ).
2489
2505
2506:- dynamic output_queue_destroyed/1. 2507
2508sync_destroy_queue_from_http(ID, Queue) :-
2509 ( output_queue(ID, Queue, _)
2510 -> destroy_queue_if_empty(Queue)
2511 ; thread_peek_message(Queue, pengine_event(_, output(_,_)))
2512 -> debug(pengine(destroy), 'Delay destruction of ~p because of output',
2513 [Queue]),
2514 get_time(Now),
2515 asserta(output_queue(ID, Queue, Now))
2516 ; message_queue_destroy(Queue),
2517 asserta(output_queue_destroyed(Queue))
2518 ).
2519
2524
2525sync_destroy_queue_from_pengine(ID, Queue) :-
2526 ( retract(output_queue_destroyed(Queue))
2527 -> true
2528 ; get_time(Now),
2529 asserta(output_queue(ID, Queue, Now))
2530 ),
2531 retractall(pengine_queue(ID, Queue, _, _)).
2532
2533
2534http_pengine_send(Request) :-
2535 reply_options(Request, [get,post]),
2536 !.
2537http_pengine_send(Request) :-
2538 http_parameters(Request,
2539 [ id(ID, [ type(atom) ]),
2540 event(EventString, [optional(true)]),
2541 collate(Collate, [number, default(0)]),
2542 format(Format, [default(prolog)])
2543 ]),
2544 catch(read_event(ID, Request, Format, EventString, Event),
2545 Error,
2546 true),
2547 ( var(Error)
2548 -> debug(pengine(event), 'HTTP send: ~p', [Event]),
2549 ( pengine_thread(ID, Thread)
2550 -> pengine_queue(ID, Queue, TimeLimit, _),
2551 random_delay,
2552 broadcast(pengine(send(ID, Event))),
2553 thread_send_message(Thread, pengine_request(Event)),
2554 wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
2555 ; atom(ID)
2556 -> pengine_died(Format, ID)
2557 ; http_404([], Request)
2558 )
2559 ; Error = error(existence_error(pengine, ID), _)
2560 -> pengine_died(Format, ID)
2561 ; output_result(ID, Format, error(ID, Error))
2562 ).
2563
2564pengine_died(Format, Pengine) :-
2565 output_result(Pengine, Format,
2566 error(Pengine, error(existence_error(pengine, Pengine),_))).
2567
2568
2576
2577read_event(Pengine, Request, Format, EventString, Event) :-
2578 protect_pengine(
2579 Pengine,
2580 ( get_pengine_module(Pengine, Module),
2581 read_event_2(Request, EventString, Module, Event0, Bindings)
2582 )),
2583 !,
2584 fix_bindings(Format, Event0, Bindings, Event).
2585read_event(Pengine, Request, _Format, _EventString, _Event) :-
2586 debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
2587 discard_post_data(Request),
2588 existence_error(pengine, Pengine).
2589
2590
2595
2596read_event_2(_Request, EventString, Module, Event, Bindings) :-
2597 nonvar(EventString),
2598 !,
2599 term_string(Event, EventString,
2600 [ variable_names(Bindings),
2601 module(Module)
2602 ]).
2603read_event_2(Request, _EventString, Module, Event, Bindings) :-
2604 option(method(post), Request),
2605 http_read_data(Request, Event,
2606 [ content_type('application/x-prolog'),
2607 module(Module),
2608 variable_names(Bindings)
2609 ]).
2610
2614
2615discard_post_data(Request) :-
2616 option(method(post), Request),
2617 !,
2618 setup_call_cleanup(
2619 open_null_stream(NULL),
2620 http_read_data(Request, _, [to(stream(NULL))]),
2621 close(NULL)).
2622discard_post_data(_).
2623
2629
2630fix_bindings(Format,
2631 ask(Goal, Options0), Bindings,
2632 ask(Goal, NewOptions)) :-
2633 json_lang(Format),
2634 !,
2635 exclude(anon, Bindings, NamedBindings),
2636 template(NamedBindings, Template, Options0, Options1),
2637 select_option(chunk(Paging), Options1, Options2, 1),
2638 NewOptions = [ template(Template),
2639 chunk(Paging),
2640 bindings(NamedBindings)
2641 | Options2
2642 ].
2643fix_bindings(_, Command, _, Command).
2644
2645template(_, Template, Options0, Options) :-
2646 select_option(template(Template), Options0, Options),
2647 !.
2648template(Bindings, Template, Options, Options) :-
2649 dict_create(Template, swish_default_template, Bindings).
2650
2651anon(Name=_) :-
2652 sub_atom(Name, 0, _, _, '_'),
2653 sub_atom(Name, 1, 1, _, Next),
2654 char_type(Next, prolog_var_start).
2655
2656var_name(Name=_, Name).
2657
2658
2662
2663json_lang(json) :- !.
2664json_lang(Format) :-
2665 sub_atom(Format, 0, _, _, 'json-').
2666
2671
2672http_pengine_pull_response(Request) :-
2673 reply_options(Request, [get]),
2674 !.
2675http_pengine_pull_response(Request) :-
2676 http_parameters(Request,
2677 [ id(ID, []),
2678 format(Format, [default(prolog)]),
2679 collate(Collate, [number, default(0)])
2680 ]),
2681 reattach(ID),
2682 ( ( pengine_queue(ID, Queue, TimeLimit, _)
2683 -> true
2684 ; output_queue(ID, Queue, _),
2685 TimeLimit = 0
2686 )
2687 -> wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
2688 ; http_404([], Request)
2689 ).
2690
2697
2698http_pengine_abort(Request) :-
2699 reply_options(Request, [get,post]),
2700 !.
2701http_pengine_abort(Request) :-
2702 http_parameters(Request,
2703 [ id(ID, [])
2704 ]),
2705 ( pengine_thread(ID, _Thread)
2706 -> broadcast(pengine(abort(ID))),
2707 abort_pending_output(ID),
2708 pengine_abort(ID),
2709 reply_json_dict(true)
2710 ; http_404([], Request)
2711 ).
2712
2722
2723http_pengine_detach(Request) :-
2724 reply_options(Request, [post]),
2725 !.
2726http_pengine_detach(Request) :-
2727 http_parameters(Request,
2728 [ id(ID, [])
2729 ]),
2730 http_read_json_dict(Request, ClientData),
2731 ( pengine_property(ID, application(Application)),
2732 allowed(Request, Application),
2733 authenticate(Request, Application, _UserOptions)
2734 -> broadcast(pengine(detach(ID))),
2735 get_time(Now),
2736 assertz(pengine_detached(ID, ClientData.put(time, Now))),
2737 pengine_queue(ID, Queue, _TimeLimit, _Now),
2738 message_queue_set(Queue, max_size(1000)),
2739 pengine_reply(Queue, detached(ID)),
2740 reply_json_dict(true)
2741 ; http_404([], Request)
2742 ).
2743
2744reattach(ID) :-
2745 ( retract(pengine_detached(ID, _Data)),
2746 pengine_queue(ID, Queue, _TimeLimit, _Now)
2747 -> message_queue_set(Queue, max_size(25))
2748 ; true
2749 ).
2750
2751
2756
2757http_pengine_destroy_all(Request) :-
2758 reply_options(Request, [get,post]),
2759 !.
2760http_pengine_destroy_all(Request) :-
2761 http_parameters(Request,
2762 [ ids(IDsAtom, [])
2763 ]),
2764 atomic_list_concat(IDs, ',', IDsAtom),
2765 forall(( member(ID, IDs),
2766 \+ pengine_detached(ID, _)
2767 ),
2768 pengine_destroy(ID, [force(true)])),
2769 reply_json_dict("ok").
2770
2776
2777http_pengine_ping(Request) :-
2778 reply_options(Request, [get]),
2779 !.
2780http_pengine_ping(Request) :-
2781 http_parameters(Request,
2782 [ id(Pengine, []),
2783 format(Format, [default(prolog)])
2784 ]),
2785 ( pengine_thread(Pengine, Thread),
2786 Error = error(_,_),
2787 catch(thread_statistics(Thread, Stats), Error, fail)
2788 -> output_result(Pengine, Format, ping(Pengine, Stats))
2789 ; output_result(Pengine, Format, died(Pengine))
2790 ).
2791
2798
2799http_pengine_list(Request) :-
2800 reply_options(Request, [get]),
2801 !.
2802http_pengine_list(Request) :-
2803 http_parameters(Request,
2804 [ status(Status, [default(detached), oneof([detached])]),
2805 application(Application, [default(pengine_sandbox)])
2806 ]),
2807 allowed(Request, Application),
2808 authenticate(Request, Application, _UserOptions),
2809 findall(Term, listed_pengine(Application, Status, Term), Terms),
2810 reply_json_dict(json{pengines: Terms}).
2811
2812listed_pengine(Application, detached, State) :-
2813 State = pengine{id:Id,
2814 detached:Time,
2815 queued:Queued,
2816 stats:Stats},
2817
2818 pengine_property(Id, application(Application)),
2819 pengine_property(Id, detached(Time)),
2820 pengine_queue(Id, Queue, _TimeLimit, _Now),
2821 message_queue_property(Queue, size(Queued)),
2822 ( pengine_thread(Id, Thread),
2823 catch(thread_statistics(Thread, Stats), _, fail)
2824 -> true
2825 ; Stats = thread{status:died}
2826 ).
2827
2828
2836
2837:- dynamic
2838 pengine_replying/2. 2839
2840output_result(Pengine, Format, Event) :-
2841 thread_self(Thread),
2842 cors_enable, 2843 disable_client_cache,
2844 setup_call_cleanup(
2845 asserta(pengine_replying(Pengine, Thread), Ref),
2846 catch(output_result_2(Format, Event, _{}),
2847 pengine_abort_output,
2848 true),
2849 erase(Ref)),
2850 destroy_pengine_after_output(Pengine, Event).
2851
2852output_result_2(Lang, Event, Dict) :-
2853 write_result(Lang, Event, Dict),
2854 !.
2855output_result_2(prolog, Event, _) :-
2856 !,
2857 format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
2858 write_term(Event,
2859 [ quoted(true),
2860 ignore_ops(true),
2861 fullstop(true),
2862 blobs(portray),
2863 portray_goal(portray_blob),
2864 nl(true)
2865 ]).
2866output_result_2(Lang, Event, _) :-
2867 json_lang(Lang),
2868 !,
2869 ( event_term_to_json_data(Event, JSON, Lang)
2870 -> reply_json_dict(JSON)
2871 ; assertion(event_term_to_json_data(Event, _, Lang))
2872 ).
2873output_result_2(Lang, _Event, _) :- 2874 domain_error(pengine_format, Lang).
2875
2883
2884:- public portray_blob/2. 2885portray_blob(Blob, _Options) :-
2886 blob(Blob, Type),
2887 writeq('$BLOB'(Type)).
2888
2893
2894abort_pending_output(Pengine) :-
2895 forall(pengine_replying(Pengine, Thread),
2896 abort_output_thread(Thread)).
2897
2898abort_output_thread(Thread) :-
2899 catch(thread_signal(Thread, throw(pengine_abort_output)),
2900 error(existence_error(thread, _), _),
2901 true).
2902
2910
2916
2917disable_client_cache :-
2918 format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
2919 Pragma: no-cache\r\n\c
2920 Expires: 0\r\n').
2921
2922event_term_to_json_data(Events, JSON, Lang) :-
2923 is_list(Events),
2924 !,
2925 events_to_json_data(Events, JSON, Lang).
2926event_term_to_json_data(Event, JSON, Lang) :-
2927 event_to_json(Event, JSON, Lang),
2928 !.
2929event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
2930 json{event:success, id:ID, time:Time,
2931 data:Bindings, more:More, projection:Projection},
2932 json) :-
2933 !,
2934 term_to_json(Bindings0, Bindings).
2935event_term_to_json_data(destroy(ID, Event),
2936 json{event:destroy, id:ID, data:JSON},
2937 Style) :-
2938 !,
2939 event_term_to_json_data(Event, JSON, Style).
2940event_term_to_json_data(create(ID, Features0), JSON, Style) :-
2941 !,
2942 ( select(answer(First0), Features0, Features1)
2943 -> event_term_to_json_data(First0, First, Style),
2944 Features = [answer(First)|Features1]
2945 ; Features = Features0
2946 ),
2947 dict_create(JSON, json, [event(create), id(ID)|Features]).
2948event_term_to_json_data(destroy(ID, Event),
2949 json{event:destroy, id:ID, data:JSON}, Style) :-
2950 !,
2951 event_term_to_json_data(Event, JSON, Style).
2952event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
2953 !,
2954 Error0 = json{event:error, id:ID, data:Message},
2955 add_error_details(ErrorTerm, Error0, Error),
2956 message_to_string(ErrorTerm, Message).
2957event_term_to_json_data(failure(ID, Time),
2958 json{event:failure, id:ID, time:Time}, _) :-
2959 !.
2960event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
2961 functor(EventTerm, F, 1),
2962 !,
2963 arg(1, EventTerm, ID).
2964event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
2965 functor(EventTerm, F, 2),
2966 arg(1, EventTerm, ID),
2967 arg(2, EventTerm, Data),
2968 term_to_json(Data, JSON).
2969
2970events_to_json_data([], [], _).
2971events_to_json_data([E|T0], [J|T], Lang) :-
2972 event_term_to_json_data(E, J, Lang),
2973 events_to_json_data(T0, T, Lang).
2974
2975:- public add_error_details/3. 2976
2981
2982add_error_details(Error, JSON0, JSON) :-
2983 add_error_code(Error, JSON0, JSON1),
2984 add_error_location(Error, JSON1, JSON).
2985
2996
2997add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
2998 atom(Type),
2999 !,
3000 to_atomic(Obj, Value),
3001 Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
3002add_error_code(error(Formal, _), Error0, Error) :-
3003 callable(Formal),
3004 !,
3005 functor(Formal, Code, _),
3006 Error = Error0.put(code, Code).
3007add_error_code(_, Error, Error).
3008
3010to_atomic(Obj, Atomic) :- atom(Obj), !, Atomic = Obj.
3011to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
3012to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
3013to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
3014
3015
3021
3022add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
3023 atom(Path), integer(Line),
3024 !,
3025 Term = Term0.put(_{location:_{file:Path, line:Line}}).
3026add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
3027 atom(Path), integer(Line), integer(Ch),
3028 !,
3029 Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
3030add_error_location(_, Term, Term).
3031
3032
3040
3042
3043
3044 3047
3052
3053allowed(Request, Application) :-
3054 setting(Application:allow_from, Allow),
3055 match_peer(Request, Allow),
3056 setting(Application:deny_from, Deny),
3057 \+ match_peer(Request, Deny),
3058 !.
3059allowed(Request, _Application) :-
3060 memberchk(request_uri(Here), Request),
3061 throw(http_reply(forbidden(Here))).
3062
3063match_peer(_, Allowed) :-
3064 memberchk(*, Allowed),
3065 !.
3066match_peer(_, []) :- !, fail.
3067match_peer(Request, Allowed) :-
3068 http_peer(Request, Peer),
3069 debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
3070 ( memberchk(Peer, Allowed)
3071 -> true
3072 ; member(Pattern, Allowed),
3073 match_peer_pattern(Pattern, Peer)
3074 ).
3075
3076match_peer_pattern(Pattern, Peer) :-
3077 ip_term(Pattern, IP),
3078 ip_term(Peer, IP),
3079 !.
3080
3081ip_term(Peer, Pattern) :-
3082 split_string(Peer, ".", "", PartStrings),
3083 ip_pattern(PartStrings, Pattern).
3084
3085ip_pattern([], []).
3086ip_pattern([*], _) :- !.
3087ip_pattern([S|T0], [N|T]) :-
3088 number_string(N, S),
3089 ip_pattern(T0, T).
3090
3091
3096
3097authenticate(Request, Application, UserOptions) :-
3098 authentication_hook(Request, Application, User),
3099 !,
3100 must_be(ground, User),
3101 UserOptions = [user(User)].
3102authenticate(_, _, []).
3103
3123
3124pengine_register_user(Options) :-
3125 option(user(User), Options),
3126 !,
3127 pengine_self(Me),
3128 asserta(pengine_user(Me, User)).
3129pengine_register_user(_).
3130
3131
3139
3140pengine_user(User) :-
3141 pengine_self(Me),
3142 pengine_user(Me, User).
3143
3147
3148reply_options(Request, Allowed) :-
3149 option(method(options), Request),
3150 !,
3151 cors_enable(Request,
3152 [ methods(Allowed)
3153 ]),
3154 format('Content-type: text/plain\r\n'),
3155 format('~n'). 3156
3157
3158 3161
3168
3169pengine_src_text(Src, Module) :-
3170 pengine_self(Self),
3171 format(atom(ID), 'pengine://~w/src', [Self]),
3172 extra_load_options(Self, Options),
3173 setup_call_cleanup(
3174 open_chars_stream(Src, Stream),
3175 load_files(Module:ID,
3176 [ stream(Stream),
3177 module(Module),
3178 silent(true)
3179 | Options
3180 ]),
3181 close(Stream)),
3182 keep_source(Self, ID, Src).
3183
3184system:'#file'(File, _Line) :-
3185 prolog_load_context(stream, Stream),
3186 set_stream(Stream, file_name(File)),
3187 set_stream(Stream, record_position(false)),
3188 set_stream(Stream, record_position(true)).
3189
3197
3198pengine_src_url(URL, Module) :-
3199 pengine_self(Self),
3200 uri_encoded(path, URL, Path),
3201 format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
3202 extra_load_options(Self, Options),
3203 ( get_pengine_application(Self, Application),
3204 setting(Application:debug_info, false)
3205 -> setup_call_cleanup(
3206 http_open(URL, Stream, []),
3207 ( set_stream(Stream, encoding(utf8)),
3208 load_files(Module:ID,
3209 [ stream(Stream),
3210 module(Module)
3211 | Options
3212 ])
3213 ),
3214 close(Stream))
3215 ; setup_call_cleanup(
3216 http_open(URL, TempStream, []),
3217 ( set_stream(TempStream, encoding(utf8)),
3218 read_string(TempStream, _, Src)
3219 ),
3220 close(TempStream)),
3221 setup_call_cleanup(
3222 open_chars_stream(Src, Stream),
3223 load_files(Module:ID,
3224 [ stream(Stream),
3225 module(Module)
3226 | Options
3227 ]),
3228 close(Stream)),
3229 keep_source(Self, ID, Src)
3230 ).
3231
3232
(Pengine, Options) :-
3234 pengine_not_sandboxed(Pengine),
3235 !,
3236 Options = [].
3237extra_load_options(_, [sandboxed(true)]).
3238
3239
3240keep_source(Pengine, ID, SrcText) :-
3241 get_pengine_application(Pengine, Application),
3242 setting(Application:debug_info, true),
3243 !,
3244 to_string(SrcText, SrcString),
3245 assertz(pengine_data(Pengine, source(ID, SrcString))).
3246keep_source(_, _, _).
3247
3248to_string(String, String) :-
3249 string(String),
3250 !.
3251to_string(Atom, String) :-
3252 atom_string(Atom, String),
3253 !.
3254
3255 3258
3259:- multifile
3260 sandbox:safe_primitive/1. 3261
3262sandbox:safe_primitive(pengines:pengine_input(_, _)).
3263sandbox:safe_primitive(pengines:pengine_output(_)).
3264sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3265
3266
3267 3270
3271prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
3272 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
3273 'This is normally caused by an insufficiently instantiated'-[], nl,
3274 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
3275 'find all possible instantations of Var.'-[]
3276 ]