View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Hongxin Liang and Jan Wielemaker
    4    E-mail:        jan@swi-prolog.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2021, SWI-Prolog Solutions b.v.
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(stomp,
   36          [ stomp_connection/5,    % +Address, +Host, +Headers,
   37                                   % :Callback, -Connection
   38            stomp_connection/6,    % +Address, +Host, +Headers,
   39                                   % :Callback, -Connection, +Options
   40            stomp_connection_property/2, % ?Connection, ?Property
   41            stomp_destroy_connection/1, % +Connection
   42            stomp_connect/1,       % +Connection
   43            stomp_connect/2,       % +Connection, +Options
   44            stomp_teardown/1,      % +Connection
   45            stomp_reconnect/1,	   % +Connection
   46            stomp_send/4,          % +Connection, +Destination, +Headers, +Body
   47            stomp_send_json/4,     % +Connection, +Destination, +Headers, +JSON
   48            stomp_subscribe/4,     % +Connection, +Destination, +Id, +Headers
   49            stomp_unsubscribe/2,   % +Connection, +Id
   50            stomp_ack/2,           % +Connection, +MsgHeaders
   51            stomp_nack/2,          % +Connection, +MsgHeaders
   52            stomp_ack/3,           % +Connection, +MessageId, +Headers
   53            stomp_nack/3,          % +Connection, +MessageId, +Headers
   54            stomp_transaction/2,   % +Connection, :Goal
   55            stomp_disconnect/2,    % +Connection, +Headers
   56                                   % Low level predicates
   57            stomp_begin/2,         % +Connection, +Transaction
   58            stomp_commit/2,        % +Connection, +Transaction
   59            stomp_abort/2,         % +Connection, +Transaction
   60            stomp_setup/2          % +Connection, +Options
   61          ]).

STOMP client.

This module provides a STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) client. This client is based on work by Hongxin Liang. The current version is a major rewrite, both changing the API and the low-level STOMP frame (de)serialization.

The predicate stomp_connection/5 is used to register a connection. The connection is established by stomp_connect/1, which is lazily called from any of the predicates that send a STOMP frame. After establishing the connection two threads are created. One receives STOMP frames and the other manages and watches the heart beat.

Threading

Upon receiving a frame the callback registered with stomp_connection/5 is called in the context of the receiving thread. More demanding applications may decide to send incomming frames to a SWI-Prolog message queue and have one or more worker threads processing the input. Alternatively, frames may be inspected by the receiving thread and either processed immediately or be dispatched to either new or running threads. The best scenario depends on the message rate, processing time and concurrency of the Prolog application.

All message sending predicates of this library are thread safe. If two threads send a frame to the same connection the library ensures that both frames are properly serialized.

Reconnecting

By default this library tries to establish the connection and the user gets notified by means of a disconnected pseudo frame that the connection is lost. Using the Options argument of stomp_connection/6 the system can be configured to try and keep connecting if the server is not available and reconnect if the connection is lost. See the pong.pl example.

author
- Hongxin Liang and Jan Wielemaker
See also
- http://stomp.github.io/index.html
- https://github.com/jasonrbriggs/stomp.py
license
- BSD-2
To be done
- TSL support */
  107:- meta_predicate
  108    stomp_connection(+, +, +, 4, -),
  109    stomp_connection(+, +, +, 4, -, +),
  110    stomp_transaction(+, 0).  111
  112:- use_module(library(apply)).  113:- use_module(library(debug)).  114:- use_module(library(error)).  115:- use_module(library(gensym)).  116:- use_module(library(http/http_stream)).  117:- use_module(library(http/json)).  118:- use_module(library(readutil)).  119:- use_module(library(socket)).  120:- use_module(library(uuid)).  121:- use_module(library(lists)).  122:- use_module(library(option)).  123:- use_module(library(time)).  124
  125:- dynamic
  126    connection_property/3.
 stomp_connection(+Address, +Host, +Headers, :Callback, -Connection) is det
 stomp_connection(+Address, +Host, +Headers, :Callback, -Connection, +Options) is det
Create a connection reference. The connection is not set up yet by this predicate. Callback is called on any received frame except for heart beat frames as below.
call(Callback, Command, Connection, Header, Body)

Where command is one of the commands below. Header is a dict holding the STOMP frame header, where all values are strings except for the 'content-length' key value which is passed as an integer.

Body is a string or, if the data is of the type application/json, a dict.

connected
A connection was established. Connection and Header are valid.
disconnected
The connection was lost. Only Connection is valid.
message
A message arrived. All three arguments are valid. Body is a dict if the content-type of the message is application/json and a string otherwise.
heartbeat
A heartbeat was received. Only Connection is valid. This callback is also called for each newline that follows a frame. These newlines can be a heartbeat, but can also be additional newlines that follow a frame.
heartbeat_timeout
No heartbeat was received. Only Connection is valid.
error
An error happened. All three arguments are valid and handled as message.

Note that stomp_teardown/1 causes the receiving and heartbeat thread to be signalled with abort/0.

Options processed:

