1:- module(stompl, [
    2                   connection/2,    % +Address, -Connection
    3                   connection/3,    % +Address, +CallbackDict, -Connection
    4                   setup/1,         % +Connection
    5                   teardown/1,      % +Connection
    6                   connect/3,       % +Connection, +Host, +Headers
    7                   send/3,          % +Connection, +Destination, +Headers
    8                   send/4,          % +Connection, +Destination, +Headers, +Body
    9                   send_json/4,     % +Connection, +Destination, +Headers, +JSON
   10                   subscribe/4,     % +Connection, +Destination, +Id, +Headers
   11                   unsubscribe/2,   % +Connection, +Id
   12                   ack/3,           % +Connection, +MessageId, +Headers
   13                   nack/3,          % +Connection, +MessageId, +Headers
   14                   begin/2,         % +Connection, +Transaction
   15                   commit/2,        % +Connection, +Transaction
   16                   abort/2,         % +Connection, +Transaction
   17                   disconnect/2     % +Connection, +Headers
   18                  ]).

STOMP client.

A STOMP 1.0 and 1.1 compatible client.

stomp.py is used as a reference for the implementation.

author
- Hongxin Liang
See also
- http://stomp.github.io/index.html
- https://github.com/jasonrbriggs/stomp.py */
license
- Apache License Version 2.0
   32:- use_module(library(uuid)).   33:- use_module(library(socket)).   34:- use_module(library(apply)).   35:- use_module(library(http/json)).   36
   37:- dynamic
   38    connection_mapping/2.
 connected(+Address, -Connected) is det
 connected(+Address, +CallbackDict, -Connected) is det
Create a connection reference. The connection is not set up yet by this predicate. If CallbackDict is provided, it will be associated with the connection reference. Valid keys of the dict are:
on_connected
on_disconnected
on_message
on_heartbeat_timeout
on_error

When registering callbacks, both module name and predicate name shall be provided in the format of module:predicate. Valid predicate signatures for example could be:

example:on_connected_handler(Connection, Headers, Body)
example:on_disconnected_handler(Connection)
example:on_message_handler(Connection, Headers, Body)
example:on_heartbeat_timeout_handler(Connection)
example:on_error_handler(Connection, Headers, Body)
   66connection(Address, Connection) :-
   67    connection(Address, Connection, _{}).
   68
   69connection(Address, CallbackDict, Connection) :-
   70    uuid(Connection),
   71    asserta(connection_mapping(Connection,
   72                               _{
   73                                 address: Address,
   74                                 callbacks: CallbackDict
   75                                })).
 setup(+Connection) is semidet
Set up the actual socket connection and start receiving thread.
   82setup(Connection) :-
   83    get_mapping_data(Connection, address, Address),
   84    tcp_connect(Address, Stream, []),
   85    set_stream(Stream, buffer_size(4096)),
   86    thread_create(receive(Connection, Stream), ReceiverThreadId, []),
   87    update_connection_mapping(Connection, _{receiver_thread_id: ReceiverThreadId, stream:Stream}).
 teardown(+Connection) is semidet
Tear down the socket connection, stop receiving thread and heartbeat thread (if applicable).
   94teardown(Connection) :-
   95    get_mapping_data(Connection, receiver_thread_id, ReceiverThreadId),
   96    (   \+ thread_self(ReceiverThreadId),
   97        thread_property(ReceiverThreadId, status(running))
   98    ->  debug(stompl, 'attempting to kill receive thread ~w', [ReceiverThreadId]),
   99        thread_signal(ReceiverThreadId, throw(kill))
  100    ;   true
  101    ),
  102    (   get_mapping_data(Connection, heartbeat_thread_id, HeartbeatThreadId)
  103    ->  (   thread_property(HeartbeatThreadId, status(running))
  104        ->  debug(stompl, 'attempting to kill heartbeat thread ~w', [HeartbeatThreadId]),
  105            thread_signal(HeartbeatThreadId, throw(kill))
  106        )
  107    ),
  108    get_mapping_data(Connection, stream, Stream),
  109    catch(close(Stream), _, true),
  110    debug(stompl, 'retract connection mapping', []),
  111    retract(connection_mapping(Connection, _))
  111.
  112
 connect(+Connectio, +Host, +Headers) is semidet
