This module provides a STOMP (Simple (or Streaming) Text Oriented
Messaging Protocol) client. This client is based on work by Hongxin
Liang. The current version is a major rewrite that changes both the API
and the low-level STOMP frame (de)serialization.
The predicate stomp_connection/5 is used to register a connection. The
connection is established by stomp_connect/1, which is lazily called
from any of the predicates that send a STOMP frame. After establishing
the connection, two threads are created: one receives STOMP frames and
the other manages and watches the heartbeat.
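As a rough sketch of the flow described above, the following registers a connection and sends a frame without calling stomp_connect/1 explicitly. The broker address localhost:61613, the guest credentials, the destination /queue/demo and the on_frame/4 callback are assumptions made for illustration only.

```prolog
:- use_module(library(stomp)).

% Callback invoked in the receiving thread for each incoming frame.
on_frame(Command, _Connection, _Header, _Body) :-
    format("received ~w frame~n", [Command]).

% Hypothetical address, credentials and destination.
demo :-
    stomp_connection(localhost:61613, '/',
                     _{login: guest, passcode: guest},
                     on_frame, Connection),
    % The connection is only registered so far; sending a frame
    % establishes it lazily through stomp_connect/1.
    stomp_send(Connection, '/queue/demo', _{}, "hello").
```

Running demo/0 requires a STOMP broker listening on the assumed address.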
Threading
Upon receiving a frame the callback registered with stomp_connection/5
is called in the context of the receiving thread. More demanding
applications may decide to send incoming frames to a SWI-Prolog message
queue and have one or more worker threads processing the input.
Alternatively, frames may be inspected by the receiving thread and
either processed immediately or be dispatched to either new or running
threads. The best scenario depends on the message rate, processing time
and concurrency of the Prolog application.
All message sending predicates of this library are thread safe. If two
threads send a frame to the same connection the library ensures that
both frames are properly serialized.
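The message queue approach mentioned above might look like the sketch below. The queue alias stomp_inbox, the frame/2 wrapper term and handle_message/2 are hypothetical names. The callback only enqueues, so the receiving thread is never blocked by application work.

```prolog
:- use_module(library(stomp)).

% Create the queue and start a single worker thread.
start_worker :-
    message_queue_create(_, [alias(stomp_inbox)]),
    thread_create(worker_loop, _, [detached(true)]).

% Callback: runs in the receiving thread, so it only enqueues.
on_frame(message, _Connection, Header, Body) :-
    !,
    thread_send_message(stomp_inbox, frame(Header, Body)).
on_frame(_Command, _Connection, _Header, _Body).

% Worker: blocks on the queue and processes one frame at a time.
worker_loop :-
    thread_get_message(stomp_inbox, frame(Header, Body)),
    catch(handle_message(Header, Body), Error,
          print_message(error, Error)),
    worker_loop.

% Hypothetical application logic.
handle_message(_Header, Body) :-
    format("processing ~p~n", [Body]).
```

Starting several threads that run worker_loop/0 on the same queue would give concurrent processing, at the cost of losing message ordering.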
Reconnecting
By default this library tries to establish the connection and notifies
the user by means of a disconnected pseudo frame if the connection is
lost. Using the Options argument of stomp_connection/6, the system can
be configured to keep trying to connect if the server is not available
and to reconnect if the connection is lost. See the pong.pl example.
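A minimal sketch of a registration that asks for reconnection, using the reconnect and connect_timeout options documented for stomp_connection/6. The address, credentials and the 60 second limit are assumptions.

```prolog
:- use_module(library(stomp)).

% Report a lost connection; ignore all other frames.
on_frame(disconnected, _Connection, _Header, _Body) :-
    !,
    print_message(informational, format("STOMP connection lost", [])).
on_frame(_Command, _Connection, _Header, _Body).

% Hypothetical address and credentials.
resilient_connection(Connection) :-
    stomp_connection(localhost:61613, '/',
                     _{login: guest, passcode: guest},
                     on_frame, Connection,
                     [ reconnect(true),      % reconnect if the connection is lost
                       connect_timeout(60)   % keep trying for at most 60 seconds
                     ]).
```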
- author
- - Hongxin Liang and Jan Wielemaker
- See also
- - http://stomp.github.io/index.html
- - https://github.com/jasonrbriggs/stomp.py
- license
- - BSD-2
- To be done
- - TLS support
- stomp_connection(+Address, +Host, +Headers, :Callback, -Connection) is det
- stomp_connection(+Address, +Host, +Headers, :Callback, -Connection, +Options) is det
- Create a connection reference. The connection is not set up yet by
this predicate. Callback is called on any received frame, except for
heart beat frames, as below.
call(Callback, Command, Connection, Header, Body)
Where Command is one of the commands below. Header is a dict
holding the STOMP frame header, where all values are strings except
for the 'content-length' key value, which is passed as an integer.
Body is a string or, if the data is of the type application/json, a dict.
- connected
- A connection was established. Connection and Header are valid.
- disconnected
- The connection was lost. Only Connection is valid.
- message
- A message arrived. All three arguments are valid. Body is
a dict if the content-type of the message is application/json
and a string otherwise.
- heartbeat
- A heartbeat was received. Only Connection is valid. This
callback is also called for each newline that follows a frame.
These newlines can be a heartbeat, but can also be additional
newlines that follow a frame.
- heartbeat_timeout
- No heartbeat was received. Only Connection is valid.
- error
- An error happened. All three arguments are valid and handled
as message.
Note that stomp_teardown/1 causes the receiving and heartbeat threads
to be signalled with abort/0.
Options processed:
- reconnect(+Bool)
- Try to reestablish the connection to the server if the
connection is lost. Default is false.
- connect_timeout(+Seconds)
- Maximum time to try and reestablish a connection. The
default is 600 (10 minutes).
- json_options(+Options)
- Options passed to json_read_dict/3 to translate
application/json content to Prolog. Default is [].
- Arguments:
Address | A valid address for tcp_connect/3. Normally a term Host:Port, e.g., localhost:32772.
Host | A path denoting the STOMP host. Often just /.
Headers | A dict with STOMP headers used for the CONNECT request.
Connection | An opaque ground term that identifies the connection.
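The callback protocol described above could be handled with one clause per command, as in this sketch. The predicate name on_frame/4 and the printed messages are illustrative choices, not part of the library.

```prolog
:- use_module(library(stomp)).

% One clause per Command value passed by the library.
on_frame(connected, _Connection, Header, _Body) :-
    format("connected: ~p~n", [Header]).
on_frame(disconnected, _Connection, _Header, _Body) :-
    format("connection lost~n").
on_frame(message, _Connection, _Header, Body) :-
    (   is_dict(Body)               % application/json content
    ->  format("JSON message: ~p~n", [Body])
    ;   format("text message: ~w~n", [Body])
    ).
on_frame(heartbeat, _Connection, _Header, _Body).
on_frame(heartbeat_timeout, _Connection, _Header, _Body) :-
    format("no heartbeat received~n").
on_frame(error, _Connection, Header, Body) :-
    format("error frame: ~p ~p~n", [Header, Body]).
```

Header and Body are left unbound in the clause heads for commands where only Connection is valid.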
- stomp_connection_property(?Connection, ?Property) is nondet
- True when Property is a property of Connection. Defined properties
are:
- address(Address)
- callback(Callback)
- host(Host)
- headers(Headers)
- reconnect(Bool)
- connect_timeout(Seconds)
- All the above properties result from the stomp_connection/6
registration.
- receiver_thread_id(Thread)
- stream(Stream)
- heartbeat_thread_id(Thread)
- received_heartbeat(TimeStamp)
- These describe an active STOMP connection.
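A small sketch of querying these properties. print_connection/1 and connection_active/1 are hypothetical helpers. That stream(_) is only present while the connection is active is an assumption based on the description above.

```prolog
:- use_module(library(stomp)).

% Dummy callback so that a connection can be registered.
on_frame(_Command, _Connection, _Header, _Body).

% Print every property currently known for Connection.
print_connection(Connection) :-
    forall(stomp_connection_property(Connection, Property),
           format("~p~n", [Property])).

% Assumption: stream(_) is only present while the connection is active.
connection_active(Connection) :-
    stomp_connection_property(Connection, stream(_)),
    !.
```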
- stomp_destroy_connection(+Connection)
- Destroy a connection. If it is active, first use stomp_teardown/1 to
disconnect.
- stomp_setup(+Connection, +Options) is det
- Set up the actual socket connection and start the receiving thread. This
is a no-op if the connection has already been created. The Options
processed are below. Other options are passed to tcp_connect/3.
- timeout(+Seconds)
- If tcp_connect/3 fails, retry until the timeout is reached.
If Seconds is inf or infinite, keep retrying forever.
- stomp_teardown(+Connection) is semidet
- Tear down the socket connection and stop the receiving thread and
heartbeat thread (if applicable). The registration of the connection as
created by stomp_connection/5 is preserved and the connection may be
reconnected using stomp_connect/1.
- stomp_reconnect(+Connection) is det
- Tear down the connection and try to reconnect.
- stomp_connect(+Connection) is det
- stomp_connect(+Connection, +Options) is det
- Set up the connection. First ensures a socket connection and, if this
is new, sends a CONNECT frame. Protocol version and heartbeat
negotiation will be handled. The STOMP frame is not used, for
backward compatibility.
This predicate waits for the connection handshake to have completed.
Currently it waits for a maximum of 10 seconds after establishing
the socket for the server reply.
Calling this on an established connection has no effect.
- Errors
- - stomp_error(connect, Connection, Detail) if no connection
could be established.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame
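Because a failed connection attempt raises an error, callers may want to wrap stomp_connect/1. The sketch below catches any exception rather than assuming the exact shape of the error term. try_connect/1 is a hypothetical helper.

```prolog
:- use_module(library(stomp)).

% Succeeds if the connection is (or becomes) established; prints the
% error and fails otherwise.
try_connect(Connection) :-
    catch(stomp_connect(Connection),
          Error,
          ( print_message(error, Error), fail )).
```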
- stomp_send(+Connection, +Destination, +Headers, +Body) is det
- Send a SEND frame. If content-type is not provided,
text/plain will be used. content-length will be filled in
automatically.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#SEND
- stomp_send_json(+Connection, +Destination, +Headers, +JSON) is det
- Send a SEND frame. JSON can be either a JSON term or a dict.
content-type is filled in automatically as application/json
and content-length is filled in automatically as well.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#SEND
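For illustration, sending a dict as JSON could look as follows. The destination /queue/orders and the payload fields are made up for this sketch.

```prolog
:- use_module(library(stomp)).

% Hypothetical destination and payload.
send_order(Connection, Id, Item) :-
    order_payload(Id, Item, Payload),
    % content-type and content-length are filled in by the library.
    stomp_send_json(Connection, '/queue/orders', _{}, Payload).

% Build the dict that is serialized as the frame body.
order_payload(Id, Item, _{id: Id, item: Item}).
```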
- stomp_subscribe(+Connection, +Destination, +Id, +Headers) is det
- Send a SUBSCRIBE frame.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#SUBSCRIBE
- stomp_unsubscribe(+Connection, +Id) is det
- Send an UNSUBSCRIBE frame.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#UNSUBSCRIBE
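A sketch of pairing stomp_subscribe/4 with stomp_unsubscribe/2. The destination /queue/demo and the subscription id 0 are assumptions. Incoming messages are delivered to the callback registered with stomp_connection/5.

```prolog
:- use_module(library(stomp)).

% Hypothetical destination and subscription id.
listen(Connection) :-
    stomp_subscribe(Connection, '/queue/demo', 0, _{}).

stop_listening(Connection) :-
    % The id must match the one passed to stomp_subscribe/4.
    stomp_unsubscribe(Connection, 0).
```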
- stomp_ack(+Connection, +MessageId, +Headers) is det
- Send an ACK frame. See stomp_ack/2 for simply passing the header
received with the message we acknowledge.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#ACK
- stomp_nack(+Connection, +MessageId, +Headers) is det
- Send a NACK frame. See stomp_nack/2 for simply passing the
header received with the message we negatively acknowledge.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#NACK
- stomp_ack(+Connection, +MsgHeader) is det
- stomp_nack(+Connection, +MsgHeader) is det
- Reply with an ACK or NACK based on the received message header. For
STOMP 1.2 the message header contains an ack field and we reply with
an id. For STOMP 1.1 we reply with the message-id and subscription
that we received with the message.
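In a callback, the two-argument forms might be used as sketched below. This assumes the subscription requested client acknowledgement, for example by passing _{ack: client} as Headers to stomp_subscribe/4. process/1 stands in for application logic and is hypothetical.

```prolog
:- use_module(library(stomp)).

% Acknowledge messages that were processed, reject the others.
on_frame(message, Connection, Header, Body) :-
    !,
    (   catch(process(Body), _, fail)
    ->  stomp_ack(Connection, Header)
    ;   stomp_nack(Connection, Header)
    ).
on_frame(_Command, _Connection, _Header, _Body).

% Hypothetical application logic: accept non-empty bodies only.
process(Body) :-
    Body \== "".
```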
- stomp_begin(+Connection, +Transaction) is det
- Send a BEGIN frame.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#BEGIN
- stomp_commit(+Connection, +Transaction) is det
- Send a COMMIT frame.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#COMMIT
- stomp_abort(+Connection, +Transaction) is det
- Send an ABORT frame.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#ABORT
- stomp_transaction(+Connection, :Goal) is semidet
- Run Goal as once/1, tagging all SEND messages inside the
transaction with the transaction id. If Goal fails or raises an
exception, the transaction is aborted using stomp_abort/2, after
which the failure or exception is forwarded.
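A sketch of grouping two sends in one transaction. send_pair/3 and the destination /queue/demo are hypothetical.

```prolog
:- use_module(library(stomp)).

% Hypothetical destination. If either send fails or raises an
% exception, the transaction is aborted.
send_pair(Connection, First, Second) :-
    stomp_transaction(Connection,
                      ( stomp_send(Connection, '/queue/demo', _{}, First),
                        stomp_send(Connection, '/queue/demo', _{}, Second)
                      )).
```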
- stomp_disconnect(+Connection, +Headers) is det
- Send a DISCONNECT frame. If the connection has the reconnect
property set to true, this will be reset to disconnected to
avoid reconnecting. A subsequent stomp_connect/2 resets the
reconnect status.
- See also
- - http://stomp.github.io/stomp-specification-1.2.html#DISCONNECT
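One possible shutdown sequence, combining the predicates documented above. The ordering is an assumption, not something the library prescribes. shutdown/1 is a hypothetical helper.

```prolog
:- use_module(library(stomp)).

% Say goodbye to the server, stop the threads and drop the registration.
shutdown(Connection) :-
    stomp_disconnect(Connection, _{}),
    % stomp_teardown/1 is semidet, so failure is ignored here.
    ignore(stomp_teardown(Connection)),
    stomp_destroy_connection(Connection).
```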