reconnect(+Bool)
Try to reestablish the connection to the server if the connection is lost. Default is false
connect_timeout(+Seconds)
Maximum time to try and reestablish a connection. The default is 600 (10 minutes).
json_options(+Options)
Options passed to json_read_dict/3 to translate application/json content to Prolog. Default is [].
Arguments:
Address- is a valid address for tcp_connect/3. Normally a term Host:Port, e.g., localhost:32772.
Host- is a path denoting the STOMP host. Often just /.
Headers- is a dict with STOMP headers used for the CONNECT request.
Connection- is an opaque ground term that identifies the connection.
  190stomp_connection(Address, Host, Headers, Callback, Connection) :-
  191    stomp_connection(Address, Host, Headers, Callback, Connection, []).
  192
  193stomp_connection(Address, Host, Headers, Callback, Connection, Options) :-
  194    option(reconnect(Reconnect), Options, false),
  195    option(connect_timeout(Timeout), Options, 600),
  196    option(json_options(JSONOptions), Options, []),
  197    valid_address(Address),
  198    must_be(atom, Host),
  199    must_be(dict, Headers),
  200    must_be(callable, Callback),
  201    uuid(Connection),
  202    retractall(connection_property(Connection, _, _)),
  203    update_connection_mapping(
  204        Connection,
  205        _{ address: Address,
  206           callback: Callback,
  207           host: Host,
  208           headers: Headers,
  209           reconnect: Reconnect,
  210           connect_timeout: Timeout,
  211           json_options: JSONOptions
  212         }).
  213
  214valid_address(Host:Port) :-
  215    !,
  216    must_be(atom, Host),
  217    must_be(integer, Port).
  218valid_address(Address) :-
  219    type_error(stom_address, Address).
  220
  221connection_property(address).
  222connection_property(callback).
  223connection_property(host).
  224connection_property(headers).
  225connection_property(reconnect).
  226connection_property(connect_timeout).
 stomp_connection_property(?Connection, ?Property) is nondet
True when Property, is a property of Connection. Defined properties are:
address(Address)
callback(Callback)
host(Host)
headers(Headers)
reconnect(Bool)
connect_timeout(Seconds)
All the above properties result from the stomp_connection/6 registration.
receiver_thread_id(Thread)
stream(Stream)
heartbeat_thread_id(Thread)
received_heartbeat(TimeStamp)
These describe an active STOMP connection.
  247stomp_connection_property(Connection, Property) :-
  248    var(Property),
  249    !,
  250    connection_property(Connection, Name, Value),
  251    Property =.. [Name,Value].
  252stomp_connection_property(Connection, Property) :-
  253    must_be(compound, Property),
  254    Property =.. [Name,Value],
  255    query_connection_property(Connection, Name, Value).
 stomp_destroy_connection(+Connection)
Destroy a connection. If it is active, first use stomp_teardown/1 to disconnect.
  262stomp_destroy_connection(Connection) :-
  263    must_be(ground, Connection),
  264    (   query_connection_property(Connection, address, _)
  265    ->  stomp_teardown(Connection),
  266        retractall(connection_property(Connection, _, _))
  267    ;   existence_error(stomp_connection, Connection)
  268    ).
 stomp_setup(+Connection, +Options) is det
Set up the actual socket connection and start receiving thread. This is a no-op if the connection has already been created. The Options processed are below. Other options are passed to tcp_connect/3.
timeout(+Seconds)
If tcp_connect/3 fails, retry until the timeout is reached. If Seconds is inf or infinite, keep retrying forever.
  280stomp_setup(Connection, Options) :-
  281    stomp_setup(Connection, _New, Options).
  282
  283stomp_setup(Connection, false, _) :-
  284    query_connection_property(Connection, stream, _Stream),
  285    !.
  286stomp_setup(Connection, New, Options) :-
  287    with_mutex(stomp, stomp_setup_guarded(Connection, New, Options)).
  288
  289stomp_setup_guarded(Connection, false, _) :-
  290    query_connection_property(Connection, stream, _Stream),
  291    !.
  292stomp_setup_guarded(Connection, true, Options) :-
  293    query_connection_property(Connection, address, Address),
  294    connect(Connection, Address, Stream, Options),
  295    set_stream(Stream, encoding(utf8)),
  296    gensym(stomp_receive, Alias),
  297    thread_create(receive(Connection, Stream), ReceiverThreadId, [alias(Alias)]),
  298    debug(stomp(connection), 'Handling input on thread ~p', [ReceiverThreadId]),
  299    update_connection_mapping(Connection,
  300                              _{ receiver_thread_id: ReceiverThreadId,
  301                                 stream:Stream
  302                               }).
 connect(+Connection, +Address, -Stream, +Options) is det