Send a CONNECT frame. Protocol version and heartbeat negotiation will be handled. STOMP frame is not used for backward compatibility. See here.
  120connect(Connection, Host, Headers) :-
  121    create_frame('CONNECT',
  122                 Headers.put(_{
  123                               'accept-version':'1.0,1.1', 
  124                               host:Host
  125                              }
  125),
  126                 '', Frame)
  126,
  127    (   Heartbeat = Headers.get('heart-beat')
  128    ->  update_connection_mapping(Connection, _{'heart-beat':Heartbeat})
  129    ;   true
  130    ),
  131    send0(Connection, Frame)
  131.
  132
 send(+Connection, +Destination, +Headers) is semidet
 send(+Connection, +Destination, +Headers, +Body) is semidet
Send a SEND frame. If content-type is not provided, text/plain will be used. content-length will be filled in automatically. See here.
  140send(Connection, Destination, Headers) :-
  141    send(Connection, Destination, Headers, '').
  142
  143send(Connection, Destination, Headers, Body) :-
  144    create_frame('SEND', Headers.put(destination, Destination), Body, Frame),
  145    send0(Connection, Frame).
 send_json(+Connection, +Destination, +Headers, +JSON) is semidet
Send a SEND frame. JSON can be either a JSON term or a dict. content-type is filled in automatically as application/json and content-length will be filled in automatically as well. See here.
  154send_json(Connection, Destination, Headers, JSON) :-
  155    atom_json_term(Body, JSON, [as(atom)]),
  156    create_frame('SEND',
  157                 Headers.put(_{
  158                               destination:Destination,
  159                               'content-type':'application/json'
  160                              }),
  161                 Body, Frame),
  162    send0(Connection, Frame).
 subscribe(+Connection, +Destination, +Id, +Headers) is semidet
Send a SUBSCRIBE frame. See here.
  169subscribe(Connection, Destination, Id, Headers) :-
  170    create_frame('SUBSCRIBE', Headers.put(_{destination:Destination, id:Id}), '', Frame),
  171    send0(Connection, Frame).
 unsubscribe(+Connection, +Id) is semidet
Send an UNSUBSCRIBE frame. See here.
  178unsubscribe(Connection, Id) :-
  179    create_frame('UNSUBSCRIBE', _{id:Id}, '', Frame),
  180    send0(Connection, Frame).
 ack(+Connection, +MessageId, +Headers) is semidet
Send an ACK frame. See here.
  187ack(Connection, MessageId, Headers) :-
  188    create_frame('ACK', Headers.put('message-id', MessageId), '', Frame),
  189    send0(Connection, Frame).
 nack(+Connection, +MessageId, +Headers) is semidet
Send a NACK frame. See here.
  196nack(Connection, MessageId, Headers) :-
  197    create_frame('NACK', Headers.put('message-id', MessageId), '', Frame),
  198    send0(Connection, Frame).
 begin(+Connection, +Transaction) is semidet
Send a BEGIN frame. See here.
  205begin(Connection, Transaction) :-
  206    create_frame('BEGIN', _{transaction:Transaction}, '', Frame),
  207    send0(Connection, Frame).
 commit(+Connection, +Transaction) is semidet
Send a COMMIT frame. See here.
  214commit(Connection, Transaction) :-
  215    create_frame('COMMIT', _{transaction:Transaction}, '', Frame),
  216    send0(Connection, Frame).
 abort(+Connection, +Transaction) is semidet
Send a ABORT frame. See here.
  223abort(Connection, Transaction) :-
  224    create_frame('ABORT', _{transaction:Transaction}, '', Frame),
  225    send0(Connection, Frame).
 disconnect(+Connection, +Headers) is semidet
