View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2007-2020, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread,
   38          [ concurrent/3,               % +Threads, :Goals, +Options
   39            concurrent_maplist/2,       % :Goal, +List
   40            concurrent_maplist/3,       % :Goal, ?List1, ?List2
   41            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
   42            concurrent_forall/2,        % :Generate, :Test
   43            concurrent_forall/3,        % :Generate, :Test, +Options
   44            concurrent_and/2,           % :Generator,:Test
   45            concurrent_and/3,           % :Generator,:Test,+Options
   46            first_solution/3,           % -Var, :Goals, +Options
   47
   48            call_in_thread/2            % +Thread, :Goal
   49          ]).   50:- autoload(library(apply), [maplist/2, maplist/3, maplist/4, maplist/5]).   51:- autoload(library(error), [must_be/2, instantiation_error/1]).   52:- autoload(library(lists), [subtract/3, same_length/2, nth0/3]).   53:- autoload(library(option), [option/2, option/3]).   54:- autoload(library(ordsets), [ord_intersection/3, ord_union/3]).   55:- use_module(library(debug), [debug/3, assertion/1]).   56
   57%:- debug(concurrent).
   58
   59:- meta_predicate
   60    concurrent(+, :, +),
   61    concurrent_maplist(1, +),
   62    concurrent_maplist(2, ?, ?),
   63    concurrent_maplist(3, ?, ?, ?),
   64    concurrent_forall(0, 0),
   65    concurrent_forall(0, 0, +),
   66    concurrent_and(0, 0),
   67    concurrent_and(0, 0, +),
   68    first_solution(-, :, +),
   69    call_in_thread(+, 0).   70
   71
   72:- predicate_options(concurrent/3, 3,
   73                     [ pass_to(system:thread_create/3, 3)
   74                     ]).   75:- predicate_options(concurrent_forall/3, 3,
   76                     [ threads(nonneg)
   77                     ]).   78:- predicate_options(concurrent_and/3, 3,
   79                     [ threads(nonneg)
   80                     ]).   81:- predicate_options(first_solution/3, 3,
   82                     [ on_fail(oneof([stop,continue])),
   83                       on_error(oneof([stop,continue])),
   84                       pass_to(system:thread_create/3, 3)
   85                     ]).

High level thread primitives

This module defines simple to use predicates for running goals concurrently. Where the core multi-threaded API is targeted at communicating long-living threads, the predicates here are defined to run goals concurrently without having to deal with thread creation and maintenance explicitely.

Note that these predicates run goals concurrently and therefore these goals need to be thread-safe. As the predicates in this module also abort branches of the computation that are no longer needed, predicates that have side-effect must act properly. In a nutshell, this has the following consequences:

author
- Jan Wielemaker */
 concurrent(+N, :Goals, +Options) is semidet
Run Goals in parallel using N threads. This call blocks until all work has been done. The Goals must be independent. They should not communicate using shared variables or any form of global data. All Goals must be thread-safe.

Execution succeeds if all goals have succeeded. If one goal fails or throws an exception, other workers are abandoned as soon as possible and the entire computation fails or re-throws the exception. Note that if multiple goals fail or raise an error it is not defined which error or failure is reported.

On successful completion, variable bindings are returned. Note however that threads have independent stacks and therefore the goal is copied to the worker thread and the result is copied back to the caller of concurrent/3.

Choosing the right number of threads is not always obvious. Here are some scenarios:

Arguments:
N- Number of worker-threads to create. Using 1, no threads are created. If N is larger than the number of Goals we create exactly as many threads as there are Goals.
Goals- List of callable terms.
Options- Passed to thread_create/3 for creating the workers. Only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
See also
- In many cases, concurrent_maplist/2 and friends is easier to program and is tractable to program analysis.
  164concurrent(1, M:List, _) :-
  165    !,
  166    maplist(once_in_module(M), List).
  167concurrent(N, M:List, Options) :-
  168    must_be(positive_integer, N),
  169    must_be(list(callable), List),
  170    length(List, JobCount),
  171    message_queue_create(Done),
  172    message_queue_create(Queue),
  173    WorkerCount is min(N, JobCount),
  174    create_workers(WorkerCount, Queue, Done, Workers, Options),
  175    submit_goals(List, 1, M, Queue, VarList),
  176    forall(between(1, WorkerCount, _),
  177           thread_send_message(Queue, done)),
  178    VT =.. [vars|VarList],
  179    concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
  180                Result, [], Exitted),
  181    subtract(Workers, Exitted, RemainingWorkers),
  182    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
  183    (   Result == true
  184    ->  true
  185    ;   Result = false
  186    ->  fail
  187    ;   Result = exception(Error)
  188    ->  throw(Error)
  189    ).
  190
  191once_in_module(M, Goal) :-
  192    call(M:Goal), !.
 submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det