Connect to Address. If the option timeout(Sec) is present, retry the connection until the timeout is reached.
  309connect(Connection, Address, Stream, Options) :-
  310    stomp_deadline(Connection, Deadline, Options),
  311    connect_with_deadline(Connection, Address, Stream, Deadline, Options).
  312
  313connect_with_deadline(_Connection, Address, Stream, once, Options) :-
  314    !,
  315    tcp_connect(Address, Stream, Options).
  316connect_with_deadline(Connection, Address, Stream, Deadline, Options) :-
  317    number(Deadline),
  318    !,
  319    between(0, infinite, Retry),
  320      get_time(Now),
  321      Timeout is Deadline-Now,
  322      (   Now > 0
  323      ->  (   catch(call_with_time_limit(
  324                        Timeout,
  325                        tcp_connect(Address, Stream, Options)),
  326                    Error,
  327                    true)
  328          ->  (   var(Error)
  329              ->  !
  330              ;   (   debugging(stomp(connection))
  331                  ->  print_message(warning, Error)
  332                  ;   true
  333                  ),
  334                  wait_retry(Connection, Error, Retry, Deadline)
  335              )
  336          ;   wait_retry(Connection, failed, Retry, Deadline)
  337          )
  338      ;   throw(stomp_error(connect, Connection, timeout))
  339      ).
  340connect_with_deadline(Connection, Address, Stream, Deadline, Options) :-
  341    between(0, infinite, Retry),
  342      Error = error(Formal, _),
  343      (   catch(tcp_connect(Address, Stream, Options),
  344                Error,
  345                true)
  346      ->  (   var(Formal)
  347          ->  !
  348          ;   (   debugging(stomp(connection))
  349              ->  print_message(warning, Error)
  350              ;   true
  351              ),
  352              wait_retry(Connection, Formal, Retry, Deadline)
  353          )
  354      ;   wait_retry(Connection, failed, Retry, Deadline)
  355      ).
  356
  357wait_retry(Connection, Why, _Retry, _Deadline) :-
  358    Why = error(stomp_error(connect, Connection, error(_)), _),
  359    !,
  360    throw(Why).
  361wait_retry(Connection, _Why, Retry, Deadline) :-
  362    Wait0 is min(10, 0.1 * (1<<Retry)),
  363    (   number(Deadline)
  364    ->  get_time(Now),
  365        Wait is min(Deadline-Now, Wait0)
  366    ;   Wait = Wait0
  367    ),
  368    (   Wait > 0
  369    ->  sleep(Wait),
  370        fail
  371    ;   throw(error(stomp_error(connect, Connection, timeout), _))
  372    ).
 stomp_teardown(+Connection) is semidet
Tear down the socket connection, stop receiving thread and heartbeat thread (if applicable). The registration of the connection as created by stomp_connection/5 is preserved and the connection may be reconnected using stomp_connect/1.
  382stomp_teardown(Connection) :-
  383    terminate_helper(Connection, receiver_thread_id),
  384    terminate_helper(Connection, heartbeat_thread_id),
  385    forall(query_connection_property(Connection, stream, Stream),
  386           close(Stream, [force(true)])),
  387    debug(stomp(connection), 'retract connection mapping for ~p', [Connection]),
  388    reset_connection_properties(Connection).
  389
  390terminate_helper(Connection, Helper) :-
  391    retract(connection_property(Connection, Helper, Thread)),
  392    \+ thread_self(Thread),
  393    catch(thread_signal(Thread, abort), error(_,_), fail),
  394    !,
  395    thread_join(Thread, _Status).
  396terminate_helper(_, _).
  397
  398reset_connection_properties(Connection) :-
  399    findall(P,
  400            (   query_connection_property(Connection, P, _),
  401                \+ connection_property(P)
  402            ), Ps),
  403    forall(member(P, Ps),
  404           retractall(connection_property(Connection, P, _))).
 stomp_reconnect(+Connection) is det
Teardown the connection and try to reconnect.
  410stomp_reconnect(Connection) :-
  411    stomp_teardown(Connection),
  412    stomp_connect(Connection).
 stomp_connect(+Connection) is det
 stomp_connect(+Connection, +Options) is det
Setup the connection. First ensures a socket connection and if this is new send a CONNECT frame. Protocol version and heartbeat negotiation will be handled. STOMP frame is not used for backward compatibility.

This predicate waits for the connection handshake to have completed. Currently it waits for a maximum of 10 seconds after establishing the socket for the server reply.

Calling this on an established connection has no effect.

