View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        jan@swi-prolog.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2020, SWI-Prolog Solutions b.v.
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(redis_streams,
   36          [ xstream_set/3,              % +Redis, +Key, +Option
   37            xadd/4,                     % +Redis, +Key, ?Id, +Data:dict
   38            xlisten/3,                  % +Redis, +Streams, +Options
   39            xlisten_group/5,            % +Redis, +Group, +Consumer,
   40                                        % +Streams, +Options
   41            xconsumer_stop/1            % +Leave
   42          ]).   43:- use_module(library(redis)).   44:- use_module(library(error)).   45:- use_module(library(apply)).   46:- use_module(library(broadcast)).   47:- use_module(library(lists)).   48:- use_module(library(option)).   49:- use_module(library(debug)).   50
   51:- meta_predicate
   52    xlisten(+, +, 5, 5, +).   53
   54:- multifile
   55    xhook/2.                            % +Stream, +Event
   56
   57:- predicate_options(xlisten/3, 3,
   58                     [ count(nonneg),
   59                       start(one_of([$,0])),
   60                       starts(list)
   61                     ]).   62:- predicate_options(xlisten_group/5, 5,
   63                     [ block(number),
   64                       max_deliveries(nonneg),
   65                       max_claim(nonneg)
   66                     ]).

Using Redis streams

A Redis stream is a set of messages consisting of key-value pairs that are identified by a time and sequence number. Streams are powerful objects that can roughly be used for three purposes:

This library abstracts the latter two scenarios. The main predicates are

See also
- https://redis.io/topics/streams-intro */
   96:- dynamic
   97    xstream_option/3.
 xstream_set(+Redis, +Key, +Option)
Set an option on for Key on Redis. Currently supports:
maxlen(+Count)
Make xadd/4 add a MAXLEN ~ Count option to the XADD command, capping the length of the stream. See also Redis as a message brokering system
  108xstream_set(Redis, KeyS, Option) :-
  109    must_be(atom, Redis),
  110    atom_string(Key, KeyS),
  111    valid_option(Option),
  112    functor(Option, Name, Arity),
  113    functor(Gen, Name, Arity),
  114    retractall(xstream_option(Redis, Key, Gen)),
  115    asserta(xstream_option(Redis, Key, Option)).
  116
  117valid_option(Option) :-
  118    stream_option(Option),
  119    !.
  120valid_option(Option) :-
  121    domain_error(redis_stream_option, Option).
  122
  123stream_option(maxlen(X)) :- must_be(integer, X).
 xadd(+Redis, +Key, ?Id, +Data:dict) is det
Add a message to a the stream Key on Redis. The length of the stream can be capped using the xstream_set/3 option maxlen(Count). If Id is unbound, generating the id is left to the server and Id is unified with the returned id. The returned id is a string consisting of the time stamp in milliseconds and a sequence number. See Redis docs for details.
  135xadd(DB, Key, Id, Dict) :-
  136    redis_array_dict(Array, _, Dict),
  137    (   var(Id)
  138    ->  IdIn = '*'
  139    ;   IdIn = Id
  140    ),
  141    (   xstream_option(DB, Key, maxlen(MaxLen))
  142    ->  Command =.. [xadd, Key, maxlen, ~, MaxLen, IdIn|Array]
  143    ;   Command =.. [xadd, Key, IdIn|Array]
  144    ),
  145    redis(DB, Command, Reply),
  146    return_id(Id, Reply).
  147
  148return_id(Id, Reply) :-
  149    var(Id),
  150    !,
  151    Id = Time-Seq,
  152    split_string(Reply, "-", "", [TimeS,SeqS]),
  153    number_string(Time, TimeS),
  154    number_string(Seq, SeqS).
  155return_id(_, _).
  156
  157
  158		 /*******************************
  159		 *           SUBSCRIBE		*
  160		 *******************************/
 xlisten(+Redis, +Streams, +Options)
Listen using XREAD on one or more Streams on the server Redis. For each message that arrives, call broadcast/1, where Data is a dict representing the message.
broadcast(redis(Redis, Stream, Id, Data))

Options:

count(+Count)
Process at most Count messages per stream for each request.
start(+Start)
Normally either 0 to start get all messages from the epoch or $ to get messages starting with the last. Default is $.
starts(+List)
May be used as an alternative to the start/1 option to specify the start for each stream. This may be used to restart listening if the application remembers the last processed id.

