1:- module(spawn, [ async/2
    2                 , async/3
    3                 , await/1
    4                 , lazy/1
    5                 , spawn/1
    6                 , spawn/2
    7                 ]).    8:- use_module(library(predicate_options)).    9:- use_module(library(record)).   10
   11% convenience for async/3 options
   12:- record opts( policy:oneof([ephemeral,lazy])=ephemeral
   13              ).
   14:- predicate_options(spawn/2,2,[pass_to(async/3,3)]).   15:- predicate_options(async/3,3, [ policy(+oneof([ephemeral,lazy]))
   16                                ]).   17
   18:- meta_predicate
   19    spawn(0),
   20    async(0,-),
   21    async(0,-,+),
   22    async_policy(+,0,-,+).   23
   24:- thread_local
   25    spawn_token_needs_await/1.
 spawn(:Goal) is det
Like spawn/2 with default options.
   30spawn(Goal) :-
   31    spawn(Goal, []).
 spawn(:Goal, +Options) is det
Seek solutions to Goal in a background thread. Solutions are communicated to the calling thread by unifying free variables in Goal. If Goal has no free variables, you must use async/3 instead. Options are passed through to async/3.

For example, the following code runs in about 1 second because both sleep/1 calls happen in parallel. When foo/0 unifies L, it blocks until silly/1 has finished.

silly(L) :-
    sleep(1),
    L = [a,b].
foo :-
    spawn(silly(L)),
    sleep(1),
    L=[A,B],  % blocks, if necessary
    writeln(A-B).

If Goal produces multiple solutions, they're iterated when backtracking over the unification (L=[A,B] above). If Goal fails or throws an exception, the calling thread sees it at the unification point.

   58spawn(Goal,Options) :-
   59    term_variables(Goal, Vars),
   60    async(Goal, Token, Options),
   61    Id is random(1<<63),
   62    assert(spawn_token_needs_await(Id)),
   63    make_opts(Options,Opts),
   64    maplist(spawn_freeze(Id,Token,Opts), Vars).
   65
   66spawn_freeze(Id,Token,Opts,Var) :-
   67    freeze(Var,spawn_thaw(Id,Token,Opts)).
   68
   69spawn_thaw(Id,Token,Opts) :-
   70    ( retract(spawn_token_needs_await(Id)) ->
   71        debug(spawn,"Await on ~d",[Id]),
   72        await(Token)
   73    ; opts_policy(Opts,lazy) ->
   74        debug(spawn,"Awaiting again on ~d",[Id]),
   75        await(Token)
   76    ; % already called await/1 ->
   77        debug(spawn,"Already did await on ~d",[Id]),
   78        true
   79    ).
 lazy(Goal) is det
Postpone execution of goal until needed. This is just spawn/1 using the lazy thread policy.

lazy/1 can be helpful when complicated or expensive goals are only needed in some code paths but duplicating those goals is too verbose. It can be an alternative to creating a new, named predicate. For example,

foo(Xs) :-
    lazy(i_am_slow(a,B,[c(C),d(d),e(etc)])), % complicated

    ( day_of_week(tuesday) ->
        append(B,C,Xs)
    ; phase_of_moon(full) ->
        append(C,B,Xs)
    ; true ->
        % i_am_slow/3 not executed in this code path
        Xs = [hi]
    ).
  103lazy(Goal) :-
  104    spawn(Goal,[policy(lazy)]).
 async(:Goal, -Token) is det
Like async/3 with default options.
  110async(Goal,Token) :-
  111    async(Goal,Token,[]).
 async(:Goal, -Token, +Options) is det
Seek solutions to Goal in a background thread. Use await/1 with Token to block until the computation is done. Solutions are communicated to the calling thread by unifying free variables in Goal. Both Goal and its corresponding solutions are copied between threads. Be aware if any of those terms are very large.

Options are as follows:

policy(Policy)
If ephemeral (default), create a new thread in which to call goal. If lazy, only execute Goal when await/1 is called; no background threads are used.
  128async(Goal,Token,Options) :-
  129    make_opts(Options,Opts),
  130    opts_policy(Opts, Policy),
  131    async_policy(Policy, Goal, Token, Opts).
  132
  133
  134async_policy(ephemeral, Goal, Token, _Opts) :-
  135    % what does the caller need to track this computation?
  136    term_variables(Goal, Vars),
  137    message_queue_create(SolutionsQ, [max_size(1)]),
  138    Token = ephemeral_token(Vars,SolutionsQ),
  139
  140    % start the worker thread
  141    Work = work(Goal,Vars,SolutionsQ),
  142    thread_create(ephemeral_worker(Work), _, [detached(true)]).
  143async_policy(lazy,Goal,Token,_Opts) :-
  144    Token = lazy_thunk(Goal).
  145
  146
  147ephemeral_worker(work(Goal,Vars,SolutionsQ)) :-
  148    debug(spawn,"Seeking solutions to: ~q", [Goal]),
  149    ( catch(call_cleanup(Goal,Done=true),E,true) *->
  150        ( nonvar(E) ->
  151            debug(spawn,"Caught exception: ~q", [E]),
  152            thread_send_message(SolutionsQ,exception(E))
  153        ; var(Done) ->
  154            debug(spawn,"Sending solution: ~q", [Vars]),
  155            thread_send_message(SolutionsQ,solution(Vars)),
  156            fail  % look for another solution
  157        ; Done=true ->
  158            debug(spawn,"Final solution: ~q", [Vars]),
  159            thread_send_message(SolutionsQ,final(Vars))
  160        )
  161    ; % no solutions ->
  162        debug(spawn, "Found no solutions", []),
  163        thread_send_message(SolutionsQ,none)
  164    ).
 await(+Token)
Wait for solutions from an async/3 call. Token is an opaque value provided by async/3 which identifies a background computation.

await/1 strives to have the same determinism as the original Goal passed to async/3. If that goal fails, await/1 fails. If that goal throws an exception, so does await/1. If that goal produces many solutions, so does await/1 on backtracking.

  176await(ephemeral_token(Vars,SolutionsQ)) :-
  177    repeat,
  178    thread_get_message(SolutionsQ,Solution),
  179    ( Solution = solution(Vars) ->
  180        true
  181    ; Solution = final(Vars) ->
  182        !,
  183        true
  184    ; Solution = none ->
  185        !,
  186        fail
  187    ; Solution = exception(E) ->
  188        throw(E)
  189    ; % what? ->
  190        throw(unexpected_await_solution(Solution))
  191    ).
  192await(lazy_thunk(Goal)) :-
  193    call(Goal)