Send all jobs from List to Queue. Each goal is added to Queue as a term goal(Id, Goal, Vars). Vars is unified with a list of lists of free variables appearing in each goal.
  200submit_goals([], _, _, _, []).
  201submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
  202    term_variables(H, Vars),
  203    thread_send_message(Queue, goal(I, M:H, Vars)),
  204    I2 is I + 1,
  205    submit_goals(T, I2, M, Queue, VT).
 concur_wait(+N, +Done:queue, +VT:compound, +Cleanup, -Result, +Exitted0, -Exitted) is semidet
Wait for completion, failure or error.
Arguments:
Exited- List of thread-ids with threads that completed before all work was done.
  216concur_wait(0, _, _, _, true, Exited, Exited) :- !.
  217concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
  218    debug(concurrent, 'Concurrent: waiting for workers ...', []),
  219    catch(thread_get_message(Done, Exit), Error,
  220          concur_abort(Error, Cleanup, Done, Exitted0)),
  221    debug(concurrent, 'Waiting: received ~p', [Exit]),
  222    (   Exit = done(Id, Vars)
  223    ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
  224        arg(Id, VT, Vars),
  225        N2 is N - 1,
  226        concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
  227    ;   Exit = finished(Thread)
  228    ->  thread_join(Thread, JoinStatus),
  229        debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
  230              [Thread, JoinStatus]),
  231        (   JoinStatus == true
  232        ->  concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
  233        ;   Status = JoinStatus,
  234            Exitted = [Thread|Exitted0]
  235        )
  236    ).
  237
  238concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
  239    debug(concurrent, 'Concurrent: got ~p', [Error]),
  240    subtract(Workers, Exitted, RemainingWorkers),
  241    concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
  242    throw(Error).
  243
  244create_workers(N, Queue, Done, [Id|Ids], Options) :-
  245    N > 0,
  246    !,
  247    thread_create(worker(Queue, Done), Id,
  248                  [ at_exit(thread_send_message(Done, finished(Id)))
  249                  | Options
  250                  ]),
  251    N2 is N - 1,
  252    create_workers(N2, Queue, Done, Ids, Options).
  253create_workers(_, _, _, [], _).
 worker(+WorkQueue, +DoneQueue) is det
Process jobs from WorkQueue and send the results to DoneQueue.
  260worker(Queue, Done) :-
  261    thread_get_message(Queue, Message),
  262    debug(concurrent, 'Worker: received ~p', [Message]),
  263    (   Message = goal(Id, Goal, Vars)
  264    ->  (   Goal
  265        ->  thread_send_message(Done, done(Id, Vars)),
  266            worker(Queue, Done)
  267        )
  268    ;   true
  269    ).
 concur_cleanup(+Result, +Workers:list, +Queues:list) is det
Cleanup the concurrent workers and message queues. If Result is not true, signal all workers to make them stop prematurely. If result is true we assume all workers have been instructed to stop or have stopped themselves.
  279concur_cleanup(Result, Workers, Queues) :-
  280    !,
  281    (   Result == true
  282    ->  true
  283    ;   kill_workers(Workers)
  284    ),
  285    join_all(Workers),
  286    maplist(message_queue_destroy, Queues).
  287
  288kill_workers([]).
  289kill_workers([Id|T]) :-
  290    debug(concurrent, 'Signalling ~w', [Id]),
  291    catch(thread_signal(Id, abort), _, true),
  292    kill_workers(T).
  293
  294join_all([]).
  295join_all([Id|T]) :-
  296    thread_join(Id, _),
  297    join_all(T).
  298
  299
  300		 /*******************************
  301		 *             FORALL		*
  302		 *******************************/
 concurrent_forall(:Generate, :Action) is semidet
 concurrent_forall(:Generate, :Action, +Options) is semidet
