redis_streams.pl -- Using Redis streams
A Redis stream is a set of messages consisting of key-value pairs that are identified by a time and sequence number. Streams are powerful objects that can roughly be used for three purposes:
This library abstracts the latter two scenarios. The main predicates are
MAXLEN ~Count option to the XADD command, capping the length of the stream. See also Redis as a message brokering system
maxlen(Count). If Id is unbound, generating the id is left to the server and Id is unified with the returned id. The returned id is a string consisting of the time stamp in milliseconds and a sequence number. See Redis docs for details.
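As an illustration of the id format described above, the following is a minimal sketch that splits an id into its time stamp and sequence number. It assumes the two parts are separated by a single `-`, as in the Redis documentation; `stream_id_parts/3` is a hypothetical helper and not part of this library.

```prolog
%!  stream_id_parts(+Id, -TimeMs, -Seq) is semidet.
%
%   Hypothetical helper (not part of redis_streams.pl).  Splits a
%   stream id such as "1518951480106-0" into the time stamp in
%   milliseconds and the sequence number.  Fails if Id does not have
%   the assumed form <integer>-<integer>.

stream_id_parts(Id, TimeMs, Seq) :-
    split_string(Id, "-", "", [TimeS, SeqS]),   % exactly two parts
    number_string(TimeMs, TimeS),
    integer(TimeMs),
    number_string(Seq, SeqS),
    integer(Seq).
```

For example, `?- stream_id_parts("1518951480106-0", T, S).` binds T to 1518951480106 and S to 0. An id returned by the server when Id was left unbound could be inspected the same way.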
XREAD on one or more Streams on the server Redis. For each message that arrives, call broadcast/1, where Data is a dict representing the message.
broadcast(redis(Redis, Stream, Id, Data))
0 to get all messages from the epoch or
$ to get messages starting with the last. Default is
Note that this predicate does not terminate. It is normally
executed in a thread. The following call listens to the streams
key1 and key2 on the default Redis server. Using
reconnect(true), the client will try to re-establish a connection if
the connection got lost.
?- redis_connect(default, C, [reconnect(true)]),
   thread_create(xlisten(C, [key1, key2], [start($)]),
                 _, [detached(true)]).
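Because messages arrive as broadcast/1 events, a consumer needs a listener registered through library(broadcast). The following is a minimal sketch of such a listener; `on_message/3` is a hypothetical handler name chosen for illustration, and printing the message stands in for whatever processing the application needs.

```prolog
:- use_module(library(broadcast)).

%!  on_message(+Stream, +Id, +Data) is det.
%
%   Hypothetical handler: print the stream, the message id and the
%   message dict on one line.

on_message(Stream, Id, Data) :-
    format("~w ~w: ~p~n", [Stream, Id, Data]).

% Register the handler for every redis(Redis, Stream, Id, Data)
% term that is broadcast, regardless of the server it came from.
:- listen(redis(_Redis, Stream, Id, Data),
          on_message(Stream, Id, Data)).
```

The handler can be exercised without a server by broadcasting a term of the same shape by hand, e.g. `?- broadcast(redis(default, key1, "1-0", _{hello: "world"})).` Since listeners run in the thread that calls broadcast/1, a slow handler would delay the listening thread.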
XACK is sent to the server.
XREADGROUP to return with timeout when no messages arrive within Seconds. On a timeout, xidle_group/5 is called which will try to handle messages to other consumers pending longer than Seconds. Choosing the time depends on the application. Notably:
max_deliveries(Count) is exceeded. Note that the original receiver does not notice that the job is claimed and thus multiple consumers may ultimately answer the message.
XCLAIM) a message max Count times. Exceeding this calls xhook/2. Default Count is
redis(stop(Leave)), which is caught by xlisten_group/5.
XACK. From introduction to streams:
"So once the deliveries counter reaches a given large number that you chose, it is probably wiser to put such messages in another stream and send a notification to the system administrator. This is basically the way that Redis streams implement the concept of the dead letter."