Note that this predicate does not terminate. It is normally executed in a thread. The following call listens to the streams key1 and key2 on the default Redis server. Using reconnect(true), the client will try to re-establish a connection if the collection got lost.

?- redis_connect(default, C, [reconnect(true)]),
   thread_create(xlisten(C, [key1, key2], [start($)]),
                 _, [detached(true)]).
Arguments:
Redis- is either a Redis server name (see redis_server/3) or an open connection. If it is a server name, a new connection is opened that is closed if xlisten/3 completes.
See also
- redis_subscribe/2 implements the classical pub/sub system of Redis that does not have any memory.
  201xlisten(Redis, Streams, Options) :-
  202    xlisten(Redis, Streams, xbroadcast, xidle, Options).
  203
  204xbroadcast(Redis, Stream, Id, Dict, _Options) :-
  205    redis_id(Redis, RedisId),
  206    catch(broadcast(redis(RedisId, Stream, Id, Dict)), Error,
  207          print_message(error, Error)).
  208
  209redis_id(redis(Id, _, _), Id) :-
  210    !.
  211redis_id(Id, Id).
  212
  213xidle(_Redis, _Streams, _Starts, _NewStarts, _Options).
 xlisten(+Redis, +Streams, +OnBroadCast, +OnIdle, +Options)
Generalized version of xlisten/3 that is provided two callbacks: one to handle a message and one after each time the underlying XREAD or XREADGROUP has returned and the messages are processed. These callbacks are called as follows:
call(OnBroadCast, +Redis, +Stream, +MessageId, +Dict)
call(OnIdle, +Redis, +Streams, +Starts, +NewStarts, +Options)