Errors
- stomp_error(connect, Connection, Detail) if no connection could be established.
See also
- http://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame).
  432stomp_connect(Connection) :-
  433    stomp_connect(Connection, []).
  434
  435stomp_connect(Connection, Options) :-
  436    update_reconnect_property(Connection),
  437    stomp_deadline(Connection, Deadline, Options),
  438    stomp_deadline_connect(Connection, Deadline, Options).
  439
  440update_reconnect_property(Connection) :-
  441    query_connection_property(Connection, reconnect, disconnected),
  442    !,
  443    update_connection_property(Connection, reconnect, true).
  444update_reconnect_property(_).
  445
  446stomp_deadline_connect(Connection, Deadline, Options) :-
  447    between(0, infinite, Retry),
  448      stomp_setup(Connection, New, [deadline(Deadline)|Options]),
  449      (   New == true
  450      ->  Error = error(Formal, _),
  451          catch(connect_handshake(Connection), Error, true),
  452          (   var(Formal)
  453          ->  !
  454          ;   stomp_teardown(Connection),
  455              wait_retry(Connection, Error, Retry, Deadline)
  456          )
  457      ;   query_connection_property(Connection, connected, _)
  458      ->  true
  459      ;   wait_connected(Connection)
  460      ->  true
  461      ;   stomp_teardown(Connection),
  462          wait_retry(Connection, failed, Retry, Deadline)
  463      ).
  464
  465connect_handshake(Connection) :-
  466    query_connection_property(Connection, headers, Headers),
  467    query_connection_property(Connection, host, Host),
  468    send_frame(Connection,
  469               connect,
  470               Headers.put(_{ 'accept-version':'1.0,1.1,1.2',
  471                              host:Host
  472                            })),
  473    (   Heartbeat = Headers.get('heart-beat')
  474    ->  update_connection_property(Connection, 'heart-beat', Heartbeat)
  475    ;   true
  476    ),
  477    thread_self(Self),
  478    update_connection_property(Connection, waiting_thread, Self),
  479    (   thread_get_message(Self, stomp(connected(Connection, Status)),
  480                           [timeout(10)])
  481    ->  (   Status == true
  482        ->  get_time(Now),
  483            update_connection_property(Connection, connected, Now)
  484        ;   throw(error(stomp_error(connect, Connection, Status), _))
  485        )
  486    ;   throw(error(stomp_error(connect, Connection, timeout), _))
  487    ).
  488
  489wait_connected(Connection) :-
  490    thread_wait(query_connection_property(Connection, connected, _),
  491                [ timeout(10),
  492                  wait_preds([connection_property/3])
  493                ]),
  494    !.
  495wait_connected(Connection) :-
  496    throw(error(stomp_error(connect, Connection, timeout), _)).
 stomp_deadline(+Connection, -Deadline, +Options) is det
True when there is a connection timeout and Deadline is the deadline for establishing a connection. Deadline is one of
Number
The deadline as a time stamp
infinite
Keep trying
once
Try to connect once.
  510stomp_deadline(_Connection, Deadline, Options) :-
  511    option(deadline(Deadline), Options),
  512    !.
  513stomp_deadline(Connection, Deadline, Options) :-
  514    (   option(timeout(Time), Options)
  515    ;   query_connection_property(Connection, connect_timeout, Time)
  516    ),
  517    !,
  518    (   number(Time)
  519    ->  get_time(Now),
  520        Deadline is Now+Time
  521    ;   must_be(oneof([inf,infinite]), Time),
  522        Deadline = infinite
  523    ).
  524stomp_deadline(_, once, _).
 stomp_send(+Connection, +Destination, +Headers, +Body) is det
Send a SEND frame. If content-type is not provided, text/plain will be used. content-length will be filled in automatically.
See also
- http://stomp.github.io/stomp-specification-1.2.html#SEND
  535stomp_send(Connection, Destination, Headers, Body) :-
  536    add_transaction(Headers, Headers1),
  537    send_frame(Connection, send, Headers1.put(destination, Destination), Body).
 stomp_send_json(+Connection, +Destination, +Headers, +JSON) is det
Send a SEND frame. JSON can be either a JSON term or a dict. content-type is filled in automatically as application/json and content-length will be filled in automatically as well.
See also
- http://stomp.github.io/stomp-specification-1.2.html#SEND
  547stomp_send_json(Connection, Destination, Headers, JSON) :-
  548    add_transaction(Headers, Headers1),
  549    atom_json_term(Body, JSON,
  550                   [ as(string),
  551                     width(0)           % No layout for speed
  552                   ]),
  553    send_frame(Connection, send,
  554               Headers1.put(_{ destination:Destination,
  555                               'content-type':'application/json'
  556                             }),
  557               Body).
 stomp_subscribe(+Connection, +Destination, +Id, +Headers) is det
Send a SUBSCRIBE frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
  565stomp_subscribe(Connection, Destination, Id, Headers) :-
  566    send_frame(Connection, subscribe,
  567               Headers.put(_{destination:Destination, id:Id})).
 stomp_unsubscribe(+Connection, +Id) is det
Send an UNSUBSCRIBE frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#UNSUBSCRIBE
  575stomp_unsubscribe(Connection, Id) :-
  576    send_frame(Connection, unsubscribe, _{id:Id}).
 stomp_ack(+Connection, +MessageId, +Headers) is det
Send an ACK frame. See stomp_ack/2 for simply passing the header received with the message we acknowledge.
See also
- http://stomp.github.io/stomp-specification-1.2.html#ACK
  585stomp_ack(Connection, MessageId, Headers) :-
  586    send_frame(Connection, ack, Headers.put('message-id', MessageId)).
 stomp_nack(+Connection, +MessageId, +Headers) is det
Send a NACK frame. See stomp_nack/2 for simply passing the header received with the message we acknowledge.
See also
- http://stomp.github.io/stomp-specification-1.2.html#NACK
  595stomp_nack(Connection, MessageId, Headers) :-
  596    send_frame(Connection, nack, Headers.put('message-id', MessageId)).
 stomp_ack(+Connection, +MsgHeader) is det
 stomp_nack(+Connection, +MsgHeader) is det