Send a DISCONNECT frame. See here.
  232disconnect(Connection, Headers) :-
  233    create_frame('DISCONNECT', Headers, '', Frame),
  234    send0(Connection, Frame).
  235
  236send0(Connection, Frame) :-
  237    send0(Connection, Frame, true).
  238
  239send0(Connection, Frame, EndWithNull) :-
  240    (   EndWithNull
  241    ->  atom_concat(Frame, '\x00', Frame1)
  242    ;   Frame1 = Frame
  243    ),
  244    debug(stompl, 'frame to send~n~w', [Frame1]),
  245    get_mapping_data(Connection, stream, Stream),
  246    format(Stream, '~w', [Frame1]),
  247    flush_output(Stream).
  248
  249create_frame(Command, Headers, Body, Frame) :-
  250    (   Body \= ''
  251    ->  atom_length(Body, Length),
  252        atom_number(Length1, Length),
  253        Headers1 = Headers.put('content-length', Length1),
  254        (   \+ _ = Headers1.get('content-type')
  255        ->  Headers2 = Headers1.put('content-type', 'text/plain')
  256        ;   Headers2 = Headers1
  257        )
  258    ;   Headers2 = Headers
  259    ),
  260    create_header_lines(Headers2, HeaderLines),
  261    (   HeaderLines \= ''
  262    ->  atomic_list_concat([Command, HeaderLines], '\n', WithoutBody)
  263    ;   WithoutBody = Command
  264    ),
  265    atomic_list_concat([WithoutBody, Body], '\n\n', Frame).
  266
  267create_header_lines(Headers, HeaderLines) :-
  268    dict_pairs(Headers, _, Pairs),
  269    maplist(create_header_line, Pairs, HeaderLines0),
  270    atomic_list_concat(HeaderLines0, '\n', HeaderLines).
  271
  272create_header_line(K-V, HeaderLine) :-
  273    atomic_list_concat([K, V], ':', HeaderLine).
  274
  275receive(Connection, Stream) :-
  276    receive0(Connection, Stream, '').
  277
  278receive0(Connection, Stream, Buffered) :-
  279    (   catch(receive_frames(Stream, Frames, Buffered, Buffered1), E, true)
  280    ->  (   nonvar(E)
  281        ->  E = exception(disconnected),
  282            debug(stompl, 'disconnected', []),
  283            notify(Connection, disconnected)
  284        ;   debug(stompl, 'frames received~n~w', [Frames]),
  285            handle_frames(Connection, Frames),
  286            debug(stompl, 'frames handled', []),
  287            receive0(Connection, Stream, Buffered1)
  288        )
  289    ).
  290
  291receive_frames(Stream, Frames, Buffered0, Buffered) :-
  292    (   at_end_of_stream(Stream)
  293    ->  throw(exception(disconnected))
  294    ;   read_pending_input(Stream, Codes, [])
  295    ),
  296    atom_codes(Chars, Codes),
  297    debug(stompl, 'received~n~w', [Chars]),
  298    (   Chars = '\x0a'
  299    ->  Buffered = Buffered0,
  300        Frames = [Chars]
  301    ;   atom_concat(Buffered0, Chars, Buffered1),
  302        debug(stompl, 'current buffer~n~w', [Buffered1]),
  303        extract_frames(Frames, Buffered1, Buffered)
  304    ).
  305
  306extract_frames(Frames, Buffered0, Buffered) :-
  307    extract_frames0([], Frames0, Buffered0, Buffered),
  308    reverse(Frames0, Frames).
  309
  310extract_frames0(Frames, Frames, '', '') :- !.
  311extract_frames0(Frames, Frames, Buffered, Buffered) :-
  312    \+ sub_atom(Buffered, _, 1, _, '\x00'), !.
  313extract_frames0(Frames0, Frames, Buffered0, Buffered) :-
  314    sub_atom(Buffered0, FrameLength, 1, _, '\x00'), !,
  315    sub_atom(Buffered0, 0, FrameLength, _, Frame),
  316    (  sub_atom(Frame, PreambleLength, 2, _, '\n\n'), !
  317    -> (   check_frame(Frame, Buffered0, FrameLength, PreambleLength, Frame1, Buffered1)
  318       ->  Frames1 = [Frame1|Frames0],
  319           extract_frames0(Frames1, Frames, Buffered1, Buffered)
  320       ;   Frames = Frames0,
  321           Buffered = Buffered0
  322       )
  323    ;  Frames1 = [Frame|Frames0],
  324       Length is FrameLength + 1,
  325       sub_atom(Buffered0, Length, _, 0, Buffered1),
  326       extract_frames0(Frames1, Frames, Buffered1, Buffered)
  327    ).
  328
  329check_frame(Frame0, Buffered0, FrameLength, PreambleLength, Frame, Buffered) :-
  330    (   read_content_length(Frame0, ContentLength)
  331    ->  ContentOffset is PreambleLength + 2,
  332        FrameSize is ContentOffset + ContentLength,
  333        (   FrameSize > FrameLength
  334        ->  atom_length(Buffered0, Length),
  335            (   FrameSize < Length
  336            ->  sub_atom(Buffered0, 0, FrameSize, _, Frame),
  337                Length is FrameSize + 1,
  338                sub_atom(Buffered0, Length, _, 0, Buffered)
  339            )
  340        ;   Frame = Frame0,
  341            Length is FrameLength + 1,
  342            sub_atom(Buffered0, Length, _, 0, Buffered)
  343        )
  344    ;   Frame = Frame0,
  345        Length is FrameLength + 1,
  346        sub_atom(Buffered0, Length, _, 0, Buffered)
  347    ).
  348
  349read_content_length(Frame, Length) :-
  350    atomic_list_concat([_, Frame1], 'content-length:', Frame),
  351    atomic_list_concat([Length0|_], '\n', Frame1),
  352    atomic_list_concat(L, ' ', Length0),
  353    last(L, Length1),
  354    atom_number(Length1, Length).
  355
  356handle_frames(_, []) :- !.
  357handle_frames(Connection, [H|T]) :-
  358    parse_frame(H, ParsedFrame),
  359    debug(stompl, 'parsed frame~n~w', [ParsedFrame]),
  360    process_frame(Connection, ParsedFrame),
  361    handle_frames(Connection, T).
  362
  363parse_frame('\x0a', _{cmd:heartbeat}) :- !.
  364parse_frame(Frame, ParsedFrame) :-
  365    sub_atom(Frame, PreambleLength, 2, _, '\n\n'), !,
  366    sub_atom(Frame, 0, PreambleLength, _, Preamble),
  367    Begin is PreambleLength + 2,
  368    sub_atom(Frame, Begin, _, 0, Body),
  369    parse_headers(Preamble, Command, Headers),
  370    ParsedFrame = _{cmd:Command, headers:Headers, body:Body}.
  371
  372parse_headers(Preamble, Command, Headers) :-
  373    atomic_list_concat([Command|PreambleLines], '\n', Preamble),
  374    parse_headers0(PreambleLines, _{}, Headers).
  375
  376parse_headers0([], Headers, Headers) :- !.
  377parse_headers0([H|T], Headers0, Headers) :-
  378    atomic_list_concat([Key0, Value0], ':', H),
  379    replace(Key0, Key),
  380    (   \+ Headers0.get(Key)
  381    ->  sub_atom(Value0, _, _, 0, Value1),
  382        \+ sub_atom(Value1, 0, 1, _, ' '), !,
  383        replace(Value1, Value),
  384        Headers1 = Headers0.put(Key, Value)
  385    ;   Headers1 = Headers0
  386    )
  386,
  387    parse_headers0(T, Headers1, Headers)
  387.
  388
  389replace(A0, A) :-
  390    atomic_list_concat(L0, '\\n', A0),
  391    atomic_list_concat(L0, '\n', A1),
  392    atomic_list_concat(L1, '\\r', A1),
  393    atomic_list_concat(L1, '\r', A2),
  394    atomic_list_concat(L2, '\\\\', A2),
  395    atomic_list_concat(L2, '\\', A3),
  396    atomic_list_concat(L3, '\\c', A3),
  397    atomic_list_concat(L3, ':', A).
  398
  399process_frame(Connection, Frame) :-
  400    Frame.cmd = heartbeat, !,
  401    get_time(Now),
  402    debug(stompl, 'received heartbeat at ~w', [Now]),
  403    update_connection_mapping(Connection, _{received_heartbeat:Now}).
  404process_frame(Connection, Frame) :-
  405    downcase_atom(Frame.cmd, FrameType),
  406    (   FrameType = connected
  407    ->  start_heartbeat_if_required(Connection, Frame.headers)
  408    ;   true
  409    ),
  410    notify(Connection, FrameType, Frame).
  411
  412start_heartbeat_if_required(Connection, Headers) :-
  413    (   get_mapping_data(Connection, 'heart-beat', CHB),
  414        SHB = Headers.get('heart-beat')
  415    ->  start_heartbeat(Connection, CHB, SHB)
  416    ;   true
  417    ).
  418
  419start_heartbeat(Connection, CHB, SHB) :-
  420    extract_heartbeats(CHB, CX, CY),
  421    extract_heartbeats(SHB, SX, SY),
  422    calculate_heartbeats(CX-CY, SX-SY, X-Y),
  423    X-Y \= 0-0, !,
  424    debug(stompl, 'calculated heartbeats are ~w,~w', [X, Y]),
  425    SendSleep is X / 1000,
  426    ReceiveSleep is Y / 1000 + 2,
  427    (   SendSleep = 0
  428    ->  SleepTime = ReceiveSleep
  429    ;   (   ReceiveSleep = 0
  430        ->  SleepTime = SendSleep
  431        ;   SleepTime is gcd(SendSleep, ReceiveSleep) / 2
  432        )
  433    ),
  434    get_time(Now),
  435    thread_create(heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, Now),
  436                  HeartbeatThreadId, []),
  437    update_connection_mapping(Connection,
  438                              _{
  439                                heartbeat_thread_id:HeartbeatThreadId,
  440                                received_heartbeat:Now
  441                               }).
  442start_heartbeat(_, _, _).
  443
  444extract_heartbeats(Heartbeat, X, Y) :-
  445    atomic_list_concat(L, ' ', Heartbeat),
  446    atomic_list_concat(L, '', Heartbeat1),
  447    atomic_list_concat([X0, Y0], ',', Heartbeat1),
  448    atom_number(X0, X),
  449    atom_number(Y0, Y).
  450
  451calculate_heartbeats(CX-CY, SX-SY, X-Y) :-
  452    (   CX \= 0, SY \= 0
  453    ->  X is max(CX, floor(SY))
  454    ;   X = 0
  455    ),
  456    (   CY \= 0, SX \= 0
  457    ->  Y is max(CY, floor(SX))
  458    ;   Y = 0
  459    ).
  460
  461heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, SendTime) :-
  462    sleep(SleepTime),
  463    get_time(Now),
  464    (   Now - SendTime > SendSleep
  465    ->  SendTime1 = Now,
  466        debug(stompl, 'sending a heartbeat message at ~w', [Now]),
  467        send0(Connection, '\x0a', false)
  468    ;   SendTime1 = SendTime
  469    ),
  470    get_mapping_data(Connection, received_heartbeat, ReceivedHeartbeat),
  471    DiffReceive is Now - ReceivedHeartbeat,
  472    (   DiffReceive > ReceiveSleep
  473    ->  debug(stompl,
  474              'heartbeat timeout: diff_receive=~w, time=~w, lastrec=~w',
  475              [DiffReceive, Now, ReceivedHeartbeat]),
  476        notify(Connection, heartbeat_timeout)
  477    ;   true
  478    ),
  479    heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, SendTime1).
  480
  481notify(Connection, FrameType) :-
  482    get_mapping_data(Connection, callbacks, CallbackDict),
  483    atom_concat(on_, FrameType, Key),
  484    (   Predicate = CallbackDict.get(Key)
  485    ->  debug(stompl, 'callback predicate ~w', [Predicate]),
  486        ignore(call(Predicate, Connection))
  487    ;   true
  488    ).
  489
  490notify(Connection, FrameType, Frame) :-
  491    get_mapping_data(Connection, callbacks, CallbackDict),
  492    atom_concat(on_, FrameType, Key),
  493    (   Predicate = CallbackDict.get(Key)
  494    ->  debug(stompl, 'callback predicate ~w', [Predicate]),
  495        ignore(call(Predicate, Connection, Frame.headers, Frame.body))
  496    ;   true
  497    ).
  498
  499get_mapping_data(Connection, Key, Value) :-
  500    connection_mapping(Connection, Dict),
  501    Value = Dict.get(Key).
  502
  503update_connection_mapping(Connection, Dict) :-
  504    connection_mapping(Connection, OldDict),
  505    retract(connection_mapping(Connection, OldDict)),
  506    asserta(connection_mapping(Connection, OldDict.put(Dict)))