True when Action is true for all solutions of Generate. This has the same semantics as forall/2, but the Action goals are executed in multiple threads. Notable a failing Action or a Action throwing an exception signals the calling thread which in turn aborts all workers and fails or re-throws the generated error. Options:
threads(+Count)
Number of threads to use. The default is determined by the Prolog flag cpu_count.
To be done
- Ideally we would grow the set of workers dynamically, similar to dynamic scheduling of HTTP worker threads. This would avoid creating threads that are never used if Generate is too slow or does not provide enough answers and would further raise the number of threads if Action is I/O bound rather than CPU bound.
  323:- dynamic
  324    fa_aborted/1.  325
  326concurrent_forall(Generate, Test) :-
  327    concurrent_forall(Generate, Test, []).
  328
  329concurrent_forall(Generate, Test, Options) :-
  330    jobs(Jobs, Options),
  331    Jobs > 1,
  332    !,
  333    term_variables(Generate, GVars),
  334    term_variables(Test, TVars),
  335    sort(GVars, GVarsS),
  336    sort(TVars, TVarsS),
  337    ord_intersection(GVarsS, TVarsS, Shared),
  338    Templ =.. [v|Shared],
  339    MaxSize is Jobs*4,
  340    message_queue_create(Q, [max_size(MaxSize)]),
  341    length(Workers, Jobs),
  342    thread_self(Me),
  343    maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers),
  344    catch(( forall(Generate,
  345                   thread_send_message(Q, job(Templ))),
  346            forall(between(1, Jobs, _),
  347                   thread_send_message(Q, done)),
  348            maplist(thread_join, Workers),
  349            message_queue_destroy(Q)
  350          ),
  351          Error,
  352          fa_cleanup(Error, Workers, Q)).
  353concurrent_forall(Generate, Test, _) :-
  354    forall(Generate, Test).
  355
  356fa_cleanup(Error, Workers, Q) :-
  357    maplist(safe_abort, Workers),
  358    debug(concurrent(fail), 'Joining workers', []),
  359    maplist(safe_join, Workers),
  360    debug(concurrent(fail), 'Destroying queue', []),
  361    retractall(fa_aborted(Q)),
  362    message_queue_destroy(Q),
  363    (   Error = fa_worker_failed(_0Test, Why)
  364    ->  debug(concurrent(fail), 'Test ~p failed: ~p', [_0Test, Why]),
  365        (   Why == false
  366        ->  fail
  367        ;   Why = error(E)
  368        ->  throw(E)
  369        ;   assertion(fail)
  370        )
  371    ;   throw(Error)
  372    ).
  373
  374fa_worker(Queue, Main, Templ, Test) :-
  375    repeat,
  376    thread_get_message(Queue, Msg),
  377    (   Msg == done
  378    ->  !
  379    ;   Msg = job(Templ),
  380        debug(concurrent, 'Running test ~p', [Test]),
  381        (   catch_with_backtrace(Test, E, true)
  382        ->  (   var(E)
  383            ->  fail
  384            ;   fa_stop(Queue, Main, fa_worker_failed(Test, error(E)))
  385            )
  386        ;   !,
  387            fa_stop(Queue, Main, fa_worker_failed(Test, false))
  388        )
  389    ).
  390
  391fa_stop(Queue, Main, Why) :-
  392    with_mutex('$concurrent_forall',
  393               fa_stop_sync(Queue, Main, Why)).
  394
  395fa_stop_sync(Queue, _Main, _Why) :-
  396    fa_aborted(Queue),
  397    !.
  398fa_stop_sync(Queue, Main, Why) :-
  399    asserta(fa_aborted(Queue)),
  400    debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
  401    thread_signal(Main, throw(Why)).
  402
  403jobs(Jobs, Options) :-
  404    (   option(threads(Jobs), Options)
  405    ->  true
  406    ;   current_prolog_flag(cpu_count, Jobs)
  407    ->  true
  408    ;   Jobs = 1
  409    ).
  410
  411safe_abort(Thread) :-
  412    catch(thread_signal(Thread, abort), error(_,_), true).
  413safe_join(Thread) :-
  414    E = error(_,_),
  415    catch(thread_join(Thread, _Status), E, true).
  416
  417
  418		 /*******************************
  419		 *              AND		*
  420		 *******************************/
 concurrent_and(:Generator, :Test)
 concurrent_and(:Generator, :Test, +Options)