Both callbacks must succeeds and not leave any open choice points. Failure or exception causes xlisten/5 to stop.

  228xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
  229    atom(Redis),
  230    !,
  231    setup_call_cleanup(
  232        redis_connect(Redis, Conn, [reconnect(true)|Options]),
  233        xlisten_(Conn, Streams, OnBroadcast, OnIdle, Options),
  234        redis_disconnect(Conn)).
  235xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
  236    xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options).
  237
  238xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options) :-
  239    xread_command(Streams, Starts0, CommandTempl, Options),
  240    listen_loop(Redis, Starts0, CommandTempl,
  241                OnBroadcast, OnIdle, Streams, Options).
  242
  243xread_command(Streams, Starts0, Starts-Command, Options) :-
  244    option(group(Group-Consumer), Options),
  245    !,
  246    xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
  247    Command =.. [xreadgroup, group, Group, Consumer | CmdArgs].
  248xread_command(Streams, Starts0, Starts-Command, Options) :-
  249    xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
  250    Command =.. [xread|CmdArgs].
  251
  252xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
  253    option(count(Count), Options),
  254    !,
  255    option(block(Block), Options, 0),
  256    BlockMS is integer(Block*1000),
  257    start_templ(Streams, Starts0, Starts, StreamArgs, Options),
  258    CmdArgs = [count, Count, block, BlockMS, streams | StreamArgs].
  259xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
  260    option(block(Block), Options, 0),
  261    BlockMS is integer(Block*1000),
  262    start_templ(Streams, Starts0, Starts, StreamArgs, Options),
  263    CmdArgs = [block, BlockMS, streams | StreamArgs].
  264
  265start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
  266    option(starts(Starts0), Options),
  267    !,
  268    length(Streams, Len),
  269    length(Starts, Len),
  270    length(Starts0, LenS),
  271    (   LenS =:= Len
  272    ->  true
  273    ;   domain_error(xread_starts, Starts0)
  274    ),
  275    append(Streams, Starts, StreamArgs).
  276start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
  277    option(start(Start), Options, $),
  278    !,
  279    length(Streams, Len),
  280    length(Starts, Len),
  281    length(Starts0, Len),
  282    maplist(=(Start), Starts0),
  283    append(Streams, Starts, StreamArgs).
  284
  285listen_loop(Redis, Starts, CommandTempl, OnBroadcast, OnIdle, Streams, Options) :-
  286    copy_term(CommandTempl, Starts-Command),
  287    (   redis(Redis, Command, Reply),
  288        Reply \== nil
  289    ->  dispatch_streams(Reply, Redis, Streams, Starts, NewStarts,
  290                         OnBroadcast, OnIdle, Options)
  291    ;   NewStarts = Starts
  292    ),
  293    call(OnIdle, Redis, Streams, Starts, NewStarts, Options),
  294    listen_loop(Redis, NewStarts, CommandTempl,
  295                OnBroadcast, OnIdle, Streams, Options).
  296
  297dispatch_streams([], _, _, Starts, NewStarts, _, _, _) :-
  298    maplist(copy_start, Starts, NewStarts).
  299dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
  300                 OnBroadcast, OnIdle, Options) :-
  301    stream_tuple(Tuple, StreamS, []),
  302    atom_string(Stream, StreamS),
  303    !,                                  % xreadgroup: no more old pending stuff
  304    set_start(Stream, _Start, >, Streams, Starts, NewStarts),
  305    dispatch_streams(T, Redis, Streams, Starts, NewStarts,
  306                     OnBroadcast, OnIdle, Options).
  307dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
  308                 OnBroadcast, OnIdle, Options) :-
  309    stream_tuple(Tuple, StreamS, Messages),
  310    atom_string(Stream, StreamS),
  311    set_start(Stream, Start, NewStart, Streams, Starts, NewStarts),
  312    dispatch_messages(Messages, Stream, Redis, Start, NewStart,
  313                      OnBroadcast, Options),
  314    dispatch_streams(T, Redis, Streams, Starts, NewStarts,
  315                     OnBroadcast, OnIdle, Options).
  316
  317stream_tuple(Stream-Messages, Stream, Messages) :- !.
  318stream_tuple([Stream,Messages], Stream, Messages).
  319
  320set_start(Stream, Old, New, [Stream|_], [Old|_], [New|_]) :-
  321    !.
  322set_start(Stream, Old, New, [_|Streams], [_|OldStarts], [_|NewStarts]) :-
  323    set_start(Stream, Old, New, Streams, OldStarts, NewStarts).
  324
  325copy_start(Old, New) :-
  326    (   var(New)
  327    ->  Old = New
  328    ;   true
  329    ).
 dispatch_messages(+Messages, +Stream, +Redis, +Start0, -Start) is det
  333dispatch_messages([], _, _, Start, Start, _, _).
  334dispatch_messages([[Start,Data]|T], Stream, Redis, Start0, NewStart,
  335                  OnBroadcast, Options) :-
  336    dispatch_message(Data, Stream, Redis, Start, OnBroadcast, Options),
  337    join_starts(Start0, Start, Start1),
  338    dispatch_messages(T, Stream, Redis, Start1, NewStart, OnBroadcast, Options).
  339
  340dispatch_message(Data, Stream, Redis, Id, OnBroadcast, Options) :-
  341    (   Data == nil                     % when does this happen?
  342    ->  Dict = redis{}
  343    ;   redis_array_dict(Data, redis, Dict)
  344    ),
  345    call(OnBroadcast, Redis, Stream, Id, Dict, Options).
  346
  347join_starts(>, _Start, >) :-
  348    !.
  349join_starts(_Start0, Start, Start).
  350
  351		 /*******************************
  352		 *           CONSUMERS		*
  353		 *******************************/
 xlisten_group(+Redis, +Group, +Consumer, +Streams, +Options)
Listen as Consumer to Group. This is similar to xlisten/3, with the following differences:

Options processed:

block +Seconds
Causes XREADGROUP to return with timeout when no messages arrive within Seconds. On a timeout, xidle_group/5 is called which will try to handle messages to other consumers pending longer than Seconds. Choosing the time depends on the application. Notably:
  • Using a time shorter than the required processing time will make the job migrate from consumer to consumer until max_deliveries(Count) is exceeded. Note that the original receiver does not notice that the job is claimed and thus multiple consumers may ultimately answer the message.
  • Using a too long time causes an unnecessarily long delay if a node fails.
