View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2024, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3,    % +Redis, +Command, -Properties
   75
   76            sentinel_slave/4            % +ServerId, +Pool, -Slave, +Options
   77          ]).   78:- autoload(library(socket), [tcp_connect/3]).   79:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   80:- autoload(library(broadcast), [broadcast/1]).   81:- autoload(library(error),
   82            [ must_be/2,
   83	      type_error/2,
   84              instantiation_error/1,
   85              uninstantiation_error/1,
   86              existence_error/2,
   87              existence_error/3
   88            ]).   89:- autoload(library(lazy_lists), [lazy_list/2]).   90:- autoload(library(lists), [append/3, member/2]).   91:- autoload(library(option), [merge_options/3, option/2,
   92			      option/3, select_option/4]).   93:- autoload(library(pairs), [group_pairs_by_key/2]).   94:- autoload(library(time), [call_with_time_limit/2]).   95:- use_module(library(debug), [debug/3, assertion/1]).   96:- use_module(library(settings), [setting/4, setting/2]).   97:- if(exists_source(library(ssl))).   98:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]).   99:- endif.  100
  101:- use_foreign_library(foreign(redis4pl)).  102
  103:- setting(max_retry_count, nonneg, 8640, % one day
  104           "Max number of retries").  105:- setting(max_retry_wait, number, 10,
  106           "Max time to wait between recovery attempts").  107:- setting(sentinel_timeout, number, 0.2,
  108	   "Time to wait for a sentinel").  109
  110:- predicate_options(redis_server/3, 3,
  111                     [ pass_to(redis:redis_connect/3, 3)
  112                     ]).  113:- predicate_options(redis_connect/3, 3,
  114                     [ reconnect(boolean),
  115                       user(atom),
  116                       password(atomic),
  117                       version(between(2,3))
  118                     ]).  119:- predicate_options(redis_disconnect/2, 2,
  120                     [ force(boolean)
  121                     ]).  122:- predicate_options(redis_scan/3, 3,
  123                     [ match(atomic),
  124                       count(nonneg),
  125                       type(atom)
  126                     ]).  127% Actually not passing, but the same
  128:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  129:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  130:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  131
  132
  133/** <module> Redis client
  134
  135This library is a client  to   [Redis](https://redis.io),  a popular key
  136value store to  deal  with  caching   and  communication  between  micro
  137services.
  138
  139In the typical use case we register  the   details  of one or more Redis
  140servers using redis_server/3. Subsequenly, redis/2-3   is  used to issue
  141commands on the server.  For example:
  142
  143```
  144?- redis_server(default, redis:6379, [password("secret")]).
  145?- redis(default, set(user, "Bob")).
  146?- redis(default, get(user), User).
  147User = "Bob"
  148```
  149*/
  150
  151:- dynamic server/3.  152
  153:- dynamic ( connection/2,              % ServerName, Stream
  154	     sentinel/2			% Pool, Address
  155           ) as volatile.  156
  157%!  redis_server(+ServerName, +Address, +Options) is det.
  158%
  159%   Register a redis server without  connecting   to  it. The ServerName
  160%   acts as a lazy connection alias.  Initially the ServerName `default`
  161%   points at `localhost:6379` with no   connect  options. The `default`
  162%   server is used for redis/1 and redis/2 and may be changed using this
  163%   predicate.  Options are described with redis_connect/3.
  164%
  165%   Connections  established  this  way  are  by  default  automatically
  166%   reconnected if the connection  is  lost   for  some  reason unless a
  167%   reconnect(false) option is specified.
  168
  169redis_server(Alias, Address, Options) :-
  170    must_be(ground, Alias),
  171    retractall(server(Alias, _, _)),
  172    asserta(server(Alias, Address, Options)).
  173
  174server(default, localhost:6379, []).
  175
  176%!  redis_connect(-Connection) is det.
  177%!  redis_connect(+Address, -Connection, +Options) is det.
  178%!  redis_connect(-Connection, +Host, +Port) is det.
  179%
  180%   Connect to a redis server. The  main mode is redis_connect(+Address,
  181%   -Connection,   +Options).   redis_connect/1   is     equivalent   to
  182%   redis_connect(localhost:6379, Connection, []).  Options:
  183%
  184%     - reconnect(+Boolean)
  185%       If `true`, try to reconnect to the service when the connection
  186%       seems lost.  Default is `true` for connections specified using
  187%       redis_server/3 and `false` for explictly opened connections.
  188%     - user(+User)
  189%       If version(3) and password(Password) are specified, these
  190%       are used to authenticate using the `HELLO` command.
  191%     - password(+Password)
  192%       Authenticate using Password
  193%     - version(+Version)
  194%       Specify the connection protocol version.  Initially this is
  195%       version 2.  Redis 6 also supports version 3.  When specified
  196%       as `3`, the `HELLO` command is used to upgrade the protocol.
  197%     - tls(true)
  198%       When specified, initiate a TLS connection.  If this option is
  199%       specified we must also specify the `cacert`, `key` and `cert`
  200%       options.
  201%     - cacert(+File)
  202%       CA Certificate file to verify with.
  203%     - cert(+File)
  204%       Client certificate to authenticate with.
  205%     - key(+File)
  206%       Private key file to authenticate with.
  207%     - sentinels(+ListOfAddresses)
  208%       Used together with an Address of the form sentinel(MasterName)
  209%       to enable contacting a network of Redis servers guarded by a
  210%       sentinel network.
  211%     - sentinel_user(+User)
  212%     - sentinel_password(+Password)
  213%       Authentication information for the senitels.  When omitted we
  214%       try to connect withour authentication.
  215%
  216%   Instead of using these predicates, redis/2  and redis/3 are normally
  217%   used with a _server name_  argument registered using redis_server/3.
  218%   These  predicates  are  meant  for   creating  a  temporary  paralel
  219%   connection or using a connection with a _blocking_ call.
  220%
  221%   @compat   redis_connect(-Connection,   +Host,     +Port)    provides
  222%   compatibility to the original GNU-Prolog interface and is equivalent
  223%   to redis_connect(Host:Port, Connection, []).
  224%
  225%   @arg Address is a term Host:Port, unix(File) or the name of a server
  226%   registered  using  redis_server/3.  The  latter   realises  a  _new_
  227%   connection that is typically used for   blocking redis commands such
  228%   as listening for published messages, waiting on a list or stream.
  229
  230redis_connect(Conn) :-
  231    redis_connect(default, Conn, []).
  232
  233redis_connect(Conn, Host, Port) :-
  234    var(Conn),
  235    ground(Host), ground(Port),
  236    !,                                  % GNU-Prolog compatibility
  237    redis_connect(Host:Port, Conn, []).
  238redis_connect(Server, Conn, Options) :-
  239    atom(Server),
  240    !,
  241    (   server(Server, Address, DefaultOptions)
  242    ->  merge_options(Options, DefaultOptions, Options2),
  243        do_connect(Server, Address, Conn, [address(Address)|Options2])
  244    ;   existence_error(redis_server, Server)
  245    ).
  246redis_connect(Address, Conn, Options) :-
  247    do_connect(Address, Address, Conn, [address(Address)|Options]).
  248
  249%!  do_connect(+Id, +Address, -Conn, +Options)
  250%
  251%   Open the connection.  A connection is a compound term of the shape
  252%
  253%       redis_connection(Id, Stream, Failures, Options)
  254
  255do_connect(Id, sentinel(Pool), Conn, Options) =>
  256    sentinel_master(Id, Pool, Conn, Options).
  257do_connect(Id, Address0, Conn, Options) =>
  258    tcp_address(Address0, Address),
  259    tcp_connect(Address, Stream0, Options),
  260    tls_upgrade(Address, Stream0, Stream, Options),
  261    Conn = redis_connection(Id, Stream, 0, Options),
  262    hello(Conn, Options).
  263
  264tcp_address(unix(Path), Path) :-
  265    !.                                  % Using an atom is ambiguous
  266tcp_address(Address, Address).
  267
  268%!  tls_upgrade(+Address, +Raw, -Stream, +Options) is det.
  269%
  270%   Upgrade to a TLS connection when tls(true) is specified.
  271
  272:- if(current_predicate(ssl_context/3)).  273tls_upgrade(Host:_Port, Raw, Stream, Options) :-
  274    option(tls(true), Options),
  275    !,
  276    must_have_option(cacert(CacertFile), Options),
  277    must_have_option(key(KeyFile), Options),
  278    must_have_option(cert(CertFile), Options),
  279    ssl_context(client, SSL,
  280		[ host(Host),
  281		  certificate_file(CertFile),
  282		  key_file(KeyFile),
  283		  cacerts([file(CacertFile)]),
  284		  cert_verify_hook(tls_verify),
  285		  close_parent(true)
  286		]),
  287    stream_pair(Raw, RawRead, RawWrite),
  288    ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
  289    stream_pair(Stream, Read, Write).
  290:- endif.  291tls_upgrade(_, Stream, Stream, _).
  292
  293:- if(current_predicate(ssl_context/3)).  294
  295%!  tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status) is semidet.
  296%
  297%   Accept  or reject  the certificate  verification.  Similar  to the
  298%   Redis  command   line  client   (``redis-cli``),  we   accept  the
  299%   certificate as long as it is signed, not verifying the hostname.
  300
  301:- public tls_verify/5.  302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
  303    !.
  304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
  305    !.
  306tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
  307    fail.
  308
  309:- endif.  310
  311%!  sentinel_master(+ServerId, +SentinelPool, -Connection, +Options) is det.
  312%
  313%   Discover the master and connect to it.
  314
  315sentinel_master(Id, Pool, Master, Options) :-
  316    sentinel_connect(Id, Pool, Conn, Options),
  317    setting(sentinel_timeout, TMO),
  318    call_cleanup(
  319	query_sentinel(Pool, Conn, MasterAddr),
  320	redis_disconnect(Conn)),
  321    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
  322    do_connect(Id, MasterAddr, Master, Options),
  323    debug(redis(sentinel), 'Connected to claimed master', []),
  324    redis(Master, role, Role),
  325    (   Role = [master|_Slaves]
  326    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
  327    ;   redis_disconnect(Master),
  328	debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
  329	sleep(TMO),
  330	sentinel_master(Id, Pool, Master, Options)
  331    ).
  332
  333sentinel_connect(Id, Pool, Conn, Options) :-
  334    must_have_option(sentinels(Sentinels), Options),
  335    sentinel_auth(Options, Options1),
  336    setting(sentinel_timeout, TMO),
  337    (   sentinel(Pool, Sentinel)
  338    ;   member(Sentinel, Sentinels)
  339    ),
  340    catch(call_with_time_limit(
  341	      TMO,
  342	      do_connect(Id, Sentinel, Conn,
  343			 [sentinel(true)|Options1])),
  344	  Error,
  345	  (print_message(warning, Error),fail)),
  346    !,
  347    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
  348    redis(Conn, sentinel(sentinels, Pool), Peers),
  349    transaction(update_known_sentinels(Pool, Sentinel, Peers)).
  350
  351sentinel_auth(Options0, Options) :-
  352    option(sentinel_user(User), Options0),
  353    option(sentinel_password(Passwd), Options0),
  354    !,
  355    merge_options([user(User), password(Passwd)], Options0, Options).
  356sentinel_auth(Options0, Options) :-
  357    select_option(password(_), Options0, Options, _).
  358
  359
  360query_sentinel(Pool, Conn, Host:Port) :-
  361    redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
  362    MasterData = [Host,Port].
  363
  364update_known_sentinels(Pool, Sentinel, Peers) :-
  365    retractall(sentinel(Pool, _)),
  366    maplist(update_peer_sentinel(Pool), Peers),
  367    asserta(sentinel(Pool, Sentinel)).
  368
  369update_peer_sentinel(Pool, Attrs),
  370  memberchk(ip-Host, Attrs),
  371  memberchk(port-Port, Attrs) =>
  372    asserta(sentinel(Pool, Host:Port)).
  373
  374must_have_option(Opt, Options) :-
  375    option(Opt, Options),
  376    !.
  377must_have_option(Opt, Options) :-
  378    existence_error(option, Opt, Options).
  379
  380%!  sentinel_slave(+ServerId, +Pool, -Slave, +Options) is nondet.
  381%
  382%   True when Slave is a slave server  in the sentinel cluster. Slave is
  383%   a dict holding the keys and values as described by the Redis command
  384%
  385%       SENTINEL SLAVES mastername
  386
  387sentinel_slave(ServerId, Pool, Slave, Options) :-
  388    sentinel_connect(ServerId, Pool, Conn, Options),
  389    redis(Conn, sentinel(slaves, Pool), Slaves),
  390    member(Pairs, Slaves),
  391    dict_create(Slave, redis, Pairs).
  392
  393%!  hello(+Connection, +Option)
  394%
  395%   Initialize the connection. This is  used   to  upgrade  to the RESP3
  396%   protocol and/or to authenticate.
  397
  398hello(Con, Options) :-
  399    option(version(V), Options),
  400    V >= 3,
  401    !,
  402    (   option(user(User), Options),
  403        option(password(Password), Options)
  404    ->  redis(Con, hello(3, auth, User, Password))
  405    ;   redis(Con, hello(3))
  406    ).
  407hello(Con, Options) :-
  408    option(password(Password), Options),
  409    !,
  410    redis(Con, auth(Password)).
  411hello(_, _).
  412
  413%!  redis_stream(+Spec, --Stream, +DoConnect) is det.
  414%
  415%   Get the stream to a Redis server from  Spec. Spec is either the name
  416%   of       a       registered       server       or       a       term
  417%   redis_connection(Id,Stream,Failures,Options).  If  the    stream  is
  418%   disconnected it will be reconnected.
  419
  420redis_stream(Var, S, _) :-
  421    (   var(Var)
  422    ->  !, instantiation_error(Var)
  423    ;   nonvar(S)
  424    ->  !, uninstantiation_error(S)
  425    ).
  426redis_stream(ServerName, S, Connect) :-
  427    atom(ServerName),
  428    !,
  429    (   connection(ServerName, S0)
  430    ->  S = S0
  431    ;   Connect == true,
  432        server(ServerName, Address, Options)
  433    ->  redis_connect(Address, Connection, Options),
  434        redis_stream(Connection, S, false),
  435        asserta(connection(ServerName, S))
  436    ;   existence_error(redis_server, ServerName)
  437    ).
  438redis_stream(redis_connection(_,S0,_,_), S, _) :-
  439    S0 \== (-),
  440    !,
  441    S = S0.
  442redis_stream(Redis, S, _) :-
  443    Redis = redis_connection(Id,-,_,Options),
  444    option(address(Address), Options),
  445    do_connect(Id,Address,Redis2,Options),
  446    arg(2, Redis2, S0),
  447    nb_setarg(2, Redis, S0),
  448    S = S0.
  449
  450has_redis_stream(Var, _) :-
  451    var(Var),
  452    !,
  453    instantiation_error(Var).
  454has_redis_stream(Alias, S) :-
  455    atom(Alias),
  456    !,
  457    connection(Alias, S).
  458has_redis_stream(redis_connection(_,S,_,_), S) :-
  459    S \== (-).
  460
  461
  462%!  redis_disconnect(+Connection) is det.
  463%!  redis_disconnect(+Connection, +Options) is det.
  464%
  465%   Disconnect from a redis server. The   second  form takes one option,
  466%   similar to close/2:
  467%
  468%     - force(Force)
  469%       When `true` (default `false`), do not raise any errors if
  470%       Connection does not exist or closing the connection raises
  471%       a network or I/O related exception.  This version is used
  472%       internally if a connection is in a broken state, either due
  473%       to a protocol error or a network issue.
  474
  475redis_disconnect(Redis) :-
  476    redis_disconnect(Redis, []).
  477
  478redis_disconnect(Redis, Options) :-
  479    option(force(true), Options),
  480    !,
  481    (   Redis = redis_connection(_Id, S, _, _Opts)
  482    ->  (   S == (-)
  483        ->  true
  484        ;   close(S, [force(true)]),
  485            nb_setarg(2, Redis, -)
  486        )
  487    ;   has_redis_stream(Redis, S)
  488    ->  close(S, [force(true)]),
  489        retractall(connection(_,S))
  490    ;   true
  491    ).
  492redis_disconnect(Redis, _Options) :-
  493    redis_stream(Redis, S, false),
  494    close(S),
  495    retractall(connection(_,S)).
  496
  497%!  redis(+Connection, +Request) is semidet.
  498%
  499%   This predicate is overloaded to handle two types of requests. First,
  500%   it is a shorthand for `redis(Connection, Command, _)` and second, it
  501%   can be used to exploit  Redis   _pipelines_  and _transactions_. The
  502%   second form is acticated if Request is  a _list_. In that case, each
  503%   element of the list is either a term  `Command -> Reply` or a simple
  504%   `Command`. Semantically this represents a   sequence  of redis/3 and
  505%   redis/2 calls.  It differs in the following aspects:
  506%
  507%     - All commands are sent in one batch, after which all replies are
  508%       read.  This reduces the number of _round trips_ and typically
  509%       greatly improves performance.
  510%     - If the first command is `multi` and the last `exec`, the
  511%       commands are executed as a Redis _transaction_, i.e., they
  512%       are executed _atomically_.
  513%     - If one of the commands returns an error, the subsequent commands
  514%       __are still executed__.
  515%     - You can not use variables from commands earlier in the list for
  516%       commands later in the list as a result of the above execution
  517%       order.
  518%
  519%   Procedurally, the process takes the following steps:
  520%
  521%     1. Send all commands
  522%     2. Read all replies and push messages
  523%     3. Handle all callbacks from push messages
  524%     4. Check whether one of the replies is an error.  If so,
  525%        raise this error (subsequent errors are lost)
  526%     5. Bind all replies for the `Command -> Reply` terms.
  527%
  528%   Examples
  529%
  530%   ```
  531%   ?- redis(default,
  532%            [ lpush(li,1),
  533%              lpush(li,2),
  534%              lrange(li,0,-1) -> List
  535%            ]).
  536%   List = ["2", "1"].
  537%   ```
  538
  539redis(Redis, PipeLine) :-
  540    is_list(PipeLine),
  541    !,
  542    redis_pipeline(Redis, PipeLine).
  543redis(Redis, Req) :-
  544    redis(Redis, Req, _).
  545
  546%!  redis(+Connection, +Command, -Reply) is semidet.
  547%
  548%   Execute a redis Command on  Connnection.   Next,  bind  Reply to the
  549%   returned result. Command is a  callable   term  whose functor is the
  550%   name of the Redis command  and   whose  arguments  are translated to
  551%   Redis arguments according to the rules below.  Note that all text is
  552%   always represented using UTF-8 encoding.
  553%
  554%     - Atomic values are emitted verbatim
  555%     - A term A:B:... where all arguments are either atoms,
  556%       strings or integers (__no floats__) is translated into
  557%       a string `"A:B:..."`.  This is a common shorthand for
  558%       representing Redis keys.
  559%     - A term Term as prolog is emitted as "\u0000T\u0000" followed
  560%       by Term in canonical form.
  561%     - Any other term is emitted as write/1.
  562%
  563%   Reply is either a plain term (often a  variable) or a term `Value as
  564%   Type`. In the latter form,  `Type`   dictates  how  the Redis _bulk_
  565%   reply is translated to Prolog. The default equals to `auto`, i.e.,
  566%   as a number of the content satisfies the Prolog number syntax and
  567%   as an atom otherwise.
  568%
  569%     - status(Atom)
  570%       Returned if the server replies with ``+ Status``.  Atom
  571%       is the textual value of `Status` converted to lower case,
  572%       e.g., status(ok) or status(pong).
  573%     - `nil`
  574%       This atom is returned for a NIL/NULL value.  Note that if
  575%       the reply is only `nil`, redis/3 _fails_.  The `nil` value
  576%       may be embedded inside lists or maps.
  577%     - A number
  578%       Returned if the server replies an integer (":Int"), double
  579%       (",Num") or big integer ("(Num")
  580%     - A string
  581%       Returned on a _bulk_ reply.  Bulk replies are supposed to be
  582%       in UTF-8 encoding.  The the bulk reply starts with
  583%       "\u0000T\u0000" it is supposed to be a Prolog term.
  584%       Note that this intepretation means it is __not__ possible
  585%       to read arbitrary binary blobs.
  586%     - A list of replies.  A list may also contain `nil`.  If Reply
  587%       as a whole would be `nil` the call fails.
  588%     - A list of _pairs_.  This is returned for the redis version 3
  589%       protocol "%Map".  Both the key and value respect the same
  590%       rules as above.
  591%
  592%   Redis _bulk_ replies are translated depending  on the `as` `Type` as
  593%   explained above.
  594%
  595%     - string
  596%     - string(Encoding)
  597%       Create a SWI-Prolog string object interpreting the blob as
  598%       following Encoding. Encoding is a restricted set of SWI-Prolog's
  599%       encodings: `bytes` (`iso_latin_1`), `utf8` and `text` (the
  600%       current locale translation).
  601%     - atom
  602%     - atom(Encoding)
  603%       As above, producing an atom.
  604%     - codes
  605%     - codes(Encoding)
  606%       As above, producing a list of integers (Unicode code points)
  607%     - chars
  608%     - chars(Encoding)
  609%       As above, producing a list of one-character atoms.
  610%     - integer
  611%     - float
  612%     - rational
  613%     - number
  614%       Interpret the bytes as a string representing a number.  If
  615%       the string does not represent a number of the requested type
  616%       a type_error(Type, String) is raised.
  617%     - tagged_integer
  618%       Same as integer, but demands the value to be between the Prolog
  619%       flags `min_tagged_integer` and `max_tagged_integer`, allowing
  620%       the value to be used as a dict key.
  621%     - auto
  622%       Same as auto(atom, number)
  623%     - auto(AsText,AsNumber)
  624%       If the bulk string confirms the syntax of AsNumber, convert
  625%       the value to the requested numberical type.  Else convert
  626%       the value to text according to AsText.  This is similar to
  627%       the Prolog predicate name/2.
  628%     - dict_key
  629%       Alias for auto(atom,tagged_integer).  This allows the value
  630%       to be used as a key for a SWI-Prolog dict.
  631%     - pairs(AsKey, AsValue)
  632%       Convert a map or array of even length into pairs for which the
  633%       key satisfies AsKey and the value AsValue.  The `pairs` type
  634%       can also be applied to a Redis array.  In this case the array
  635%       length must be even.  This notably allows fetching a Redis
  636%       _hash_ as pairs using ``HGETALL`` using version 2 of the
  637%       Redis protocol.
  638%     - dict(AsKey, AsValue)
  639%       Similar to pairs(AsKey, AsValue), but convert the resulting
  640%       pair list into a SWI-Prolog dict.  AsKey must convert to a
  641%       valid dict key, i.e., an atom or tagged integer. See `dict_key`.
  642%     - dict(AsValue)
  643%       Shorthand for dict(dict_key, AsValue).
  644%
  645%   Here are some simple examples
  646%
  647%   ```
  648%   ?- redis(default, set(a, 42), X).
  649%   X = status("OK").
  650%   ?- redis(default, get(a), X).
  651%   X = "42".
  652%   ?- redis(default, get(a), X as integer).
  653%   X = 42.
  654%   ?- redis(default, get(a), X as float).
  655%   X = 42.0.
  656%   ?- redis(default, set(swipl:version, 8)).
  657%   true.
  658%   ?- redis(default, incr(swipl:version), X).
  659%   X = 9.
  660%   ```
  661%
  662%   @error redis_error(Code, String)
  663
  664redis(Redis, Req, Out) :-
  665    out_val(Out, Val),
  666    redis1(Redis, Req, Out),
  667    Val \== nil.
  668
  669out_val(Out, Val) :-
  670    (   nonvar(Out),
  671        Out = (Val as _)
  672    ->  true
  673    ;   Val = Out
  674    ).
  675
  676redis1(Redis, Req, Out) :-
  677    Error = error(Formal, _),
  678    catch(redis2(Redis, Req, Out), Error, true),
  679    (   var(Formal)
  680    ->  true
  681    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  682    ).
  683
  684redis2(Redis, Req, Out) :-
  685    atom(Redis),
  686    !,
  687    redis_stream(Redis, S, true),
  688    with_mutex(Redis,
  689               ( redis_write_msg(S, Req),
  690                 redis_read_stream(Redis, S, Out)
  691               )).
  692redis2(Redis, Req, Out) :-
  693    redis_stream(Redis, S, true),
  694    redis_write_msg(S, Req),
  695    redis_read_stream(Redis, S, Out).
  696
  697%!  redis_pipeline(+Redis, +PipeLine)
  698
  699redis_pipeline(Redis, PipeLine) :-
  700    Error = error(Formal, _),
  701    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  702    (   var(Formal)
  703    ->  true
  704    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  705    ).
  706
  707redis_pipeline2(Redis, PipeLine) :-
  708    atom(Redis),
  709    !,
  710    redis_stream(Redis, S, true),
  711    with_mutex(Redis,
  712               redis_pipeline3(Redis, S, PipeLine)).
  713redis_pipeline2(Redis, PipeLine) :-
  714    redis_stream(Redis, S, true),
  715    redis_pipeline3(Redis, S, PipeLine).
  716
  717redis_pipeline3(Redis, S, PipeLine) :-
  718    maplist(write_pipeline(S), PipeLine),
  719    flush_output(S),
  720    read_pipeline(Redis, S, PipeLine).
  721
  722write_pipeline(S, Command -> _Reply) :-
  723    !,
  724    redis_write_msg_no_flush(S, Command).
  725write_pipeline(S, Command) :-
  726    redis_write_msg_no_flush(S, Command).
  727
  728read_pipeline(Redis, S, PipeLine) :-
  729    E = error(Formal,_),
  730    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  731    (   var(Formal)
  732    ->  true
  733    ;   reconnect_error(E)
  734    ->  redis_disconnect(Redis, [force(true)]),
  735        throw(E)
  736    ;   resync(Redis),
  737        throw(E)
  738    ).
  739
  740read_pipeline2(Redis, S, PipeLine) :-
  741    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  742    maplist(handle_push(Redis), Pushed),
  743    maplist(handle_error, Errors),
  744    maplist(bind_reply, PipeLine, Replies).
  745
  746redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  747    !,
  748    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  749redis_read_msg3(S, Var, Reply, Error, Push) :-
  750    redis_read_msg(S, Var, Reply, Error, Push).
  751
  752handle_push(Redis, Pushed) :-
  753    handle_push_messages(Pushed, Redis).
  754handle_error(Error) :-
  755    (   var(Error)
  756    ->  true
  757    ;   throw(Error)
  758    ).
  759bind_reply(_Command -> Reply0, Reply) :-
  760    !,
  761    Reply0 = Reply.
  762bind_reply(_Command, _).
  763
  764
  765%!  recover(+Error, +Redis, :Goal)
  766%
  767%   Error happened while running Goal on Redis. If this is a recoverable
  768%   error (i.e., a network or disconnected peer),  wait a little and try
  769%   running Goal again.
  770
  771:- meta_predicate recover(+, +, 0).  772
  773recover(Error, Redis, Goal) :-
  774    Error = error(Formal, _),
  775    reconnect_error(Formal),
  776    auto_reconnect(Redis),
  777    !,
  778    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  779          [Redis, Error]),
  780    redis_disconnect(Redis, [force(true)]),
  781    (   wait_to_retry(Redis, Error)
  782    ->  call(Goal),
  783        retractall(failure(Redis, _))
  784    ;   throw(Error)
  785    ).
  786recover(Error, _, _) :-
  787    throw(Error).
  788
  789auto_reconnect(redis_connection(_,_,_,Options)) :-
  790    !,
  791    option(reconnect(true), Options).
  792auto_reconnect(Server) :-
  793    ground(Server),
  794    server(Server, _, Options),
  795    option(reconnect(true), Options, true).
  796
  797reconnect_error(io_error(_Action, _On)).
  798reconnect_error(socket_error(_Code, _)).
  799reconnect_error(syntax_error(unexpected_eof)).
  800reconnect_error(existence_error(stream, _)).
  801
  802%!  wait(+Redis, +Error)
  803%
  804%   Wait for some time after a failure. First  we wait for 10ms. This is
  805%   doubled on each failure upto the   setting  `max_retry_wait`. If the
  806%   setting `max_retry_count` is exceeded we fail and the called signals
  807%   an exception.
  808
  809:- dynamic failure/2 as volatile.  810
  811wait_to_retry(Redis, Error) :-
  812    redis_failures(Redis, Failures),
  813    setting(max_retry_count, Count),
  814    Failures < Count,
  815    Failures2 is Failures+1,
  816    redis_set_failures(Redis, Failures2),
  817    setting(max_retry_wait, MaxWait),
  818    Wait is min(MaxWait*100, 1<<Failures)/100.0,
  819    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
  820    retry_message_level(Failures, Level),
  821    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
  822    sleep(Wait).
  823
  824redis_failures(redis_connection(_,_,Failures0,_), Failures) :-
  825    !,
  826    Failures = Failures0.
  827redis_failures(Server, Failures) :-
  828    atom(Server),
  829    (   failure(Server, Failures)
  830    ->  true
  831    ;   Failures = 0
  832    ).
  833
  834redis_set_failures(Connection, Count) :-
  835    compound(Connection),
  836    !,
  837    nb_setarg(3, Connection, Count).
  838redis_set_failures(Server, Count) :-
  839    atom(Server),
  840    retractall(failure(Server, _)),
  841    asserta(failure(Server, Count)).
  842
  843retry_message_level(0, warning) :- !.
  844retry_message_level(_, silent).
  845
  846
  847%!  redis(+Request)
  848%
  849%   Connect to the default redis server,   call  redist/3 using Request,
  850%   disconnect and print the result.  This   predicate  is  intended for
  851%   interactive usage.
  852
  853redis(Req) :-
  854    setup_call_cleanup(
  855        redis_connect(default, C, []),
  856        redis1(C, Req, Out),
  857        redis_disconnect(C)),
  858    print(Out).
  859
  860%!  redis_write(+Redis, +Command) is det.
  861%!  redis_read(+Redis, -Reply) is det.
  862%
  863%   Write command and read replies from a Redis server. These are
  864%   building blocks for subscribing to event streams.
  865
  866redis_write(Redis, Command) :-
  867    redis_stream(Redis, S, true),
  868    redis_write_msg(S, Command).
  869
  870redis_read(Redis, Reply) :-
  871    redis_stream(Redis, S, true),
  872    redis_read_stream(Redis, S, Reply).
  873
  874
  875		 /*******************************
  876		 *      HIGH LEVEL ACCESS	*
  877		 *******************************/
  878
  879%!  redis_get_list(+Redis, +Key, -List) is det.
  880%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
  881%
  882%   Get the content of a Redis list in   List. If ChunkSize is given and
  883%   smaller than the list length, List is returned as a _lazy list_. The
  884%   actual values are requested using   redis  ``LRANGE`` requests. Note
  885%   that this results in O(N^2) complexity. Using   a  lazy list is most
  886%   useful for relatively short lists holding possibly large items.
  887%
  888%   Note that values retrieved are _strings_, unless the value was added
  889%   using `Term as prolog`.
  890%
  891%   It seems possible for ``LLEN`` to return   ``OK``. I don't know why.
  892%   As a work-around we return the empty list rather than an error.
  893%
  894%   @see lazy_list/2 for a discussion  on   the  difference between lazy
  895%   lists and normal lists.
  896
  897redis_get_list(Redis, Key, List) :-
  898    redis_get_list(Redis, Key, -1, List).
  899
  900redis_get_list(Redis, Key, Chunk, List) :-
  901    redis(Redis, llen(Key), Len),
  902    (   Len == status(ok)
  903    ->  List = []
  904    ;   (   Chunk >= Len
  905        ;   Chunk == -1
  906        )
  907    ->  (   Len == 0
  908        ->  List = []
  909        ;   End is Len-1,
  910            list_range(Redis, Key, 0, End, List)
  911        )
  912    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
  913    ).
  914
  915rlist_next(State, List, Tail) :-
  916    State = s(Redis,Key,Offset,Slice,Len),
  917    End is min(Len-1, Offset+Slice-1),
  918    list_range(Redis, Key, Offset, End, Elems),
  919    (   End =:= Len-1
  920    ->  List = Elems,
  921        Tail = []
  922    ;   Offset2 is Offset+Slice,
  923        nb_setarg(3, State, Offset2),
  924        append(Elems, Tail, List)
  925    ).
  926
  927% Redis LRANGE demands End > Start and returns inclusive.
  928
  929list_range(DB, Key, Start, Start, [Elem]) :-
  930    !,
  931    redis(DB, lindex(Key, Start), Elem).
  932list_range(DB, Key, Start, End, List) :-
  933    !,
  934    redis(DB, lrange(Key, Start, End), List).
  935
  936
  937
  938%!  redis_set_list(+Redis, +Key, +List) is det.
  939%
  940%   Associate a Redis key with a list.  As   Redis  has no concept of an
  941%   empty list, if List is `[]`, Key  is _deleted_. Note that key values
  942%   are always strings in  Redis.  The   same  conversion  rules  as for
  943%   redis/1-3 apply.
  944
  945redis_set_list(Redis, Key, List) :-
  946    redis(Redis, del(Key), _),
  947    (   List == []
  948    ->  true
  949    ;   Term =.. [rpush,Key|List],
  950        redis(Redis, Term, _Count)
  951    ).
  952
  953
  954%!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
  955%!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
  956%
  957%   Put/get a Redis hash as a Prolog  dict. Putting a dict first deletes
  958%   Key. Note that in many cases   applications will manage Redis hashes
  959%   by key. redis_get_hash/3 is notably a   user friendly alternative to
  960%   the Redis ``HGETALL`` command. If the  Redis   hash  is  not used by
  961%   other (non-Prolog) applications one  may   also  consider  using the
  962%   `Term as prolog` syntax to store the Prolog dict as-is.
  963
  964redis_get_hash(Redis, Key, Dict) :-
  965    redis(Redis, hgetall(Key), Dict as dict(auto)).
  966
  967redis_set_hash(Redis, Key, Dict) :-
  968    redis_array_dict(Array, _, Dict),
  969    Term =.. [hset,Key|Array],
  970    redis(Redis, del(Key), _),
  971    redis(Redis, Term, _Count).
  972
  973%!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
  974%
  975%   Translate a Redis reply representing  hash   data  into a SWI-Prolog
  976%   dict. Array is either a list  of   alternating  keys and values or a
  977%   list of _pairs_. When translating to an array, this is always a list
  978%   of alternating keys and values.
  979%
  980%   @arg Tag is the SWI-Prolog dict tag.
  981
  982redis_array_dict(Array, Tag, Dict) :-
  983    nonvar(Array),
  984    !,
  985    array_to_pairs(Array, Pairs),
  986    dict_pairs(Dict, Tag, Pairs).
  987redis_array_dict(TwoList, Tag, Dict) :-
  988    dict_pairs(Dict, Tag, Pairs),
  989    pairs_to_array(Pairs, TwoList).
  990
  991array_to_pairs([], []) :-
  992    !.
  993array_to_pairs([NameS-Value|T0], [Name-Value|T]) :-
  994    !,                                  % RESP3 returns a map as pairs.
  995    atom_string(Name, NameS),
  996    array_to_pairs(T0, T).
  997array_to_pairs([NameS,Value|T0], [Name-Value|T]) :-
  998    atom_string(Name, NameS),
  999    array_to_pairs(T0, T).
 1000
 1001pairs_to_array([], []) :-
 1002    !.
 1003pairs_to_array([Name-Value|T0], [NameS,Value|T]) :-
 1004    atom_string(Name, NameS),
 1005    pairs_to_array(T0, T).
 1006
 1007%!  redis_scan(+Redis, -LazyList, +Options) is det.
 1008%!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
 1009%!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
 1010%!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
 1011%
 1012%   Map the Redis ``SCAN``, ``SSCAN``,   ``HSCAN`` and `ZSCAN`` commands
 1013%   into a _lazy list_. For redis_scan/3 and redis_sscan/4 the result is
 1014%   a list of strings. For redis_hscan/4   and redis_zscan/4, the result
 1015%   is a list of _pairs_.   Options processed:
 1016%
 1017%     - match(Pattern)
 1018%       Adds the ``MATCH`` subcommand, only returning matches for
 1019%       Pattern.
 1020%     - count(Count)
 1021%       Adds the ``COUNT`` subcommand, giving a hint to the size of the
 1022%       chunks fetched.
 1023%     - type(Type)
 1024%       Adds the ``TYPE`` subcommand, only returning answers of the
 1025%       indicated type.
 1026%
 1027%   @see lazy_list/2.
 1028
 1029redis_scan(Redis, LazyList, Options) :-
 1030    scan_options([match,count,type], Options, Parms),
 1031    lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList).
 1032
 1033redis_sscan(Redis, Set, LazyList, Options) :-
 1034    scan_options([match,count,type], Options, Parms),
 1035    lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList).
 1036
 1037redis_hscan(Redis, Hash, LazyList, Options) :-
 1038    scan_options([match,count,type], Options, Parms),
 1039    lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList).
 1040
 1041redis_zscan(Redis, Set, LazyList, Options) :-
 1042    scan_options([match,count,type], Options, Parms),
 1043    lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList).
 1044
 1045scan_options([], _, []).
 1046scan_options([H|T0], Options, [H,V|T]) :-
 1047    Term =.. [H,V],
 1048    option(Term, Options),
 1049    !,
 1050    scan_options(T0, Options, T).
 1051scan_options([_|T0], Options, T) :-
 1052    scan_options(T0, Options, T).
 1053
 1054
 1055scan_next(State, List, Tail) :-
 1056    State = s(Command,Redis,Cursor,Params),
 1057    Command =.. CList,
 1058    append(CList, [Cursor|Params], CList2),
 1059    Term =.. CList2,
 1060    redis(Redis, Term, [NewCursor,Elems0]),
 1061    scan_pairs(Command, Elems0, Elems),
 1062    (   NewCursor == 0
 1063    ->  List = Elems,
 1064        Tail = []
 1065    ;   nb_setarg(3, State, NewCursor),
 1066        append(Elems, Tail, List)
 1067    ).
 1068
 1069scan_pairs(hscan(_), List, Pairs) :-
 1070    !,
 1071    scan_pairs(List, Pairs).
 1072scan_pairs(zscan(_), List, Pairs) :-
 1073    !,
 1074    scan_pairs(List, Pairs).
 1075scan_pairs(_, List, List).
 1076
 1077scan_pairs([], []).
 1078scan_pairs([Key,Value|T0], [Key-Value|T]) :-
 1079    !,
 1080    scan_pairs(T0, T).
 1081scan_pairs([Key-Value|T0], [Key-Value|T]) :-
 1082    scan_pairs(T0, T).
 1083
 1084
 1085		 /*******************************
 1086		 *              ABOUT		*
 1087		 *******************************/
 1088
 1089%!  redis_current_command(+Redis, ?Command) is nondet.
 1090%!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
 1091%
 1092%   True when Command has Properties. Fails   if Command is not defined.
 1093%   The redis_current_command/3 version  returns   the  command argument
 1094%   specification. See Redis documentation for an explanation.
 1095
 1096redis_current_command(Redis, Command) :-
 1097    redis_current_command(Redis, Command, _).
 1098
 1099redis_current_command(Redis, Command, Properties) :-
 1100    nonvar(Command),
 1101    !,
 1102    redis(Redis, command(info, Command), [[_|Properties]]).
 1103redis_current_command(Redis, Command, Properties) :-
 1104    redis(Redis, command, Commands),
 1105    member([Name|Properties], Commands),
 1106    atom_string(Command, Name).
 1107
 1108%!  redis_property(+Redis, ?Property) is nondet.
 1109%
 1110%   True if Property is a property of   the Redis server. Currently uses
 1111%   redis(info, String) and parses the result.   As  this is for machine
 1112%   usage, properties names *_human are skipped.
 1113
 1114redis_property(Redis, Property) :-
 1115    redis(Redis, info, String),
 1116    info_terms(String, Terms),
 1117    member(Property, Terms).
 1118
 1119info_terms(Info, Pairs) :-
 1120    split_string(Info, "\n", "\r\n ", Lines),
 1121    convlist(info_line_term, Lines, Pairs).
 1122
 1123info_line_term(Line, Term) :-
 1124    sub_string(Line, B, _, A, :),
 1125    !,
 1126    sub_atom(Line, 0, B, _, Name),
 1127    \+ sub_atom(Name, _, _, 0, '_human'),
 1128    sub_string(Line, _, A, 0, ValueS),
 1129    (   number_string(Value, ValueS)
 1130    ->  true
 1131    ;   Value = ValueS
 1132    ),
 1133    Term =.. [Name,Value].
 1134
 1135
 1136		 /*******************************
 1137		 *            SUBSCRIBE		*
 1138		 *******************************/
 1139
 1140%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
 1141%
 1142%   Subscribe to one or more  Redis   PUB/SUB  channels.  This predicate
 1143%   creates a thread using thread_create/3 with  the given Options. Once
 1144%   running, the thread listens for messages.   The message content is a
 1145%   string or Prolog term  as  described   in  redis/3.  On  receiving a
 1146%   message, the following message is broadcasted:
 1147%
 1148%       redis(Id, Channel, Data)
 1149%
 1150%   If redis_unsubscribe/2 removes the  last   subscription,  the thread
 1151%   terminates.
 1152%
 1153%   To simply print the incomming messages use e.g.
 1154%
 1155%       ?- listen(redis(_, Channel, Data),
 1156%                 format('Channel ~p got ~p~n', [Channel,Data])).
 1157%       true.
 1158%       ?- redis_subscribe(default, test, Id, []).
 1159%       Id = redis_pubsub_3,
 1160%       ?- redis(publish(test, "Hello world")).
 1161%       Channel test got "Hello world"
 1162%       1
 1163%       true.
 1164%
 1165%   @arg Id is the thread identifier of  the listening thread. Note that
 1166%   the Options alias(Name) can be used to get a system wide name.
 1167
 1168:- dynamic ( subscription/2,            % Id, Channel
 1169             listening/3                % Id, Connection, Thread
 1170           ) as volatile. 1171
 1172redis_subscribe(Redis, Spec, Id, Options) :-
 1173    atom(Redis),
 1174    !,
 1175    channels(Spec, Channels),
 1176    pubsub_thread_options(ThreadOptions, Options),
 1177    thread_create(setup_call_cleanup(
 1178                      redis_connect(Redis, Conn, [reconnect(true)]),
 1179                      redis_subscribe1(Redis, Conn, Channels),
 1180                      redis_disconnect(Conn)),
 1181                  Thread,
 1182                  ThreadOptions),
 1183    pubsub_id(Thread, Id).
 1184redis_subscribe(Redis, Spec, Id, Options) :-
 1185    channels(Spec, Channels),
 1186    pubsub_thread_options(ThreadOptions, Options),
 1187    thread_create(redis_subscribe1(Redis, Redis, Channels),
 1188                  Thread,
 1189                  ThreadOptions),
 1190    pubsub_id(Thread, Id).
 1191
 1192pubsub_thread_options(ThreadOptions, Options) :-
 1193    merge_options(Options, [detached(true)], ThreadOptions).
 1194
 1195pubsub_id(Thread, Thread).
 1196%pubsub_id(Thread, Id) :-
 1197%    thread_property(Thread, id(TID)),
 1198%    atom_concat('redis_pubsub_', TID, Id).
 1199
 1200redis_subscribe1(Redis, Conn, Channels) :-
 1201    Error = error(Formal, _),
 1202    catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
 1203    (   var(Formal)
 1204    ->  true
 1205    ;   recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
 1206        thread_self(Me),
 1207        pubsub_id(Me, Id),
 1208        findall(Channel, subscription(Id, Channel), CurrentChannels),
 1209        redis_subscribe1(Redis, Conn, CurrentChannels)
 1210    ).
 1211
 1212redis_subscribe2(Redis, Conn, Channels) :-
 1213    redis_subscribe3(Conn, Channels),
 1214    redis_listen(Redis, Conn).
 1215
 1216redis_subscribe3(Conn, Channels) :-
 1217    thread_self(Me),
 1218    pubsub_id(Me, Id),
 1219    prolog_listen(this_thread_exit, pubsub_clean(Id)),
 1220    maplist(register_subscription(Id), Channels),
 1221    redis_stream(Conn, S, true),
 1222    Req =.. [subscribe|Channels],
 1223    redis_write_msg(S, Req).
 1224
 1225pubsub_clean(Id) :-
 1226    retractall(listening(Id, _Connection, _Thread)),
 1227    retractall(subscription(Id, _Channel)).
 1228
 1229%!  redis_subscribe(+Id, +Channels) is det.
 1230%!  redis_unsubscribe(+Id, +Channels) is det.
 1231%
 1232%   Add/remove channels from for the   subscription. If no subscriptions
 1233%   remain, the listening thread terminates.
 1234%
 1235%   @arg Channels is either a single  channel   or  a list thereof. Each
 1236%   channel specification is either an atom   or a term `A:B:...`, where
 1237%   all parts are atoms.
 1238
 1239redis_subscribe(Id, Spec) :-
 1240    channels(Spec, Channels),
 1241    (   listening(Id, Connection, _Thread)
 1242    ->  true
 1243    ;   existence_error(redis_pubsub, Id)
 1244    ),
 1245    maplist(register_subscription(Id), Channels),
 1246    redis_stream(Connection, S, true),
 1247    Req =.. [subscribe|Channels],
 1248    redis_write_msg(S, Req).
 1249
 1250redis_unsubscribe(Id, Spec) :-
 1251    channels(Spec, Channels),
 1252    (   listening(Id, Connection, _Thread)
 1253    ->  true
 1254    ;   existence_error(redis_pubsub, Id)
 1255    ),
 1256    maplist(unregister_subscription(Id), Channels),
 1257    redis_stream(Connection, S, true),
 1258    Req =.. [unsubscribe|Channels],
 1259    redis_write_msg(S, Req).
 1260
 1261%!  redis_current_subscription(?Id, ?Channels)
 1262%
 1263%   True when a PUB/SUB subscription with Id is listening on Channels.
 1264
 1265redis_current_subscription(Id, Channels) :-
 1266    findall(Id-Channel, subscription(Id, Channel), Pairs),
 1267    keysort(Pairs, Sorted),
 1268    group_pairs_by_key(Sorted, Grouped),
 1269    member(Id-Channels, Grouped).
 1270
 1271channels(Spec, List) :-
 1272    is_list(Spec),
 1273    !,
 1274    maplist(channel_name, Spec, List).
 1275channels(Ch, [Key]) :-
 1276    channel_name(Ch, Key).
 1277
 1278channel_name(Atom, Atom) :-
 1279    atom(Atom),
 1280    !.
 1281channel_name(Key, Atom) :-
 1282    phrase(key_parts(Key), Parts),
 1283    !,
 1284    atomic_list_concat(Parts, :, Atom).
 1285channel_name(Key, _) :-
 1286    type_error(redis_key, Key).
 1287
 1288key_parts(Var) -->
 1289    { var(Var), !, fail }.
 1290key_parts(Atom) -->
 1291    { atom(Atom) },
 1292    !,
 1293    [Atom].
 1294key_parts(A:B) -->
 1295    key_parts(A),
 1296    key_parts(B).
 1297
 1298
 1299
 1300
 1301register_subscription(Id, Channel) :-
 1302    (   subscription(Id, Channel)
 1303    ->  true
 1304    ;   assertz(subscription(Id, Channel))
 1305    ).
 1306
 1307unregister_subscription(Id, Channel) :-
 1308    retractall(subscription(Id, Channel)).
 1309
 1310redis_listen(Redis, Conn) :-
 1311    thread_self(Me),
 1312    pubsub_id(Me, Id),
 1313    setup_call_cleanup(
 1314        assertz(listening(Id, Conn, Me), Ref),
 1315        redis_listen_loop(Redis, Id, Conn),
 1316        erase(Ref)).
 1317
 1318redis_listen_loop(Redis, Id, Conn) :-
 1319    redis_stream(Conn, S, true),
 1320    (   subscription(Id, _)
 1321    ->  redis_read_stream(Redis, S, Reply),
 1322        redis_broadcast(Redis, Reply),
 1323        redis_listen_loop(Redis, Id, Conn)
 1324    ;   true
 1325    ).
 1326
 1327redis_broadcast(_, [subscribe, _Channel, _N]) :-
 1328    !.
 1329redis_broadcast(Redis, [message, Channel, Data]) :-
 1330    !,
 1331    catch(broadcast(redis(Redis, Channel, Data)),
 1332          Error,
 1333          print_message(error, Error)).
 1334redis_broadcast(Redis, Message) :-
 1335    assertion((Message = [Type, Channel, _Data],
 1336               atom(Type),
 1337               atom(Channel))),
 1338    debug(redis(warning), '~p: Unknown message while listening: ~p',
 1339          [Redis,Message]).
 1340
 1341
 1342		 /*******************************
 1343		 *          READ/WRITE		*
 1344		 *******************************/
 1345
 1346%!  redis_read_stream(+Redis, +Stream, -Term) is det.
 1347%
 1348%   Read a message from a Redis stream.  Term is one of
 1349%
 1350%     - A list of terms (array)
 1351%     - A list of pairs (map, RESP3 only)
 1352%     - The atom `nil`
 1353%     - A number
 1354%     - A term status(String)
 1355%     - A string
 1356%     - A boolean (`true` or `false`).  RESP3 only.
 1357%
 1358%   If something goes wrong, the connection   is closed and an exception
 1359%   is raised.
 1360
 1361redis_read_stream(Redis, SI, Out) :-
 1362    E = error(Formal,_),
 1363    catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
 1364    (   var(Formal)
 1365    ->  handle_push_messages(Push, Redis),
 1366        (   var(Error)
 1367        ->  Out = Out0
 1368        ;   resync(Redis),
 1369            throw(Error)
 1370        )
 1371    ;   redis_disconnect(Redis, [force(true)]),
 1372        throw(E)
 1373    ).
 1374
 1375handle_push_messages([], _).
 1376handle_push_messages([H|T], Redis) :-
 1377    (   catch(handle_push_message(H, Redis), E,
 1378              print_message(warning, E))
 1379    ->  true
 1380    ;   true
 1381    ),
 1382    handle_push_messages(T, Redis).
 1383
 1384handle_push_message(["pubsub"|List], Redis) :-
 1385    redis_broadcast(Redis, List).
 1386% some protocol version 3 push messages (such as
 1387% __keyspace@* events) seem to come directly
 1388% without a pubsub header
 1389handle_push_message([message|List], Redis) :-
 1390    redis_broadcast(Redis, [message|List]).
 1391
 1392
 1393%!  resync(+Redis) is det.
 1394%
 1395%   Re-synchronize  after  an  error.  This  may  happen  if  some  type
 1396%   conversion fails and we have read  a   partial  reply. It is hard to
 1397%   figure out what to read from where we are, so we echo a random magic
 1398%   sequence and read until we find the reply.
 1399
 1400resync(Redis) :-
 1401    E = error(Formal,_),
 1402    catch(do_resync(Redis), E, true),
 1403    (   var(Formal)
 1404    ->  true
 1405    ;   redis_disconnect(Redis, [force(true)])
 1406    ).
 1407
 1408do_resync(Redis) :-
 1409    A is random(1_000_000_000),
 1410    redis_stream(Redis, S, true),
 1411    redis_write_msg(S, echo(A)),
 1412    catch(call_with_time_limit(0.2, '$redis_resync'(S, A)),
 1413          time_limit_exceeded,
 1414          throw(error(time_limit_exceeded,_))).
 1415
 1416
 1417%!  redis_read_msg(+Stream, -Message, -Error, -PushMessages) is det.
 1418%!  redis_write_msg(+Stream, +Message) is det.
 1419%
 1420%   Read/write a Redis message. Both these predicates are in the foreign
 1421%   module `redis4pl`.
 1422%
 1423%   @arg PushMessages is a list of push   messages that may be non-[] if
 1424%   protocol version 3 (see redis_connect/3) is selected. Using protocol
 1425%   version 2 this list is always empty.
 1426
 1427
 1428
 1429		 /*******************************
 1430		 *            MESSAGES		*
 1431		 *******************************/
 1432
 1433:- multifile
 1434    prolog:error_message//1,
 1435    prolog:message//1. 1436
 1437prolog:error_message(redis_error(Code, String)) -->
 1438    [ 'REDIS: ~w: ~s'-[Code, String] ].
 1439
 1440prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1441    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1442    [ '    '-[] ], '$messages':translate_message(Error)