Concurrent version of (Generator,Test). This predicate creates a thread providing solutions for Generator that are handed to a pool of threads that run Test for the different instantiations provided by Generator concurrently. The predicate is logically equivalent to a simple conjunction except for two aspects: (1) terms are copied from Generator to the test Test threads while answers are copied back to the calling thread and (2) answers may be produced out of order.

If the evaluation of some Test raises an exception, concurrent_and/2,3 is terminated with this exception. If the caller commits after a given answer or raises an exception while concurrent_and/2,3 is active with pending choice points, all involved resources are reclaimed.

Options:

threads(+Count)
Create a worker pool holding Count threads. The default is the Prolog flag cpu_count.

This predicate was proposed by Jan Burse as balance((Generator,Test)).

  449concurrent_and(Gen, Test) :-
  450    concurrent_and(Gen, Test, []).
  451
  452concurrent_and(Gen, Test, Options) :-
  453    jobs(Jobs, Options),
  454    MaxSize is Jobs*4,
  455    message_queue_create(JobQueue, [max_size(MaxSize)]),
  456    message_queue_create(AnswerQueue, [max_size(MaxSize)]),
  457    ca_template(Gen, Test, Templ),
  458    term_variables(Gen+Test, AllVars),
  459    ReplyTempl =.. [v|AllVars],
  460    length(Workers, Jobs),
  461    Alive is 1<<Jobs-1,
  462    maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
  463                                    Templ, Test, ReplyTempl)),
  464            Workers),
  465    thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
  466                  GenThread),
  467    State = state(Alive),
  468    call_cleanup(
  469        ca_gather(State, AnswerQueue, ReplyTempl, Workers),
  470        ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).
  471
  472ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
  473    repeat,
  474       thread_get_message(AnswerQueue, Msg),
  475       (   Msg = true(ReplyTempl)
  476       ->  true
  477       ;   Msg = done(Worker)
  478       ->  nth0(Done, Workers, Worker),
  479           arg(1, State, Alive0),
  480           Alive1 is Alive0 /\ \(1<<Done),
  481           debug(concurrent(and), 'Alive = ~2r', [Alive1]),
  482           (   Alive1 =:= 0
  483           ->  !,
  484               fail
  485           ;   nb_setarg(1, State, Alive1),
  486               fail
  487           )
  488       ;   Msg = error(E)
  489       ->  throw(E)
  490       ).
  491
  492ca_template(Gen, Test, Templ) :-
  493    term_variables(Gen,  GVars),
  494    term_variables(Test, TVars),
  495    sort(GVars, GVarsS),
  496    sort(TVars, TVarsS),
  497    ord_intersection(GVarsS, TVarsS, Shared),
  498    ord_union(GVarsS, Shared, TemplVars),
  499    Templ =.. [v|TemplVars].
  500
  501ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
  502    thread_self(Me),
  503    EG = error(existence_error(message_queue, _), _),
  504    repeat,
  505    catch(thread_get_message(JobQueue, Req), EG, Req=all_done),
  506    (   Req = job(Templ)
  507    ->  (   catch(Test, E, true),
  508            (   var(E)
  509            ->  thread_send_message(AnswerQueue, true(ReplyTempl))
  510            ;   thread_send_message(AnswerQueue, error(E))
  511            ),
  512            fail
  513        )
  514    ;   Req == done
  515    ->  !,
  516        message_queue_destroy(JobQueue),
  517        thread_send_message(AnswerQueue, done(Me))
  518    ;   assertion(Req == all_done)
  519    ->  !,
  520        thread_send_message(AnswerQueue, done(Me))
  521    ).
  522
  523ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
  524    (   catch(Gen, E, true),
  525        (   var(E)
  526        ->  thread_send_message(JobQueue, job(Templ))
  527        ;   thread_send_message(AnswerQueue, error(E))
  528        ),
  529        fail
  530    ;   thread_send_message(JobQueue, done)
  531    ).
  532
  533ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
  534    safe_abort(GenThread),
  535    safe_join(GenThread),
  536    maplist(safe_abort, Workers),
  537    maplist(safe_join, Workers),
  538    message_queue_destroy(AnswerQueue),
  539    catch(message_queue_destroy(JobQueue), error(_,_), true).
  540
  541
  542                 /*******************************
  543                 *             MAPLIST          *
  544                 *******************************/
 concurrent_maplist(:Goal, +List) is semidet
 concurrent_maplist(:Goal, +List1, +List2) is semidet
 concurrent_maplist(:Goal, +List1, +List2, +List3) is semidet
