% Module header: exports the two public entry points for running a list of
% goals concurrently and enumerating their solutions.
:- module(concurrency,
          [ concurrent_or/1,            % :Goals
            concurrent_or/3             % -Vars, :Goals, +Options
          ]).

% The goal-list argument of each entry point is module-sensitive (`:`), so
% the goals are resolved relative to the caller's module.
:- meta_predicate
    concurrent_or(:),
    concurrent_or(-, :, +).
%!  concurrent_or(:Goals) is nondet.
%
%   Convenience front end for concurrent_or/3. The free variables of
%   Goals (as computed by term_variables/2) are used as the solution
%   template and no options are passed.
concurrent_or(GoalList) :-
    term_variables(GoalList, FreeVars),
    concurrent_or(FreeVars, GoalList, []).
%!  concurrent_or(-Vars, :Goals, +Options) is nondet.
%
%   Run every goal in the list Goals in its own worker thread and unify
%   Vars with the solutions the workers report through a shared message
%   queue (see wait_for_one/4 for how messages are consumed).
%
%   Options handled here:
%
%     * on_error(OnError)
%       Passed to wait_for_one/4; defaults to `stop`.
%     * queue_factor(K)
%       The queue is created with max_size(K*length(Goals)); K defaults
%       to 1.
%
%   Any remaining options are handed to thread_create/3 for each worker.
%
%   When wait_for_one/4 finishes (or is abandoned), the cleanup handler
%   signals every worker to abort, drains the queue, joins all workers
%   and finally destroys the queue.
%
%   NOTE(review): the queue is created before setup_call_cleanup/3 is
%   entered, so if creating the workers raises, the cleanup handler does
%   not run -- confirm whether the queue (and any workers already
%   started) need releasing in that case.
concurrent_or(Vars, Module:Goals, Options) :-
    select_option(on_error(OnError), Options, Options1, stop),
    select_option(queue_factor(Factor), Options1, ThreadOptions, 1),
    length(Goals, NumGoals),
    QueueSize is Factor*NumGoals,
    message_queue_create(Queue, [max_size(QueueSize)]),
    setup_call_cleanup(
        maplist(create_worker(Module, Vars, Queue, ThreadOptions),
                Goals, Threads),
        wait_for_one(NumGoals, Queue, Vars, OnError),
        (   debug(concurrency, 'Sending kill signal to workers', []),
            maplist(kill_thread, Threads),
            % Empty the queue after signalling the workers and before
            % joining them.
            drain(Queue),
            debug(concurrency, 'Waiting for workers to die.', []),
            maplist(thread_join, Threads, _),
            message_queue_destroy(Queue)
        )).
47
%!  drain(+Queue) is det.
%
%   Discard the messages waiting in Queue. Messages are fetched with a
%   zero timeout until a fetch fails, so the call does not wait for new
%   messages to arrive.
drain(Queue) :-
    repeat,
    \+ thread_get_message(Queue, _, [timeout(0)]),
    !.
%!  kill_thread(+ThreadId) is det.
%
%   Ask thread ThreadId to stop by making it run throw(abort) via
%   thread_signal/2. Any exception raised by thread_signal/2 itself is
%   deliberately ignored (presumably to tolerate threads that have
%   already gone away), so the call always succeeds.
kill_thread(ThreadId) :-
    catch(thread_signal(ThreadId, throw(abort)),
          _Ignored,
          true).
%!  create_worker(+Module, +Vars, +Queue, +ThreadOptions, +Goal, -ThreadId) is det.
%
%   Start a thread running worker/3 on Module:Goal, giving it the
%   solution template Vars and the result queue Queue. ThreadOptions is
%   passed unchanged to thread_create/3. The argument order lets the
%   first four arguments be fixed in a closure for maplist/3 over a list
%   of goals.
create_worker(Module, Vars, Queue, ThreadOptions, Goal, ThreadId) :-
    thread_create(worker(Module:Goal, Vars, Queue),
                  ThreadId,
                  ThreadOptions).
51
%!  wait_for_one(+Pending, +Queue, ?Solution, +OnError) is nondet.
%
%   Enumerate on backtracking the solutions reported on Queue. Pending
%   is the number of workers that have not yet reported completion; the
%   predicate fails once it reaches zero (succ/2 fails for 0).
%
%   Messages understood:
%
%     * success(_, Value)
%       Unify Solution with Value; on backtracking (or if unification
%       fails) keep waiting with the same Pending count.
%     * failed(_)
%       One worker is finished: continue with Pending-1.
%     * error(_, E)
%       If OnError is `stop`, throw error(E); otherwise print E as an
%       error message and continue with Pending-1.
%
%   Any other message makes the call fail.
wait_for_one(Pending, Queue, Solution, OnError) :-
    succ(Remaining, Pending),
    thread_get_message(Queue, Message),
    wait_for_one_(Message, Pending, Remaining, Queue, Solution, OnError).

% wait_for_one_(+Message, +Pending, +Remaining, +Queue, ?Solution, +OnError)
%
% Dispatch on a single message taken from the queue. The cuts commit to
% the first matching message shape, mirroring an if-then-else chain.
wait_for_one_(success(_, Value), Pending, _, Queue, Solution, OnError) :-
    !,
    (   Solution = Value
    ;   wait_for_one(Pending, Queue, Solution, OnError)
    ).
wait_for_one_(failed(_), _, Remaining, Queue, Solution, OnError) :-
    !,
    wait_for_one(Remaining, Queue, Solution, OnError).
wait_for_one_(error(_, E), _, Remaining, Queue, Solution, OnError) :-
    (   OnError = stop
    ->  throw(error(E))
    ;   print_message(error, E),
        wait_for_one(Remaining, Queue, Solution, OnError)
    ).
62
63worker(Goal,Var,Q) :-
64 thread_self(Me),
65 debug(concurrency,'Worker started on ~q.',[Goal]),
66 ( catch( Goal,E, (thread_send_message(Q,error(Me,E)), throw(error))),
67 thread_send_message(Q,success(Me,Var)), fail
68 ; thread_send_message(Q,failed(Me)),
69 debug(concurrency,'Worker finished normally.',[])
70 )