max_deliveries(+Count)
Re-deliver (using XCLAIM) a message max Count times. Exceeding this calls xhook/2. Default Count is 3.
max_claim(+Count)
Do not claim more than Count messages during a single idle action. Default is 10.
  387xlisten_group(Redis, Group, Consumer, Streams, Options) :-
  388    catch(xlisten(Redis, Streams, xbroadcast_group, xidle_group,
  389                  [ group(Group-Consumer),
  390                    start(0)
  391                  | Options
  392                  ]),
  393          redis(stop(Leave)),
  394          true),
  395    (   Leave == true
  396    ->  xleave_group(Redis, Group, Consumer, Streams)
  397    ;   true
  398    ).
  399
  400xbroadcast_group(Connection, Stream, Id, Dict, Options) :-
  401    option(group(Group-Consumer), Options),
  402    redis_id(Connection, RedisId),
  403    (   catch(broadcast_request(redis_consume(Stream, Dict,
  404                                              _{redis:RedisId,
  405                                                message:Id,
  406                                                group:Group,
  407                                                consumer:Consumer})),
  408              Error, xbroadcast_error(Error, Connection, Stream, Group, Id))
  409    ->  redis(Connection, xack(Stream, Group, Id))
  410    ;   true
  411    ).
  412
  413xbroadcast_error(redis(stop(Unregister)), Connection, Stream, Group, Id) :-
  414    !,
  415    redis(Connection, xack(Stream, Group, Id), _),
  416    throw(redis(stop(Unregister))).
  417xbroadcast_error(Error, _Connection, _Stream, _Group, _Id) :-
  418    print_message(error, Error),
  419    fail.
 xidle_group(+Redis, +Streams, +Starts, +NewStarts, +Options) is det
Called after XREADGROUP returns and the returned messages (if any) have been processed. If Start == NewStarts no messages have been processed, indicating a timeout.

This implementation looks for idle messages on other consumer and will try to claim them.

To be done
- : max time to work on other consumers messages?
  432xidle_group(Redis, Streams, Starts, Starts, Options) :- % Idle time
  433    !,
  434    option(group(Group-_Consumer), Options),
  435    claim(Streams, Redis, Group, 0, _Claimed, Options).
  436xidle_group(_Redis, _Streams, _Starts, _NewStarts, _Options).
  437
  438claim([], _, _, Claimed, Claimed, _).
  439claim([Stream|ST], Redis, Group, Claimed0, Claimed, Options) :-
  440    claim_for_stream(Redis, Stream, Group, Claimed0, Claimed1, Options),
  441    claim(ST, Redis, Group, Claimed1, Claimed, Options).
  442
  443claim_for_stream(Redis, Stream, Group, Claimed0, Claimed, Options) :-
  444    check_limit_claimed(Claimed0, Options),
  445    redis(Redis, xpending(Stream, Group), [Count,_Oldest,_Newest, Cons]),
  446    Count > 0,
  447    !,
  448    debug(redis(claim), '~D pending messages on ~p for ~p (Consumers = ~p)',
  449          [Count, Stream, Group, Cons]),
  450    claim_for_stream_for_consumers(Cons, Redis, Stream, Group,
  451                                   Claimed0, Claimed, Options).
  452claim_for_stream(_Redis, _Stream, _Group, Claimed, Claimed, _Options).
  453
  454claim_for_stream_for_consumers([], _Redis, _Stream, _Group,
  455                               Claimed, Claimed, _Options).
  456claim_for_stream_for_consumers([C|T], Redis, Stream, Group,
  457                               Claimed0, Claimed, Options) :-
  458    claim_for_stream_for_consumer(Redis, Stream, Group, C,
  459                                  Claimed0, Claimed1, Options),
  460    claim_for_stream_for_consumers(T, Redis, Stream, Group,
  461                                   Claimed1, Claimed, Options).
  462
  463claim_for_stream_for_consumer(Redis, Stream, Group, [Consumer,_Pending],
  464                              Claimed0, Claimed, Options) :-
  465    redis(Redis, xpending(Stream, Group, -, +, 10, Consumer), Reply),
  466    claim_messages(Reply, Redis, Stream, Group, Claimed0, Claimed, Options).
  467
  468claim_messages([], _Redis, _Stream, _Group, Claimed, Claimed, _Options).
  469claim_messages([H|T], Redis, Stream, Group, Claimed0, Claimed, Options) :-
  470    debug(redis(claim), 'Found pending ~p', [H]),
  471    claim_message(H, Redis, Stream, Group, Claimed0, Claimed1, Options),
  472    claim_messages(T, Redis, Stream, Group, Claimed1, Claimed, Options).
  473
  474claim_message([Id,Consumer,Idle,Delivered], Redis, Stream, Group,
  475              Claimed0, Claimed, Options) :-
  476    option(block(Block), Options),
  477    BlockMS is integer(Block*1000),
  478    Idle > BlockMS,
  479    check_limit_deliveries(Redis, Stream, Delivered, Id, Options),
  480    check_limit_claimed(Claimed0, Options),
  481    option(group(Group-Me), Options),
  482    debug(redis(claim), '~p: trying to claim ~p from ~p (idle ~D)',
  483          [Me, Id, Consumer, Idle]),
  484    redis(Redis, xclaim(Stream, Group, Me, BlockMS, Id), ClaimedMsgs),
  485    !,
  486    Claimed is Claimed0 + 1,
  487    debug(redis(claimed), '~p: claimed ~p', [Me, ClaimedMsgs]),
  488    dispatch_claimed(ClaimedMsgs, Redis, Stream, Options).
  489claim_message(_Msg, _Redis, _Stream, _Group, Claimed, Claimed, _Options).
  490
  491dispatch_claimed([], _Redis, _Stream, _Options).
  492dispatch_claimed([[MsgId,MsgArray]|Msgs], Redis, Stream, Options) :-
  493    redis_array_dict(MsgArray, _, Dict),
  494    xbroadcast_group(Redis, Stream, MsgId, Dict, Options),
  495    dispatch_claimed(Msgs, Redis, Stream, Options).
 check_limit_deliveries(+Redis, +Stream, +Delivered, +Id, +Options)