Concurrent version of maplist/2. This predicate uses concurrent/3, using multiple worker threads. The number of threads is the minimum of the list length and the number of cores available. The number of cores is determined using the prolog flag cpu_count. If this flag is absent or 1 or List has less than two elements, this predicate calls the corresponding maplist/N version using a wrapper based on once/1. Note that all goals are executed as if wrapped in once/1 and therefore these predicates are semidet.

Note that the the overhead of this predicate is considerable and therefore Goal must be fairly expensive before one reaches a speedup.

  563concurrent_maplist(Goal, List) :-
  564    workers(List, WorkerCount),
  565    !,
  566    maplist(ml_goal(Goal), List, Goals),
  567    concurrent(WorkerCount, Goals, []).
  568concurrent_maplist(M:Goal, List) :-
  569    maplist(once_in_module(M, Goal), List).
  570
  571once_in_module(M, Goal, Arg) :-
  572    call(M:Goal, Arg), !.
  573
  574ml_goal(Goal, Elem, call(Goal, Elem)).
  575
  576concurrent_maplist(Goal, List1, List2) :-
  577    same_length(List1, List2),
  578    workers(List1, WorkerCount),
  579    !,
  580    maplist(ml_goal(Goal), List1, List2, Goals),
  581    concurrent(WorkerCount, Goals, []).
  582concurrent_maplist(M:Goal, List1, List2) :-
  583    maplist(once_in_module(M, Goal), List1, List2).
  584
  585once_in_module(M, Goal, Arg1, Arg2) :-
  586    call(M:Goal, Arg1, Arg2), !.
  587
  588ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).
  589
  590concurrent_maplist(Goal, List1, List2, List3) :-
  591    same_length(List1, List2, List3),
  592    workers(List1, WorkerCount),
  593    !,
  594    maplist(ml_goal(Goal), List1, List2, List3, Goals),
  595    concurrent(WorkerCount, Goals, []).
  596concurrent_maplist(M:Goal, List1, List2, List3) :-
  597    maplist(once_in_module(M, Goal), List1, List2, List3).
  598
  599once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
  600    call(M:Goal, Arg1, Arg2, Arg3), !.
  601
  602ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
  603
  604workers(List, Count) :-
  605    current_prolog_flag(cpu_count, Cores),
  606    Cores > 1,
  607    length(List, Len),
  608    Count is min(Cores,Len),
  609    Count > 1,
  610    !.
  611
  612same_length([], [], []).
  613same_length([_|T1], [_|T2], [_|T3]) :-
  614    same_length(T1, T2, T3).
  615
  616
  617                 /*******************************
  618                 *             FIRST            *
  619                 *******************************/
 first_solution(-X, :Goals, +Options) is semidet
Try alternative solvers concurrently, returning the first answer. In a typical scenario, solving any of the goals in Goals is satisfactory for the application to continue. As soon as one of the tried alternatives is successful, all the others are killed and first_solution/3 succeeds.

For example, if it is unclear whether it is better to search a graph breadth-first or depth-first we can use:

search_graph(Grap, Path) :-
         first_solution(Path, [ breadth_first(Graph, Path),
                                depth_first(Graph, Path)
                              ],
                        []).

Options include thread stack-sizes passed to thread_create, as well as the options on_fail and on_error that specify what to do if a solver fails or triggers an error. By default execution of all solvers is terminated and the result is returned. Sometimes one may wish to continue. One such scenario is if one of the solvers may run out of resources or one of the solvers is known to be incomplete.

