/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2010-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_linda,
          [
           linda/0,                  %
           linda/1,                  % +Term
           linda_client/1,           % +Address
           close_client/0,           %
           linda_timeout/1,          % +Number
           linda_timeout/2,          % ?Number, ?Number
           out/1,                    % +Term
           in/1,                     % ?Term
           in_noblock/1,             % ?Term
           in/2,                     % +List, ?Term
           rd/1,                     % ?Term
           rd_noblock/1,             % ?Term
           rd/2,                     % +List, ?Term
           bagof_rd_noblock/3,       % +Template, ?Term, -Bag
           bagof_in_noblock/3,       % +Template, ?Term, -Bag
           linda_eval/1,             % :Head
           linda_eval/2,             % ?Head, :Body
           linda_eval_detached/1,    % :Head
           linda_eval_detached/2,    % ?Head, :Body
           tuple/1,                  % :Goal
           tuple/2,                  % ?Head, :Body
           tipc_linda_server/0,      %
           tipc_initialize/0
          ]).

:- use_module(library(tipc/tipc_broadcast)).
:- use_module(library(unix)).

:- require([ broadcast/1
           , broadcast_request/1
           , throw/1
           , member/2
           , must_be/2
           , catch/3
           , flag/3
           , listen/3
           , setup_call_cleanup/3
           , strip_module/3
           ]).

A Process Communication Interface

Linda is a framework for building systems composed of programs that cooperate to realize a larger goal. A Linda application is composed of two or more processes acting in concert. One process acts as a server and the others act as clients. Fine-grained communication between client and server is provided by message passing over sockets and support networks, TIPC sockets in this case. Clients interact indirectly by way of the server. The server is in principle an erasable blackboard that clients can use to write (out/1), read (rd/1) and remove (in/1) messages called tuples. Some predicates will fail if a requested tuple is not present on the blackboard. Others will block until a tuple instance becomes available. Tuple instances are made available to clients by writing them on the blackboard using out/1.

In TIPC Linda, there is a subtle difference between the in and the rd predicates that is worth noting. The in predicates succeed exactly once for each tuple placed in the tuple space. The tuple is provided to exactly one requesting client. Clients can contend for tuples in this way, thus enabling multi-server operations. The rd predicates succeed nondeterministically, providing all matching tuples in the tuple space at a given time to the requesting client as a choice point without disturbing them.
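
The difference can be seen in a single-process sketch that replaces the networked tuple space with a local dynamic predicate. The names local_tuple/1, local_out/1, local_in/1 and local_rd/1 are hypothetical and are not part of TIPC Linda. No TIPC traffic is involved, and blocking is not modelled.

```prolog
:- dynamic local_tuple/1.

% Write a tuple to the local stand-in for the blackboard.
local_out(Tuple) :-
    assertz(local_tuple(Tuple)).

% Remove one matching tuple. Commits to the first match, fails if none.
local_in(Tuple) :-
    retract(local_tuple(Tuple)),
    !.

% Enumerate matching tuples on backtracking without removing them.
local_rd(Tuple) :-
    local_tuple(Tuple).

in_vs_rd_demo(Seen, Taken, Left) :-
    retractall(local_tuple(_)),
    local_out(p(1)),
    local_out(p(2)),
    findall(X, local_rd(p(X)), Seen),   % reads both, removes none
    local_in(p(Taken)),                 % removes exactly one
    findall(Y, local_rd(p(Y)), Left).   % what is still on the blackboard
```

In this sketch the read pass sees both tuples, the take removes one of them, and the second read pass sees only the remaining one.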

TIPC Linda is inspired by and adapted from the SICStus Prolog API. But unlike SICStus TCP Linda, TIPC Linda is connectionless. There is no specific session between client and server. The server receives and responds to datagrams originated by clients in an epiperiodic manner.

Example: A simple producer-consumer.

In client 1:

init_producer :-
       linda_client(global),
       producer.

producer :-
       produce(X),
       out(p(X)),
       producer.

produce(X) :- .....

In client 2:

init_consumer :-
        linda_client(global),
        consumer.

consumer :-
       in(p(A)),
       consume(A),
       consumer.

consume(A) :- .....

Example: Synchronization

       ...,
       in(ready),  %Waits here until someone does out(ready)
       ...,

Example: A critical region

       ...,
       in(region_free),  % wait for region to be free
       critical_part,
       out(region_free), % let next one in
       ...,

Example: Reading global data

       ...,
       rd(data(Data)),
       ...,

or, without blocking:

       ...,
       (rd_noblock(data(Data)) ->
             do_something(Data)
       ;     write('Data not available!'),nl
       ),
       ...,

Example: Waiting for any one of several events

       ...,
       in([e(1),e(2),...,e(n)], E),
%  Here E is instantiated to the first tuple that became available
       ...,

Example: Producers and Consumers in the same process using linda_eval threads and/or tuple predicates

  consumer1 :-
        repeat,
        in([p(_), quit], Y),
        (   Y = p(Z) -> writeln(consuming(Z)); !),
        fail.

  producer1 :-
        forall(between(1,40, X), out(p(X))).

  producer_consumer1 :-
        linda_eval(consumer1),
        call_cleanup(producer1, out(quit)), !.
%
%
  consumer2 :-
       between(1,4,_),
       in_noblock(p(X)), !,
       writeln(consuming(X)),
       consumer2.

  producer2 :-
        linda_eval(p(X), between(1,40, X)).

  producer_consumer2 :-
        producer2,
        linda_eval(consumer2), !.
%
%
  consumer3 :-
        forall(rd_noblock(p(X)), writeln(consuming(X))).

  producer3 :-
        tuple(p(X), between(1,40, X)).

  producer_consumer3 :-
        producer3,
        linda_eval(done, consumer3),
        in(done), !.

Servers

The server is the process running the "blackboard process". It is part of TIPC Linda. It is a collection of predicates that are registered as tipc_broadcast listeners. The server process can be run on a separate machine if necessary.

To load the package, enter the query:

?- use_module(library(tipc/tipc_linda)).

?- linda.
   TIPC Linda server now listening at: port_id('<1.1.1:3200515722>')
   true.

Clients

The clients are one or more Prolog processes that have connection(s) to the server.

To load the package, enter the query:

?- use_module(library(tipc/tipc_linda)).

?- linda_client(global).
   TIPC Linda server listening at: port_id('<1.1.1:3200515722>')
   true.
author
- Jeffrey A. Rosenwald
See also
- Nicholas Carriero and David Gelernter. How to Write Parallel Programs: A First Course. The MIT Press, Cambridge, MA, 1990.
Compatibility
- SWI-Prolog for Linux only
- tipc_broadcast library */
:- meta_predicate eventually_implies(0,0), ~>(0,0), safely(0).

safely(Goal) :-
    catch(Goal, Err, (print_message(error, Err), fail)).

eventually_implies(P, Q) :-
    setup_call_cleanup(P, (Foo = true; Foo = false), assertion(Q)),
    Foo == true.

:- op(950, xfy, ~>).

~>(P, Q) :- eventually_implies(P, Q).


:- dynamic(linda_data/1).

%
%    This is the backend state machine
%

linda_action(rd(listening)) :- !.

linda_action(in(TupleList, Tuple)) :-
    member(Tuple, TupleList),
    retract(linda_data(Tuple)),
    !.

linda_action(in(Tuple)) :-
    retract(linda_data(Tuple)),
    !.

linda_action(out(Tuple)) :-
    assert(linda_data(Tuple)).

linda_action(rd(TupleList, Tuple)) :-
    member(Tuple, TupleList),
    linda_data(Tuple).

linda_action(rd(Tuple)) :-
    linda_data(Tuple).

linda_action(bagof_rd_noblock(Template, Var^Tuple, Bag)) :-
    !, bagof(Template, Var^linda_data(Tuple), Bag).

linda_action(bagof_rd_noblock(Template, Tuple, Bag)) :-
    !, bagof(Template, linda_data(Tuple), Bag).

linda_action(bagof_in_noblock(Template, Var^Tuple, Bag)) :-
    Datum = linda_data(Tuple),
    !, bagof(Template, Var^(Datum, retract(Datum)), Bag).

linda_action(bagof_in_noblock(Template, Tuple, Bag)) :-
    !, bagof(Template, retract(linda_data(Tuple)), Bag).

%
%    This is the user interface
%
 linda is det
 linda(:Goal) is det
Starts a Linda-server in this process. The network address is written to current output stream as a TIPC port_id/2 reference (e.g. port_id('<1.1.1:3200515722>') ). This predicate looks to see if a server is already listening on the cluster. If so, it reports the address of the existing server. Otherwise, it registers a new server and reports its address.
?- linda.
   TIPC Linda server now listening at: port_id('<1.1.1:3200515722>')
   true.

?- linda.
   TIPC Linda server still listening at: port_id('<1.1.1:3200515722>')
   true.

The following will call my_init/0 in the current module after the server is successfully started or is found already listening. my_init/0 could start client-processes, initialize the tuple space, etc.

?- linda(my_init).
linda_listening(Addr) :-
    basic_request(rd(listening), Addr),
    !.

linda :-
    linda_listening(Addr),
    !,
    format('TIPC Linda server still listening at: ~p~n', [Addr]).

linda :-
    listen(tipc_linda, '$linda'(Action), linda_action(Action)),
    linda_listening(Addr),
    !,
    format('TIPC Linda server now listening at: ~p~n', [Addr]).

:- meta_predicate linda(0).

linda(Hook) :-
    linda,
    call(Hook).
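
The registration step in linda/0 can be tried without a TIPC cluster by using plain library(broadcast), which delivers requests to listeners inside the same process. The following is a minimal sketch under that assumption: demo_action/1, demo_data/1, the listener name demo_server and the '$server_demo' wrapper are hypothetical, the backend is a reduced version of the state machine, and the tipc_cluster/2 wrapper used for network delivery is left out.

```prolog
:- use_module(library(broadcast)).
:- dynamic demo_data/1.

% Hypothetical, reduced backend: out, in and rd only.
demo_action(out(Tuple)) :- assertz(demo_data(Tuple)).
demo_action(in(Tuple))  :- retract(demo_data(Tuple)), !.
demo_action(rd(Tuple))  :- demo_data(Tuple).

% Register the backend as a listener, in the way linda/0 registers
% linda_action/1. Any previous registration and data are dropped first.
start_demo_server :-
    unlisten(demo_server),
    retractall(demo_data(_)),
    listen(demo_server, '$server_demo'(Action), demo_action(Action)).

server_demo(Read, Taken) :-
    start_demo_server,
    broadcast('$server_demo'(out(job(1)))),
    broadcast('$server_demo'(out(job(2)))),
    % rd-style request: every stored tuple is offered as a solution.
    findall(N, broadcast_request('$server_demo'(rd(job(N)))), Read),
    % in-style request: one tuple is removed and returned.
    broadcast_request('$server_demo'(in(job(Taken)))),
    !.
```

The listener template and the goal share the variable Action, so each request term selects the matching demo_action/1 clause.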
 linda_client(+Domain) is semidet
Establishes a connection to a Linda-server providing a named tuple space. Domain is an atom specifying a particular tuple-space, selected from a universe of tuple-spaces. At present, however, only one tuple-space, global, is supported. A client may interact with any server reachable on the TIPC cluster. This predicate will fail if no server is reachable for that tuple space.
linda_client(global) :-
    linda_listening(Addr),
    !,
    format('TIPC Linda server listening at: ~p~n', [Addr]).
 close_client is det
Closes the connection to the Linda-server. Causes the server to release resources associated with this client.
close_client :- true.   % Presently a noop
 linda_timeout(?OldTime, ?NewTime) is semidet
Controls Linda's message-passing timeout. It specifies the time window in which clients will accept server replies in response to in and rd requests. Replies arriving outside of this window are silently ignored. OldTime is unified with the old timeout and then the timeout is set to NewTime. NewTime is of the form Seconds:Milliseconds. A non-negative real number of seconds is also recognized. Note that the implementation only accepts values of at least 0.020 seconds and fails for smaller ones. The default is 0.250 seconds. This timeout is thread local and is not inherited from its parent. New threads are initialized to the default.

Note: The synchronous behavior afforded by in/1 and rd/1 is implemented by periodically polling the server. The poll rate is set according to this timeout. Setting the timeout too small may result in substantial network traffic that is of little value.

throws
- error(feature_not_supported). SICStus Linda can disable the timeout by specifying off as NewTime. This feature does not exist for safety reasons.
:- thread_local linda_time_out/1.

linda_timeout(Time, Time) :-
    linda_time_out(Time),
    !.

linda_timeout(_OldTime, NewTime) :-
    NewTime == off,
    throw(error(feature_not_supported)).

linda_timeout(OldTime, NewTime) :-
    ground(NewTime),
    NewTime = Seconds:Milliseconds,
    NewTime1 is float(Seconds + (Milliseconds / 1000.0)),
    linda_timeout(OldTime, NewTime1),
    !.

linda_timeout(OldTime, NewTime) :-
    ground(NewTime),
    NewTime >= 0.020,
    clause(linda_time_out(OldTime), true, Ref),
    asserta(linda_time_out(NewTime)) -> erase(Ref),
    !.

linda_timeout(0.250, NewTime) :-
    NewTime >= 0.020,
    asserta(linda_time_out(NewTime)).
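
The two accepted forms of NewTime and the lower bound can be checked in isolation. The helpers below, timeout_seconds/2 and valid_timeout/2, are hypothetical and are not exported by this library. They only restate the arithmetic and the 0.020 guard used in linda_timeout/2.

```prolog
% Normalise a timeout specification to a float number of seconds.
% Hypothetical helper; accepts Seconds:Milliseconds or a plain number.
timeout_seconds(Seconds:Milliseconds, Time) :-
    !,
    Time is float(Seconds + Milliseconds / 1000.0).
timeout_seconds(Time0, Time) :-
    number(Time0),
    Time is float(Time0).

% Accept only timeouts of at least 20 ms, mirroring the 0.020 guard
% in linda_timeout/2.
valid_timeout(Spec, Time) :-
    timeout_seconds(Spec, Time),
    Time >= 0.020.
```

For example, valid_timeout(1:500, T) yields a timeout of one and a half seconds, while valid_timeout(0:10, T) fails because 10 ms is below the bound.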
 linda_timeout(+NewTime) is semidet
Temporarily sets Linda's timeout. Internally, the original timeout is saved and then the timeout is set to NewTime. NewTime is as described in linda_timeout/2. The original timeout is restored automatically on cut of choice points, failure on backtracking, or uncaught exception.
linda_timeout(NewTime) :-
    linda_timeout(OldTime, NewTime) ~>
        linda_timeout(NewTime, OldTime).

basic_request(Action) :-
    basic_request(Action, _Addr).

basic_request(Action, Addr) :-
    linda_timeout(Time, Time),
    broadcast_request(tipc_cluster('$linda'(Action):Addr, Time)).
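
The automatic restore in linda_timeout/1 relies on the ~> operator, which leaves a choice point behind and runs its right-hand side when that choice point is discarded. The sketch below reproduces the same shape under a different name to show the ordering of events. run_then_cleanup/2, note/1, event/1 and cleanup_demo/1 are hypothetical names introduced for illustration.

```prolog
:- use_module(library(debug)).          % assertion/1
:- dynamic event/1.
:- meta_predicate run_then_cleanup(0, 0).

% Same shape as eventually_implies/2 in this file, renamed for the sketch.
% P is the setup, Q is run as cleanup once the choice point goes away.
run_then_cleanup(P, Q) :-
    setup_call_cleanup(P, (Flag = true ; Flag = false), assertion(Q)),
    Flag == true.

% Record an event so the order of execution can be inspected.
note(Event) :- assertz(event(Event)).

cleanup_demo(Events) :-
    retractall(event(_)),
    (   run_then_cleanup(note(setup), note(cleanup)),
        note(body)
    ->  true        % committing here prunes the choice point, so Q runs
    ;   true
    ),
    findall(E, event(E), Events).
```

In this sketch the cleanup is recorded after the body, at the point where the if-then-else commits. Failure of the body or an exception would discard the choice point as well, which is the behavior the documentation of linda_timeout/1 describes.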
 out(+Tuple) is det
Places a Tuple in Linda's tuple-space.
out(Tuple) :-
    broadcast(tipc_cluster('$linda'(out(Tuple)))),
    !.
 in(?Tuple) is det
Atomically removes the tuple Tuple from Linda's tuple-space if it is there. The tuple will be returned to exactly one requestor. If no tuple is available, the predicate blocks until it is available (that is, someone performs an out/1).
in(Tuple) :-
    repeat,
    in_noblock(Tuple),
    !.
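
The blocking behavior of in/1 is a retry loop around a non-blocking attempt. A local sketch of that pattern, with a dynamic predicate standing in for the server and a producer thread standing in for another client, might look as follows. All predicate names here are hypothetical, and the fixed 10 ms pause is an assumption that stands in for the network timeout of a real request.

```prolog
:- dynamic polled_tuple/1.

% One non-blocking attempt: succeeds at most once, fails if nothing matches.
try_take(Tuple) :-
    retract(polled_tuple(Tuple)),
    !.

% Blocking take by polling: retry until an attempt succeeds.
poll_take(Tuple) :-
    repeat,
    (   try_take(Tuple)
    ->  true
    ;   sleep(0.01),        % stand-in for the request timeout
        fail
    ),
    !.

poll_demo(X) :-
    retractall(polled_tuple(_)),
    % A producer thread publishes the tuple a little later.
    thread_create(( sleep(0.05),
                    assertz(polled_tuple(ready(42)))
                  ), Id, []),
    poll_take(ready(X)),    % waits here until the producer has written
    thread_join(Id, _).
```

A shorter pause makes the consumer react sooner at the cost of more attempts, which is the same trade-off the note on linda_timeout/2 describes for network traffic.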
 in_noblock(?Tuple) is semidet
Atomically removes the tuple Tuple from Linda's tuple-space if it is there. If not, the predicate fails. This predicate can fail due to a timeout.
in_noblock(Tuple) :-
    basic_request(in(Tuple)),
    !.
 in(+TupleList, -Tuple) is det
As in/1 but succeeds when any one of the tuples in TupleList is available. Tuple is unified with the fetched tuple.
in(TupleList, Tuple) :-
    must_be(list, TupleList),
    repeat,
    basic_request(in(TupleList, Tuple)),
    !.
 rd(?Tuple) is nondet
Succeeds nondeterministically if Tuple is available in the tuple-space, suspends otherwise until it is available. Compare this with in/1: the tuple is not removed.
rd(Tuple) :-
    repeat,
    rd_noblock(Tuple).
 rd_noblock(?Tuple) is nondet
Succeeds nondeterministically if Tuple is available in the tuple-space, fails otherwise. This predicate can fail due to a timeout.
rd_noblock(Tuple) :-
    basic_request(rd(Tuple)).
 rd(+TupleList, -Tuple) is nondet
As in/2 but provides a choice point that does not remove any tuples.
rd(TupleList, Tuple) :-
    must_be(list, TupleList),
    repeat,
    basic_request(rd(TupleList, Tuple)).
 bagof_in_noblock(?Template, ?Tuple, -Bag) is nondet
 bagof_rd_noblock(?Template, ?Tuple, -Bag) is nondet
Bag is the list of all instances of Template such that Tuple exists in the tuple-space. The behavior of variables in Tuple and Template is as in bagof/3. The variables can be existentially quantified with ^/2 as in bagof/3. The operation is performed as an atomic operation. This predicate can fail due to a timeout.

Example: Assume that only one client is connected to the server and that the tuple-space is initially empty.
  ?- out(x(a,3)), out(x(a,4)), out(x(b,3)), out(x(c,3)).

  true.
  ?- bagof_rd_noblock(C-N, x(C,N), L).

  L = [a-3,a-4,b-3,c-3] .

  true.
  ?- bagof_rd_noblock(C, N^x(C,N), L).

  L = [a,a,b,c] .

  true.
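
Because variable handling follows bagof/3, leaving a variable of Tuple unquantified changes the shape of the answer. The sketch below uses plain bagof/3 on local facts to show the difference. The facts x/2 stand in for the tuples written in the example, grouped/1 and ungrouped/1 are hypothetical names, and how grouped solutions are delivered across the cluster is not modelled.

```prolog
% Local facts standing in for the tuples written by out/1 in the example.
x(a, 3).
x(a, 4).
x(b, 3).
x(c, 3).

% N is free: bagof/3 yields one bag per distinct N on backtracking.
grouped(Groups) :-
    findall(N-Bag, bagof(C, x(C, N), Bag), Groups).

% N is existentially quantified: a single bag over all values of N.
ungrouped(Bag) :-
    bagof(C, N^x(C, N), Bag).
```

With N free, the instances are partitioned by the value of N. With N^ in front of the goal, all instances end up in one bag, as in the last query of the example.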
bagof_rd_noblock(Template,  Tuple, Bag) :-
    !, basic_request(bagof_rd_noblock(Template, Tuple, Bag)).

bagof_in_noblock(Template,  Tuple, Bag) :-
    !, basic_request(bagof_in_noblock(Template, Tuple, Bag)).

:- meta_predicate
      linda_eval(?, 0),
      linda_eval(0),
      linda_eval_detached(?, 0),
      linda_eval_detached(0).
 linda_eval(:Goal) is det
 linda_eval(?Head, :Goal) is det
 linda_eval_detached(:Goal) is det
 linda_eval_detached(?Head, :Goal) is det
Causes Goal to be evaluated in parallel with a parent predicate. The child thread is a full-fledged client, possessing the same capabilities as the parent. Upon successful completion of Goal, unbound variables are unified and the result is sent to the Linda server via out/1, where it is made available to others. linda_eval/2 evaluates Goal, then unifies the result with Head, providing a means of customizing the resulting output structure. In linda_eval/1, Head and Goal are identical, except that the module name for Head is stripped before output. If the child fails or receives an uncaught exception, no such output occurs.

Joining Threads: Threads created using linda_eval/(1-2) are not allowed to linger. They are joined (blocking the parent, if necessary) under three conditions: backtracking on failure into a linda_eval/(1-2), receipt of an uncaught exception, and cut of choice-points. Goals are evaluated using forall/2. They are expected to provide nondeterministic behavior. That is, they may succeed zero or more times on backtracking. They must, however, eventually fail or succeed deterministically. Otherwise, the thread will hang, which will eventually hang the parent thread. Cutting choice points in the parent's body has the effect of joining all children created by the parent. This provides a barrier that guarantees that all child instances of Goal have run to completion before the parent proceeds. Detached threads behave as above, except that they operate independently and cannot be joined. They will continue to run while the host process continues to run.

Here is an example of a parallel quicksort:

qksort([], []).

qksort([X | List], Sorted) :-
      partition(@>(X), List, Less, More),
      linda_eval(qksort(More, SortedMore)),
      qksort(Less, SortedLess), !,
      in_noblock(qksort(More, SortedMore)),
      append(SortedLess, [X | SortedMore], Sorted).
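
The same fork-and-collect structure can be tried without a Linda server. The sketch below is not how linda_eval/1 is implemented: it swaps the tuple space for a private message queue per child thread, so that the result travels from child to parent through the queue instead of through out/1 and in_noblock/1. par_qksort/2 is a hypothetical name.

```prolog
:- use_module(library(apply)).   % partition/4
:- use_module(library(lists)).   % append/3

par_qksort([], []).
par_qksort([X|List], Sorted) :-
    partition(@>(X), List, Less, More),
    message_queue_create(Queue),
    % The child sorts More and posts its result on the queue, playing
    % the role that out/1 plays for a linda_eval/1 child.
    thread_create(( par_qksort(More, SortedMore0),
                    thread_send_message(Queue, sorted(SortedMore0))
                  ), Id, []),
    par_qksort(Less, SortedLess),
    % Blocks until the child has posted, like a barrier on the child.
    thread_get_message(Queue, sorted(SortedMore)),
    thread_join(Id, _),
    message_queue_destroy(Queue),
    append(SortedLess, [X|SortedMore], Sorted).
```

One thread is created per non-empty sublist, so this is only sensible for short lists. It is meant to show the structure, not to be fast.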
linda_eval(Head) :-
    linda_eval(Head, Head).

linda_eval(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    thread_create(forall(Body, out(Plain)), Id, []) ~>
       thread_join(Id, true).

linda_eval_detached(Head) :-
    linda_eval_detached(Head, Head).

linda_eval_detached(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    thread_create(forall(Body, out(Plain)), _Id, [detached(true)]).
 tuple(:Goal) is det
 tuple(?Head, :Goal) is det
Registers Head as a virtual tuple in TIPC Linda's tuple space. On success, any client on the cluster may reference the tuple, Head, using rd/1 or rd_noblock/1. On reference, Goal is executed by a separate thread of execution in the host client's Prolog process. The result is unified with Head, which is then returned to the guest client. As in linda_eval/(1-2) above, Goal is evaluated using forall/2. The virtual tuple is unregistered on backtracking into a tuple/(1-2), receipt of an uncaught exception, or cut of choice-points. In tuple/1, Head and Goal are identical, except that the module name is stripped from Head.

Note: A virtual tuple is an extension of the server. Even though it is operating in the client's Prolog environment, it is restricted in the server operations that it may perform. It is generally safe for tuple predicates to perform out/1 operations, but it is unsafe for them to perform any variant of in or rd, either directly or indirectly. This restriction is, however, relaxed if the server and client are operating in separate heavyweight processes (not threads) on the node or cluster. This is most easily achieved by starting a stand-alone Linda server somewhere on the cluster. See tipc_linda_server/0, below.

:- meta_predicate tuple(?, 0), tuple(0).

tuple(Head) :-
    tuple(Head, Head).

tuple(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    listen(user, '$linda'(rd(Plain)), Body) ~>
        unlisten(user, '$linda'(rd(Plain)), Body).
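
The listener mechanism behind tuple/2 can be explored locally with library(broadcast) alone. In the sketch below a goal is registered against an rd-style template and computes its answers on demand, so nothing is stored. The listener name demo_tuple, the '$virtual_demo' wrapper and the predicates with_virtual_squares/1 and virtual_demo/1 are hypothetical. Requests stay inside one process, whereas the real library delivers them over TIPC.

```prolog
:- use_module(library(broadcast)).
:- meta_predicate with_virtual_squares(0).

% Run Goal while square(N, Sq) is available as a virtual tuple.
% The listener is removed again when Goal finishes, fails or raises.
with_virtual_squares(Goal) :-
    setup_call_cleanup(
        listen(demo_tuple, '$virtual_demo'(rd(square(N, Sq))),
               ( between(1, 3, N), Sq is N * N )),
        Goal,
        unlisten(demo_tuple)).

% Read every solution the virtual tuple offers.
virtual_demo(Pairs) :-
    with_virtual_squares(
        findall(N-Sq,
                broadcast_request('$virtual_demo'(rd(square(N, Sq)))),
                Pairs)).
```

Each request runs the registered goal afresh, which is why a virtual tuple can be read any number of times without being consumed.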
 tipc_linda_server is nondet
Acts as a stand-alone Linda server. This predicate initializes the TIPC stack and then starts a Linda server in the current thread. If a client performs an out(server_quit), the server's Prolog process will exit via halt/1. It is intended for use in scripting as follows:
swipl -q -g 'use_module(library(tipc/tipc_linda)),
       tipc_linda_server' -t 'halt(1)'

See also manual section 2.10.2.1 Using PrologScript.

Note: Prolog will return a non-zero exit status if this predicate is executed on a cluster that already has an active server. An exit status of zero is returned on graceful shutdown.

throws
- error(permission_error(halt,thread,2),context(halt/1,Only from thread 'main')), if this predicate is executed in a thread other than main.
wait_for_quit :-
    linda_timeout(6.0),
    in(server_quit),
    halt(0).

tipc_linda_server :-
%   detach_IO,               % become a daemon
    tipc_initialize,
    (   linda_client(global) -> true; linda(wait_for_quit)).
 tipc_initialize is semidet
See tipc:tipc_initialize/0.