If a message gets delivered to several nodes and none of the nodes is able to process it, we should stop trying to do so at some point because the failure is most likely due to persistent error and piling up such messages will harm the cluster.
  505check_limit_deliveries(_Redis, _Stream, Delivered, _Id, Options) :-
  506    option(max_deliveries(Max), Options, 3),
  507    Delivered =< Max,
  508    !.
  509check_limit_deliveries(Redis, Stream, Delivered, Id, Options) :-
  510    option(group(Group-_Me), Options),
  511    (   xhook(Stream, delivery_failed(Id,Group,Delivered))
  512    ->  true
  513    ;   print_message(warning, redis(delivery_failed(Id,Group,Delivered))),
  514        redis(Redis, xack(Stream, Group, Id))
  515    ),
  516    fail.
  517
  518check_limit_claimed(Claimed0, Options) :-
  519    option(max_claim(Max), Options, 10),
  520    Claimed0 < Max.
 xleave_group(+Redis, +Group, +Consumer, +Streams) is det
Remove Consumer from Group.
To be done
- XGROUP DELCONSUMER only takes a single stream. Why?
  529xleave_group(Redis, Group, Consumer, [Stream|_]) :-
  530    redis(Redis, xgroup(delconsumer, Stream, Group, Consumer), _).
 xconsumer_stop(+Leave)
May be called from a consumer listener to stop the consumer. This predicate throws the exception redis(stop(Leave)), which is caught by xlisten_group/5.
  538xconsumer_stop(Leave) :-
  539    throw(redis(stop(Leave))).
  540
  541
  542		 /*******************************
  543		 *             HOOKS		*
  544		 *******************************/
 xhook(+Stream, +Event)
This multifile predicate is called on certain stream events. Defined events are:
delivery_failed(Id, Group, Delivered)
A message was delivered more than specified by max_deliveries/1 of xlisten_group/5. Id is the message id, Group the group and Delivered the current delivery count. If the hooks fails, the message is acknowledged using XACK. From introduction to streams:
"So once the deliveries counter reaches a given large number that you chose, it is probably wiser to put such messages in another stream and send a notification to the system administrator. This is basically the way that Redis streams implement the concept of the dead letter."
  565		 /*******************************
  566		 *            MESSAGES		*
  567		 *******************************/
  568
  569:- multifile prolog:message//1.  570
  571prolog:message(redis(Message)) -->
  572    [ 'REDIS: '-[] ],
  573    redis_message(Message).
  574
  575redis_message(delivery_failed(Id,Group,Delivered)) -->
  576    [ 'Failed to deliver ~p to group ~p (tried ~D times)'-
  577      [Id, Group, Delivered]
  578    ]