on_fail(Action)
If stop (default), terminate all threads and stop with the failure. If continue, keep waiting.
on_error(Action)
As above, re-throwing the error if an error appears.
bug
- first_solution/3 cannot deal with non-determinism. There is no obvious way to fit non-determinism into it. If multiple solutions are needed wrap the solvers in findall/3.
  659first_solution(X, M:List, Options) :-
  660    message_queue_create(Done),
  661    thread_options(Options, ThreadOptions, RestOptions),
  662    length(List, JobCount),
  663    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
  664    wait_for_one(JobCount, Done, Result, RestOptions),
  665    concur_cleanup(kill, Solvers, [Done]),
  666    (   Result = done(_, Var)
  667    ->  X = Var
  668    ;   Result = error(_, Error)
  669    ->  throw(Error)
  670    ).
  671
  672create_solvers([], _, _, _, [], _).
  673create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
  674    thread_create(solve(M:H, X, Done), Id, Options),
  675    create_solvers(T, M, X, Done, IDs, Options).
  676
  677solve(Goal, Var, Queue) :-
  678    thread_self(Me),
  679    (   catch(Goal, E, true)
  680    ->  (   var(E)
  681        ->  thread_send_message(Queue, done(Me, Var))
  682        ;   thread_send_message(Queue, error(Me, E))
  683        )
  684    ;   thread_send_message(Queue, failed(Me))
  685    ).
  686
  687wait_for_one(0, _, failed, _) :- !.
  688wait_for_one(JobCount, Queue, Result, Options) :-
  689    thread_get_message(Queue, Msg),
  690    LeftCount is JobCount - 1,
  691    (   Msg = done(_, _)
  692    ->  Result = Msg
  693    ;   Msg = failed(_)
  694    ->  (   option(on_fail(stop), Options, stop)
  695        ->  Result = Msg
  696        ;   wait_for_one(LeftCount, Queue, Result, Options)
  697        )
  698    ;   Msg = error(_, _)
  699    ->  (   option(on_error(stop), Options, stop)
  700        ->  Result = Msg
  701        ;   wait_for_one(LeftCount, Queue, Result, Options)
  702        )
  703    ).
 thread_options(+Options, -ThreadOptions, -RestOptions) is det
Split the option list over thread(-size) options and other options.
  711thread_options([], [], []).
  712thread_options([H|T], [H|Th], O) :-
  713    thread_option(H),
  714    !,
  715    thread_options(T, Th, O).
  716thread_options([H|T], Th, [H|O]) :-
  717    thread_options(T, Th, O).
  718
  719thread_option(local(_)).
  720thread_option(global(_)).
  721thread_option(trail(_)).
  722thread_option(argument(_)).
  723thread_option(stack(_)).
 call_in_thread(+Thread, :Goal) is semidet
Run Goal as an interrupt in the context of Thread. This is based on thread_signal/2. If waiting times out, we inject a stop(Reason) exception into Goal. Interrupts can be nested, i.e., it is allowed to run a call_in_thread/2 while the target thread is processing such an interrupt.

This predicate is primarily intended for debugging and inspection tasks.

  737call_in_thread(Thread, Goal) :-
  738    must_be(callable, Goal),
  739    var(Thread),
  740    !,
  741    instantiation_error(Thread).
  742call_in_thread(Thread, Goal) :-
  743    thread_self(Thread),
  744    !,
  745    once(Goal).
  746call_in_thread(Thread, Goal) :-
  747    term_variables(Goal, Vars),
  748    thread_self(Me),
  749    A is random(1 000 000 000),
  750    thread_signal(Thread, run_in_thread(Goal,Vars,A,Me)),
  751    catch(thread_get_message(in_thread(A,Result)),
  752          Error,
  753          forward_exception(Thread, A, Error)),
  754    (   Result = true(Vars)
  755    ->  true
  756    ;   Result = error(Error)
  757    ->  throw(Error)
  758    ;   fail
  759    ).
  760
  761run_in_thread(Goal, Vars, Id, Sender) :-
  762    (   catch_with_backtrace(call(Goal), Error, true)
  763    ->  (   var(Error)
  764        ->  thread_send_message(Sender, in_thread(Id, true(Vars)))
  765        ;   Error = stop(_)
  766        ->  true
  767        ;   thread_send_message(Sender, in_thread(Id, error(Error)))
  768        )
  769    ;   thread_send_message(Sender, in_thread(Id, false))
  770    ).
  771
  772forward_exception(Thread, Id, Error) :-
  773    kill_with(Error, Kill),
  774    thread_signal(Thread, kill_task(Id, Kill)),
  775    throw(Error).
  776
  777kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
  778    !.
  779kill_with(_, stop(interrupt)).
  780
  781kill_task(Id, Exception) :-
  782    prolog_current_frame(Frame),
  783    prolog_frame_attribute(Frame, parent_goal,
  784                           run_in_thread(_Goal, _Vars, Id, _Sender)),
  785    !,
  786    throw(Exception).
  787kill_task(_, _)