1:- module(jolog_manager, [ manager_create/2
2 , manager_destroy/1
3 , manager_loop/1
4 ]). 5
%%  manager_loop(+Module)
%
%   Entry point of the manager loop for Module.  Starts the loop
%   with an outstanding count of zero.
manager_loop(Module) :-
    InitialOutstanding = 0,
    manager_loop(Module, InitialOutstanding).

%%  manager_loop(+Module, +Outstanding)
%
%   Runs the manager loop for Module with the given Outstanding
%   count by handing control to iterate_patterns/2.
manager_loop(Module, Pending) :-
    iterate_patterns(Module, Pending).
11
12
%%  manager_create(+Module, +StartupMessage)
%
%   Sets up the manager state for Module and kicks it off:
%
%     1. creates the manager queue and the work queue and records
%        them as Module's `manager_queue` and `work_queue` meta data;
%     2. starts two workers per CPU core via jolog:create_worker/3
%        and records them as `worker_count` and `workers`;
%     3. sends StartupMessage through Module:send/1.
manager_create(Module, StartupMessage) :-
    message_queue_create(ManagerQueue),
    jolog:set_meta(Module, manager_queue, ManagerQueue),

    message_queue_create(WorkQueue),
    jolog:set_meta(Module, work_queue, WorkQueue),

    % The cpu_count flag does not exist on platforms where the number
    % of cores cannot be determined.  Assume a single core in that
    % case instead of failing silently with the queues already created.
    (   current_prolog_flag(cpu_count, DetectedCores),
        integer(DetectedCores),
        DetectedCores > 0
    ->  CoreCount = DetectedCores
    ;   CoreCount = 1
    ),
    WorkerCount is 2*CoreCount,
    jolog:set_meta(Module, worker_count, WorkerCount),
    length(Workers, WorkerCount),
    debug(jolog, "Jolog: starting ~d worker threads.", [WorkerCount]),
    maplist(jolog:create_worker(Module, WorkQueue), Workers),
    jolog:set_meta(Module, workers, Workers),

    Module:send(StartupMessage).
30
31
%%  manager_destroy(+Module)
%
%   Tears down the manager state of Module: signals every recorded
%   worker to stop, destroys the work queue and the manager queue,
%   and removes Module's meta data and channel facts from `jolog`.
manager_destroy(Module) :-
    jolog:meta(Module, workers, Workers),
    maplist(destroy_worker, Workers),

    jolog:meta(Module, work_queue, WorkQueue),
    message_queue_destroy(WorkQueue),

    jolog:meta(Module, manager_queue, ManagerQueue),
    message_queue_destroy(ManagerQueue),

    retractall(jolog:meta(Module, _, _)),
    % Channel facts are stored as channels(Module, Msg); only drop the
    % ones that belong to the module being destroyed so that other
    % modules keep their pending messages.
    retractall(jolog:channels(Module, _)).
46
%%  destroy_worker(+ThreadId)
%
%   Asks the worker thread ThreadId to terminate by signalling it to
%   run thread_exit(halt).  The thread is not joined here.
destroy_worker(ThreadId) :-
    % ~w instead of ~d: a thread identifier is not necessarily an
    % integer (it may be a blob handle or an alias), and ~d raises a
    % format error for anything that is not an integer.
    debug(jolog, 'Signaling worker ~w to stop', [ThreadId]),
    thread_signal(ThreadId, thread_exit(halt)).
50
51
%%  iterate_patterns(+Module, +Outstanding)
%
%   Calls Module:'$jolog_code' repeatedly for as long as it succeeds,
%   committing to its first solution each time.  Once it fails,
%   control passes to iterate_events/2 with the same Outstanding
%   count.
iterate_patterns(Module, Pending) :-
    debug(jolog, '~w', [manager(iterate_patterns,Pending)]),
    (   Module:'$jolog_code'
    ->  iterate_patterns(Module, Pending)
    ;   iterate_events(Module, Pending)
    ).
60
61
%%  iterate_events(+Module, +Outstanding)
%
%   Fetches the next manager event for Module without blocking
%   (see take_event_no_block/2) and dispatches it to handle_event/3.
iterate_events(Module, Pending) :-
    take_event_no_block(Module, NextEvent),
    debug(jolog, '~w', [manager(event, NextEvent)]),
    handle_event(NextEvent, Module, Pending).
67
%%  handle_event(+Event, +Module, +Outstanding)
%
%   Dispatches a single manager event for Module.  Outstanding is the
%   running count that is adjusted by active(N) events.
%
%     * send_message(Msg)
%       Stores channels(Module, Msg) in `jolog` and goes back to
%       iterating patterns.
%     * active(N)
%       Adds N to Outstanding and continues iterating events.
%     * none
%       With Outstanding > 0, blocks for the next event and handles
%       it; otherwise handles a send_message(halt) event.
%     * halt
%       Succeeds without further iteration.
handle_event(send_message(Msg), Module, Outstanding) :-
    debug(jolog, "Sending message: ~w", [Msg]),
    % assertz/1 is the non-deprecated spelling of assert/1.
    jolog:assertz(channels(Module, Msg)),
    iterate_patterns(Module, Outstanding).
handle_event(active(N), Module, Outstanding0) :-
    Outstanding is Outstanding0 + N,
    debug(jolog, "Outstanding workers: ~d -> ~d", [Outstanding0,Outstanding]),
    iterate_events(Module, Outstanding).
handle_event(none, Module, Outstanding) :-
    (   Outstanding > 0
    ->  % Something is still outstanding: wait for the next event.
        debug(jolog, 'manager blocking', []),
        take_event_block(Module, Event),
        handle_event(Event, Module, Outstanding)
    ;   % Nothing queued and nothing outstanding: send halt.
        debug(jolog, "No events, no outstanding workers: sending halt", []),
        handle_event(send_message(halt), Module, Outstanding)
    ).
handle_event(halt, _, _).
87
%%  take_event_block(+Module, -Event)
%
%   Looks up the manager queue recorded for Module and waits on it
%   with thread_get_message/2 until an Event is available.
take_event_block(Module, Event) :-
    jolog:meta(Module, manager_queue, Queue),
    thread_get_message(Queue, Event).
94
95
98take_event_no_block(Module,Event) :-
99 jolog:meta(Module,manager_queue,ManagerQueue),
100 ( thread_get_message(ManagerQueue, Message, [timeout(0)]) ->
101 Event = Message
102 ; 103 Event = none
104 )