1:- module(concurrency, [concurrent_or/1, concurrent_or/3]).    2
    3:- meta_predicate concurrent_or(-,:,+).    4:- meta_predicate concurrent_or(:).
 concurrent_or(+Goals:list(callable)) is nondet
Simple interface to concurrent_or/3. Equivalent to concurrent_or(Vars,Goals,[]) where Vars is a list of all the variables in Goals.
   11concurrent_or(Goals) :-
   12   term_variables(Goals,Vars),
   13   concurrent_or(Vars,Goals,[]).
 concurrent_or(-Vars, +Goals:list(callable), +Options:list(option)) is nondet
Succeeds once for each solution of each goal in Goals, with Vars bound to sharing variables in Goals. Goals are executed in parallel. Valid options are
on_error(OnError:oneof([stop,continue]))
If OnError=stop, then an exception occuring in any goal stops all goals and is propagated back to and then thrown from the main thread. If OnError=continue, then an exception in a goal terminates only that thread, with a error message printed. The default is stop.
queue_factor(K:natural)
Solutions are communicated via a message queue of size K*length(Goals). This limits the extent to which threads compute solutions that have not yet been requested. The default is 1. Any remaining options are passed to thread_create/3.
   31concurrent_or(Vars, M:List, Options) :-
   32   select_option(on_error(OnError),Options,Opts1,stop),
   33   select_option(queue_factor(K),Opts1,Opts2,1),
   34   length(List, JobCount),
   35   QueueSize is K*JobCount,
   36   message_queue_create(Done,[max_size(QueueSize)]),
   37   setup_call_cleanup(
   38      maplist(create_worker(M,Vars,Done,Opts2),List,Solvers),
   39      wait_for_one(JobCount, Done, Vars, OnError),
   40      (  debug(concurrency,'Sending kill signal to workers',[]),
   41         maplist(kill_thread,Solvers), drain(Done),
   42         debug(concurrency,'Waiting for workers to die.',[]),
   43         maplist(thread_join,Solvers,_),
   44         message_queue_destroy(Done)
   45      )
   46   ).
   47
   48drain(Q) :- thread_get_message(Q,_,[timeout(0)]) -> drain(Q); true.
   49kill_thread(Id) :- catch(thread_signal(Id,throw(abort)),_,true).
   50create_worker(M,V,Q,O,H,Id) :- thread_create(worker(M:H,V,Q),Id,O).
   51
   52wait_for_one(N, Q, X, OnError) :-
   53   succ(N1,N),
   54   thread_get_message(Q, Msg),
   55   (  Msg=success(_,Var) -> (X=Var; wait_for_one(N,Q,X,OnError))
   56   ;  Msg=failed(_)      -> wait_for_one(N1,Q,X,OnError)
   57   ;  Msg=error(_,E)     -> ( OnError=stop -> throw(error(E))
   58                            ; print_message(error,E),
   59                              wait_for_one(N1,Q,X,OnError)
   60                            )
   61   ).
   62
   63worker(Goal,Var,Q) :-
   64   thread_self(Me),
   65   debug(concurrency,'Worker started on ~q.',[Goal]),
   66   (  catch( Goal,E, (thread_send_message(Q,error(Me,E)), throw(error))),
   67      thread_send_message(Q,success(Me,Var)), fail
   68   ;  thread_send_message(Q,failed(Me)),
   69      debug(concurrency,'Worker finished normally.',[])
   70   )