1:- module(connection_pool, [
    2    get_connection/2,       % +Ps, -Connection
    3    mark_alive/2,           % +Ps, +Connection
    4    mark_dead/2             % +Ps, +Connection
    5]).

Connection pool.

Connection pool manages lifecycle of connections.

author
- Hongxin Liang
license
- Apache License Version 2.0 */
   14:- use_module(library(random)).   15:- use_module(library(lists)).   16
   17:- use_module(registry).   18:- use_module(util).
 get_connection(+Ps, -Connections) is det
Get one connection for pool. If there is no alive connection, a randomly selected one will be returned.
   25get_connection(Ps, Connection) :-
   26    with_mutex(Ps, get_connection0(Ps, Connection)).
   27
   28get_connection0(Ps, Connection) :-
   29    resurrect(Ps, false, _),
   30    connections(Ps, Connections),
   31    (   Connections = []
   32    ->  resurrect(Ps, true, Connection)
   33    ;   options(Ps, Options),
   34        (   memberchk(random_selector(false), Options)
   35        ->  next_rr(Ps, RR),
   36            length(Connections, Length),
   37            Index is RR mod Length,
   38            nth0(Index, Connections, Connection)
   39        ;   random_select(Connection, Connections, _)
   40        )
   41    ).
   42
   43next_rr(Ps, RR) :-
   44    recorded(Ps, Value, Ref),
   45    erase(Ref),
   46    RR is Value.vars.rr + 1,
   47    recorda(Ps, Value.put(vars, Value.vars.put(rr, RR))).
 mark_alive(+Ps, +Connection) is det
Mark a connection as alive.
   53mark_alive(Ps, Connection) :-
   54    with_mutex(Ps, mark_alive0(Ps, Connection)).
   55
   56mark_alive0(Ps, Connection) :-
   57    recorded(Ps, Value, Ref),
   58    erase(Ref),
   59    DeadCount = Value.vars.dead_count,
   60    (   selectchk(Connection-_, DeadCount, DeadCount1)
   61    ->  true
   62    ;   DeadCount1 = DeadCount
   63    ),
   64    recorda(Ps, Value.put([vars=Value.vars.put(dead_count, DeadCount1)])).
 mark_dead(+Ps, +Connection) is det
Mark a connection as dead.
   70mark_dead(Ps, Connection) :-
   71    with_mutex(Ps, mark_dead0(Ps, Connection)).
   72
   73mark_dead0(Ps, Connection) :-
   74    recorded(Ps, Value, Ref),
   75    erase(Ref),
   76    Connections = Value.vars.connections,
   77    (   selectchk(Connection, Connections, Connections1)
   78    ->  update_connection_info(Ps, Connection, Connections1, Value)
   79    ;   true
   80    ).
   81
   82update_connection_info(Ps, Connection, Connections, Value) :-
   83    _{dead_connections:DeadConnections, dead_count:DeadCount} :< Value.vars,
   84    (   selectchk(Connection-Count, DeadCount, DeadCount1)
   85    ->  true
   86    ;   Count = 0
   87    ),
   88    Count1 is Count + 1,
   89    memberchk(dead_timeout(DeadTimeout), Value.options),
   90    memberchk(timeout_cutoff(TimeoutCutoff), Value.options),
   91    Timeout is DeadTimeout * 2 ** min(Count1 - 1, TimeoutCutoff),
   92    get_time(Now),
   93    Future is Now + Timeout,
   94    DeadConnections1 = [Future-Connection|DeadConnections],
   95    debug(connection_pool,
   96        'mark connect ~w dead with timeout ~w and count ~w', [Connection, Timeout, Count1]),
   97    keysort(DeadConnections1, DeadConnections2),
   98    Value1 = Value.put(vars, Value.vars.put(_{
   99        connections:Connections,
  100        dead_connections:DeadConnections2,
  101        dead_count:[Connection-Count1|DeadCount1]})),
  102    recorda(Ps, Value1).
  103
  104resurrect(Ps, Force, Connection) :-
  105    with_mutex(Ps, resurrect0(Ps, Force, Connection)).
  106
  107resurrect0(Ps, Force, Connection) :-
  108    recorded(Ps, Value, Ref),
  109    erase(Ref),
  110    _{connections:Connections, dead_connections:DeadConnections} :< Value.vars,
  111    (   DeadConnections = []
  112    ->  (   Force
  113        ->  random_select(Connection, Value.hosts, _),
  114            debug(connection_pool,
  115                'forced to resurrect, choose a random one ~w', [Connection])
  116        ;   true
  117        ),
  118        Value1 = Value
  119    ;   DeadConnections = [Timeout-Connection|DeadConnections1],
  120        (   once((get_time(Now), Timeout =< Now; Force))
  121        ->  Connections1 = [Connection|Connections],
  122            debug(connection_pool,
  123                'dead timeout or forced ~w', [Connection]),
  124            Value1 = Value.put(vars, Value.vars.put(_{
  125                connections:Connections1, dead_connections:DeadConnections1}))
  126        ;   Value1 = Value
  127        )
  128    ),
  129    recorda(Ps, Value1)