Reply with an ACK or NACK based on the received message header. On a STOMP 1.1 request we get an ack field in the header and reply with an id. For STOMP 1.2 we reply with the message-id and subscription that we received with the message.
  606stomp_ack(Connection, Header) :-
  607    stomp_ack_nack(Connection, ack, Header).
  608
  609stomp_nack(Connection, Header) :-
  610    stomp_ack_nack(Connection, nack, Header).
  611
  612stomp_ack_nack(Connection, Type, Header) :-
  613    (   Id = Header.get(ack)
  614    ->  send_frame(Connection, Type, _{id: Id})
  615    ;   Pass = _{'message-id':_, subscription:_},
  616        Pass :< Header
  617    ->  send_frame(Connection, Type, Pass)
  618    ).
 stomp_begin(+Connection, +Transaction) is det
Send a BEGIN frame. @see http://stomp.github.io/stomp-specification-1.2.html#BEGIN
  626stomp_begin(Connection, Transaction) :-
  627    send_frame(Connection, begin, _{transaction:Transaction}).
 stomp_commit(+Connection, +Transaction) is det
Send a COMMIT frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#COMMIT
  635stomp_commit(Connection, Transaction) :-
  636    send_frame(Connection, commit, _{transaction:Transaction}).
 stomp_abort(+Connection, +Transaction) is det
Send a ABORT frame.
See also
- http://stomp.github.io/stomp-specification-1.2.html#ABORT
  644stomp_abort(Connection, Transaction) :-
  645    send_frame(Connection, abort, _{transaction:Transaction}).
 stomp_transaction(+Connection, :Goal) is semidet
Run Goal as once/1, tagging all SEND messages inside the transaction with the transaction id. If Goal fails or raises an exception the transaction is aborted. Failure or exceptions cause the transaction to be aborted using stomp_abort/2, after which the result is forwarded.
  655stomp_transaction(Connection, Goal) :-
  656    uuid(TransactionID),
  657    stomp_transaction(Connection, Goal, TransactionID).
  658
  659stomp_transaction(Connection, Goal, TransactionID) :-
  660    stomp_begin(Connection, TransactionID),
  661    (   catch(once(Goal), Error, true)
  662    ->  (   var(Error)
  663        ->  stomp_commit(Connection, TransactionID)
  664        ;   stomp_abort(Connection, TransactionID),
  665            throw(Error)
  666        )
  667    ;   stomp_abort(Connection, TransactionID),
  668        fail
  669    ).
  670
  671in_transaction(TransactionID) :-
  672    prolog_current_frame(Frame),
  673    prolog_frame_attribute(
  674        Frame, parent_goal,
  675        stomp_transaction(_Connection, _Goal, TransactionID)).
  676
  677add_transaction(Headers, Headers1) :-
  678    in_transaction(TransactionID),
  679    !,
  680    Headers1 = Headers.put(transaction, TransactionID).
  681add_transaction(Headers, Headers).
 stomp_disconnect(+Connection, +Headers) is det
Send a DISCONNECT frame. If the connection has the reconnect property set to true, this will be reset to disconnected to avoid reconnecting. A subsequent stomp_connect/2 resets the reconnect status.
See also
- http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT
  693stomp_disconnect(Connection, Headers) :-
  694    (   query_connection_property(Connection, reconnect, true)
  695    ->  update_connection_property(Connection, reconnect, disconnected)
  696    ;   true
  697    ),
  698    send_frame(Connection, disconnect, Headers).
 send_frame(+Connection, +Command, +Headers) is det
 send_frame(+Connection, +Command, +Headers, +Body) is det
Send a frame. If no connection is established connect first. If the sending results in an I/O error, disconnect, reconnect and try again if the reconnect propertys is set.
  707send_frame(Connection, Command, Headers) :-
  708    send_frame(Connection, Command, Headers, "").
  709
  710send_frame(Connection, Command, Headers, Body) :-
  711    Error = error(Formal,_),
  712    catch(send_frame_guarded(Connection, Command, Headers, Body),
  713          Error,
  714          true),
  715    (   var(Formal)
  716    ->  true
  717    ;   query_connection_property(Connection, reconnect, true)
  718    ->  notify(Connection, disconnected),
  719        stomp_teardown(Connection),
  720        debug(stomp(connection), 'Sending thread reconnects', []),
  721        send_frame(Connection, Command, Headers, Body)
  722    ;   notify(Connection, disconnected),
  723        throw(Error)
  724    ).
  725
  726send_frame_guarded(Connection, Command, Headers, Body) :-
  727    has_body(Command),
  728    !,
  729    connection_stream(Connection, Stream),
  730    default_content_type('text/plain', Headers, Headers1),
  731    body_bytes(Body, ContentLength),
  732    Headers2 = Headers1.put('content-length', ContentLength),
  733    with_output_to(Stream,
  734                   ( send_command(Stream, Command),
  735                     send_header(Stream, Headers2),
  736                     format(Stream, '~w\u0000\n', [Body]),
  737                     flush_output(Stream))).
  738send_frame_guarded(Connection, heartbeat, _Headers, _Body) :-
  739    !,
  740    connection_stream(Connection, Stream),
  741    nl(Stream),
  742    flush_output(Stream).
  743send_frame_guarded(Connection, Command, Headers, _Body) :-
  744    connection_stream(Connection, Stream),
  745    with_output_to(Stream,
  746                   ( send_command(Stream, Command),
  747                     send_header(Stream, Headers),
  748                     format(Stream, '\u0000\n', []),
  749                     flush_output(Stream))).
 connection_stream(+Connection, -Stream)
  753connection_stream(Connection, Stream) :-
  754    query_connection_property(Connection, stream, Stream),
  755    !.
  756connection_stream(Connection, Stream) :-
  757    stomp_connect(Connection),
  758    query_connection_property(Connection, stream, Stream).
  759
  760send_command(Stream, Command) :-
  761    string_upper(Command, Upper),
  762    format(Stream, '~w\n', [Upper]).
  763
  764send_header(Stream, Headers) :-
  765    dict_pairs(Headers, _, Pairs),
  766    maplist(send_header_line(Stream), Pairs),
  767    nl(Stream).
  768
  769send_header_line(Stream, Name-Value) :-
  770    (   number(Value)
  771    ->  format(Stream, '~w:~w\n', [Name,Value])
  772    ;   escape_value(Value, String),
  773        format(Stream, '~w:~w\n', [Name,String])
  774    ).
  775
  776escape_value(Value, String) :-
  777    split_string(Value, "\n:\\", "", [_]),
  778    !,
  779    String = Value.
  780escape_value(Value, String) :-
  781    string_codes(Value, Codes),
  782    phrase(escape(Codes), Encoded),
  783    string_codes(String, Encoded).
  784
  785escape([]) --> [].
  786escape([H|T]) --> esc(H), escape(T).
  787
  788esc(0'\r) --> !, "\\r".
  789esc(0'\n) --> !, "\\n".
  790esc(0':)  --> !, "\\c".
  791esc(0'\\) --> !, "\\\\".
  792esc(C)    --> [C].
  793
  794default_content_type(ContentType, Header0, Header) :-
  795    (   get_dict('content-type', Header0, _)
  796    ->  Header = Header0
  797    ;   put_dict('content-type', Header0, ContentType, Header)
  798    ).
  799
  800body_bytes(String, Bytes) :-
  801    setup_call_cleanup(
  802        open_null_stream(Out),
  803        ( write(Out, String),
  804          byte_count(Out, Bytes)
  805        ),
  806        close(Out)).
  807
  808
  809		 /*******************************
  810		 *        INCOMING DATA		*
  811		 *******************************/
 read_frame(+Connection, +Stream, -Frame) is det
Read a frame from a STOMP stream. On end-of-file, Frame is unified with the atom end_of_file. Otherwise it is a dict holding the cmd, headers (another dict) and body (a string)
  819read_frame(Connection, Stream, Frame) :-
  820    read_command(Stream, Command),
  821    (   Command == end_of_file
  822    ->  Frame = end_of_file
  823    ;   Command == heartbeat
  824    ->  Frame = stomp_frame{cmd:heartbeat}
  825    ;   read_header(Stream, Header),
  826        (   has_body(Command)
  827        ->  read_content(Connection, Stream, Header, Content),
  828            Frame = stomp_frame{cmd:Command, headers:Header, body:Content}
  829        ;   Frame = stomp_frame{cmd:Command, headers:Header}
  830        )
  831    ).
  832
  833has_body(send).
  834has_body(message).
  835has_body(error).
  836
  837read_command(Stream, Command) :-
  838    read_line_to_string(Stream, String),
  839    debug(stomp(command), 'Got line ~p', [String]),
  840    (   String == end_of_file
  841    ->  Command = end_of_file
  842    ;   String == ""
  843    ->  Command = heartbeat
  844    ;   string_lower(String, Lwr),
  845        atom_string(Command, Lwr)
  846    ).
  847
  848read_header(Stream, Header) :-
  849    read_header_lines(Stream, Pairs, []),
  850    dict_pairs(Header, stomp_header, Pairs).
  851
  852read_header_lines(Stream, Header, Seen) :-
  853    read_line_to_string(Stream, Line),
  854    (   Line == ""
  855    ->  Header = []
  856    ;   sub_string(Line, Before, _, After, ":")
  857    ->  sub_atom(Line, 0, Before, _, Name),
  858        (   memberchk(Name, Seen)
  859        ->  read_header_lines(Stream, Header, Seen)
  860        ;   sub_string(Line, _, After, 0, Value0),
  861            convert_value(Name, Value0, Value),
  862            Header = [Name-Value|More],
  863            read_header_lines(Stream, More, [Name|Seen])
  864        )
  865    ).
  866
  867convert_value('content-length', String, Bytes) :-
  868    !,
  869    number_string(Bytes, String).
  870convert_value(_, String, Value) :-
  871    unescape_header_value(String, Value).
  872
  873unescape_header_value(String, Value) :-
  874    sub_atom(String, _, _, _, "\\"),
  875    !,
  876    string_codes(String, Codes),
  877    phrase(unescape(Plain), Codes),
  878    string_codes(Value, Plain).
  879unescape_header_value(String, String).
  880
  881unescape([H|T]) --> "\\", !, unesc(H), unescape(T).
  882unescape([H|T]) --> [H], !, unescape(T).
  883unescape([]) --> [].
  884
  885unesc(0'\r) --> "r", !.
  886unesc(0'\n) --> "n", !.
  887unesc(0':)  --> "c", !.
  888unesc(0'\\) --> "\\", !.
  889unesc(_) --> [C], { syntax_error(invalid_stomp_escape(C)) }.
 read_content(+Connection, +Stream, +Header, -Content) is det
Read the body. Note that the body may be followed by an arbitrary number of newlines. We leave them in place to avoid blocking.
  896read_content(Connection, Stream, Header, Content) :-
  897    _{ 'content-length':Bytes,
  898       'content-type':Type
  899     } :< Header,
  900    setup_call_cleanup(
  901        stream_range_open(Stream, DataStream, [size(Bytes)]),
  902        read_content(Connection, Type, DataStream, Header, Content),
  903        close(DataStream)).
  904
  905read_content(Connection, "application/json", Stream, _Header, Content) :-
  906    !,
  907    query_connection_property(Connection, json_options, Options),
  908    json_read_dict(Stream, Content, Options).
  909read_content(_Connection, _Type, Stream, _Header, Content) :-
  910    read_string(Stream, _, Content).
 receive(+Connection, +Stream) is det
Read and process incoming messages from Stream.
  917receive(Connection, Stream) :-
  918    E = error(Formal, _),
  919    catch(read_frame(Connection, Stream, Frame), E, true),
  920    !,
  921    (   var(Formal)
  922    ->  debug(stomp(receive), 'received frame ~p', [Frame]),
  923        (   Frame == end_of_file
  924        ->  receive_done(Connection, end_of_file)
  925        ;   process_frame(Connection, Frame),
  926            receive(Connection, Stream)
  927        )
  928    ;   receive_done(Connection, E)
  929    ).
  930receive(Connection, _Stream) :-
  931    receive_done(Connection, "failed to read frame").
 receive_done(+Connection, +Why)
The receiver thread needs to close the connection due to reading end-of-file, an I/O error or failure to parse a frame. If connection is configured to be restarted this thread will try to reestablish the connection. After reestablishing the connection this receiver thread terminates.
  941receive_done(Connection, Why) :-
  942    (   Why = error(_,_)
  943    ->  print_message(warning, Why)
  944    ;   true
  945    ),
  946    notify(Connection, disconnected),
  947    stomp_teardown(Connection),
  948    (   query_connection_property(Connection, reconnect, true)
  949    ->  debug(stomp(connection), 'Receiver thread reconnects (~p)', [Why]),
  950        stomp_connect(Connection)
  951    ;   debug(stomp(connection), 'Receiver thread terminates (~p)', [Why])
  952    ),
  953    thread_self(Me),
  954    thread_detach(Me).
 process_frame(+Connection, +Frame) is det
Process an incoming frame.
  960process_frame(Connection, Frame) :-
  961    Frame.cmd = heartbeat, !,
  962    get_time(Now),
  963    debug(stomp(heartbeat), 'received heartbeat at ~w', [Now]),
  964    update_connection_property(Connection, received_heartbeat, Now),
  965    notify(Connection, heartbeat, _{}, "").
  966process_frame(Connection, Frame) :-
  967    _{cmd:FrameType, headers:Headers, body:Body} :< Frame,
  968    !,
  969    process_connect(FrameType, Connection, Frame),
  970    notify(Connection, FrameType, Headers, Body).
  971process_frame(Connection, Frame) :-
  972    _{cmd:FrameType, headers:Headers} :< Frame,
  973    process_connect(FrameType, Connection, Frame),
  974    notify(Connection, FrameType, Headers).
  975
  976process_connect(connected, Connection, Frame) :-
  977    retract(connection_property(Connection, waiting_thread, Waiting)),
  978    !,
  979    thread_send_message(Waiting, stomp(connected(Connection, true))),
  980    start_heartbeat_if_required(Connection, Frame.headers).
  981process_connect(error, Connection, Frame) :-
  982    retract(connection_property(Connection, waiting_thread, Waiting)),
  983    !,
  984    thread_send_message(
  985        Waiting,
  986        stomp(connected(Connection, error(Frame.body)))).
  987process_connect(_, _, _).
  988
  989start_heartbeat_if_required(Connection, Headers) :-
  990    (   query_connection_property(Connection, 'heart-beat', CHB),
  991        SHB = Headers.get('heart-beat')
  992    ->  start_heartbeat(Connection, CHB, SHB)
  993    ;   true
  994    ).
  995
  996start_heartbeat(Connection, CHB, SHB) :-
  997    extract_heartbeats(CHB, CX, CY),
  998    extract_heartbeats(SHB, SX, SY),
  999    calculate_heartbeats(CX-CY, SX-SY, X-Y),
 1000    \+ (X =:= 0, Y =:= 0),
 1001    !,
 1002    debug(stomp(heartbeat), 'calculated heartbeats are ~p,~p', [X, Y]),
 1003    SendSleep is X / 1000,
 1004    ReceiveSleep is Y / 1000 + 2,
 1005    (   X =:= 0
 1006    ->  SleepTime = ReceiveSleep
 1007    ;   (   Y =:= 0
 1008        ->  SleepTime = SendSleep
 1009        ;   SleepTime is gcd(X, Y) / 2000
 1010        )
 1011    ),
 1012    get_time(Now),
 1013    gensym(stomp_heartbeat, Alias),
 1014    debug(stomp(heartbeat), 'Creating heartbeat thread (~p ~p ~p)',
 1015          [SendSleep, ReceiveSleep, SleepTime]),
 1016    thread_create(heartbeat_loop(Connection, SendSleep, ReceiveSleep,
 1017                                 SleepTime, Now, Now),
 1018                  HeartbeatThreadId, [alias(Alias)]),
 1019    update_connection_mapping(Connection,
 1020                              _{ heartbeat_thread_id:HeartbeatThreadId,
 1021                                 received_heartbeat:Now
 1022                               }).
 1023start_heartbeat(_, _, _).
 1024
 1025extract_heartbeats(Heartbeat, X, Y) :-
 1026    split_string(Heartbeat, ",", " ", [XS, YS]),
 1027    number_string(X, XS),
 1028    number_string(Y, YS).
 1029
 1030calculate_heartbeats(CX-CY, SX-SY, X-Y) :-
 1031    (   CX =\= 0, SY =\= 0
 1032    ->  X is max(CX, floor(SY))
 1033    ;   X = 0
 1034    ),
 1035    (   CY =\= 0, SX =\= 0
 1036    ->  Y is max(CY, floor(SX))
 1037    ;   Y = 0
 1038    ).
 1039
 1040heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime,
 1041               SendTime, ReceiveTime) :-
 1042    sleep(SleepTime),
 1043    get_time(Now),
 1044    (   Now - SendTime > SendSleep
 1045    ->  SendTime1 = Now,
 1046        debug(stomp(heartbeat), 'sending a heartbeat message at ~p', [Now]),
 1047        send_frame(Connection, heartbeat, _{})
 1048    ;   SendTime1 = SendTime
 1049    ),
 1050    (   Now - ReceiveTime > ReceiveSleep
 1051    ->  ReceiveTime1 = Now,
 1052        check_receive_heartbeat(Connection, Now, ReceiveSleep)
 1053    ;   ReceiveTime1 = ReceiveTime
 1054    ),
 1055    heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime,
 1056                   SendTime1, ReceiveTime1).
 1057
 1058check_receive_heartbeat(Connection, Now, ReceiveSleep) :-
 1059    query_connection_property(Connection, received_heartbeat, ReceivedHeartbeat),
 1060    DiffReceive is Now - ReceivedHeartbeat,
 1061    (   DiffReceive > ReceiveSleep
 1062    ->  debug(stomp(heartbeat),
 1063              'Heartbeat timeout: diff_receive=~p, time=~p, lastrec=~p',
 1064              [DiffReceive, Now, ReceivedHeartbeat]),
 1065        notify(Connection, heartbeat_timeout)
 1066    ;   true
 1067    ).
 notify(+Connection, +FrameType) is det
 notify(+Connection, +FrameType, +Header) is det
 notify(+Connection, +FrameType, +Header, +Body) is det
Call the callback using FrameType.
 1075notify(Connection, FrameType) :-
 1076    notify(Connection, FrameType, stomp_header{cmd:FrameType}, "").
 1077
 1078notify(Connection, FrameType, Header) :-
 1079    notify(Connection, FrameType, Header, "").
 1080
 1081notify(Connection, FrameType, Header, Body) :-
 1082    query_connection_property(Connection, callback, Callback),
 1083    Error = error(Formal,_),
 1084    (   catch_with_backtrace(
 1085            call(Callback, FrameType, Connection, Header, Body),
 1086            Error, true)
 1087    ->  (   var(Formal)
 1088        ->  true
 1089        ;   print_message(warning, Error)
 1090        )
 1091    ;   true
 1092    ).
 1093
 1094update_connection_mapping(Connection, Dict) :-
 1095    dict_pairs(Dict, _, Pairs),
 1096    maplist(update_connection_property(Connection), Pairs).
 1097
 1098update_connection_property(Connection, Name-Value) :-
 1099    update_connection_property(Connection, Name, Value).
 1100
 1101update_connection_property(Connection, Name, Value) :-
 1102    transaction(update_connection_property_(Connection, Name, Value)).
 1103
 1104update_connection_property_(Connection, Name, Value) :-
 1105    retractall(connection_property(Connection, Name, _)),
 1106    asserta(connection_property(Connection, Name, Value)).
 1107
 1108query_connection_property(Connection, Name, Value) :-
 1109    (   nonvar(Name),
 1110        nonvar(Connection)
 1111    ->  connection_property(Connection, Name, Value),
 1112        !
 1113    ;   connection_property(Connection, Name, Value)
 1114    ).
 1115
 1116
 1117		 /*******************************
 1118		 *            MESSAGES		*
 1119		 *******************************/
 1120
 1121:- multifile prolog:error_message//1. 1122
 1123prolog:error_message(stomp_error(connect, Connection, error(Message))) -->
 1124    { connection_property(Connection, address, Address) },
 1125    [ 'STOMPL: Failed to connect to ~p: ~p'-[Address, Message] ].
 1126prolog:error_message(stomp_error(connect, Connection, Detail)) -->
 1127    { connection_property(Connection, address, Address) },
 1128    [ 'STOMPL: Failed to connect to ~p: ~p'-[Address, Detail] ]