1:- module(transport, [
    2    perform_request/6,      % +Ps, +HTTPMethod, +Context, +Params, -Status, -Reply
    3    perform_request/7       % +Ps, +HTTPMethod, +Context, +Params, +Body, -Status, -Reply
    4]).

Transport related logic.

author
- Hongxin Liang
license
- Apache License Version 2.0 */
   12:- use_module(library(uri)).   13:- use_module(library(lists)).   14:- use_module(library(http/http_client)).   15:- use_module(library(http/http_open)).   16:- use_module(library(http/http_json)).   17:- use_module(library(http/json)).   18
   19:- use_module(connection_pool).   20:- use_module(registry).   21:- use_module(util).
 perform_request(+Ps, +HTTPMethod, +Context, +Params, -Status, -Reply) is semidet
 perform_request(+Ps, +HTTPMethod, +Context, +Params, +Body, -Status, -Reply) is semidet
Perform actual HTTP request. For GET and DELETE methods, body is not supported.
   28perform_request(Ps, get, Context, Params, Status, Reply) :- !,
   29    debug(transport, 'GET ~w', [Context]),
   30    http_operation_with_retry(Ps, Context, Params, http_get, Status, Reply).
   31
   32perform_request(Ps, delete, Context, Params, Status, Reply) :- !,
   33    debug(transport, 'DELETE ~w', [Context]),
   34    http_operation_with_retry(Ps, Context, Params, http_delete, Status, Reply).
   35
   36perform_request(Ps, head, Context, Params, Status, Reply) :- !,
   37    debug(transport, 'HEAD ~w', [Context]),
   38    http_operation_with_retry(Ps, Context, Params, http_head, Status, Reply).
   39
   40perform_request(Ps, get, Context, Params, Body, Status, Reply) :- !,
   41    debug(transport, 'GET context ~w body ~w', [Context, Body]),
   42    (   nonvar(Body)
   43    ->  perform_request(Ps, post, Context, Params, Body, Status, Reply)
   44    ;   perform_request(Ps, get, Context, Params, Status, Reply)
   45    ).
   46
   47perform_request(Ps, delete, Context, Params, Body, Status, Reply) :- !,
   48    debug(transport, 'DELETE context ~w body ~w', [Context, Body]),
   49    (   nonvar(Body)
   50    ->  perform_request(Ps, delete_ex, Context, Params, Body, Status, Reply)
   51    ;   perform_request(Ps, delete, Context, Params, Status, Reply)
   52    ).
   53
   54perform_request(Ps, delete_ex, Context, Params, Body, Status, Reply) :- !,
   55    wrap_body(Body, WrappedBody),
   56    debug(transport, 'DELETE_EX context ~w body ~w', [Context, WrappedBody]),
   57    http_operation_with_retry(Ps, Context, Params, http_delete_ex(WrappedBody), Status, Reply).
   58
   59perform_request(Ps, post, Context, Params, Body, Status, Reply) :- !,
   60    wrap_body(Body, WrappedBody),
   61    debug(transport, 'POST context ~w body ~w', [Context, WrappedBody]),
   62    http_operation_with_retry(Ps, Context, Params, http_post(WrappedBody), Status, Reply).
   63
   64perform_request(Ps, put, Context, Params, Body, Status, Reply) :- !,
   65    wrap_body(Body, WrappedBody),
   66    debug(transport, 'PUT context ~w body ~w', [Context, WrappedBody]),
   67    http_operation_with_retry(Ps, Context, Params, http_put(WrappedBody), Status, Reply).
   68
   69http_head(URL, Reply, Options) :-
   70    http_get(URL, Reply, [method(head)|Options]).
   71
   72http_delete_ex(URL, Data, Reply, Options) :-
   73    http_post(URL, Data, Reply, [method(delete)|Options]).
   74    
   75wrap_body(Body, WrappedBody) :-
   76    is_dict(Body), !,
   77    WrappedBody = json(Body).
   78
   79wrap_body(Body, WrappedBody) :-
   80    once((string(Body); atom(Body))), !,
   81    WrappedBody = codes(Body).
   82
   83wrap_body(Body, WrappedBody) :-
   84    var(Body), !,
   85    WrappedBody = codes('').
   86
   87http_operation_with_retry(Ps, Context, Params, Operation, Status, Reply) :-
   88    options(Ps, Options),
   89    memberchk(retry_on_status(RetryOnStatus), Options),
   90    memberchk(retry_on_timeout(RetryOnTimeout), Options),
   91    memberchk(max_retries(MaxRetries), Options),
   92    extract_param(Params, NewParams, timeout, Timeout, infinite),
   93    extract_param(NewParams, NewParams1, ignore, Ignore, []),
   94    http_operation_with_retry0(Ps, Context, NewParams1, Operation, RetryOnStatus, RetryOnTimeout, MaxRetries, Timeout, Ignore, Status, Reply).
   95
   96http_operation_with_retry0(Ps, Context, Params, Operation, RetryOnStatus, RetryOnTimeout, MaxRetries, Timeout, Ignore, Status, Reply) :-
   97    get_connection(Ps, Connection),
   98    compose_url(Connection, Context, Params, URL),
   99    Operation =.. [Name|Args],
  100    Operation1 =.. [Name|[URL|Args]],
  101    (   catch(call(Operation1, Reply0, [status_code(Status0), timeout(Timeout), json_object(dict)]), E, true)
  102    ->  (   var(E)
  103        ->  handle_status(Status0, Reply0, RetryOnStatus, Ignore, Success, Retry)
  104        ;   handle_exception(E, RetryOnTimeout, Success, Retry)
  105        )
  106    ),
  107    (   \+ Success
  108    ->  (   Retry
  109        ->  mark_dead(Ps, Connection),
  110            (   MaxRetries > 0
  111            ->  debug(transport, 'retrying... count ~w', [MaxRetries]),
  112                http_operation_with_retry0(Ps, Context, Params, Operation, RetryOnStatus,
  113                    RetryOnTimeout, MaxRetries - 1, Timeout, Ignore, Status, Reply)
  114            ;   throw(error(plasticsearch_exception(Status0, Reply0)))
  115            )
  116        ;   throw(error(plasticsearch_exception(Status0, Reply0)))
  117        )
  118    ;   mark_alive(Ps, Connection),
  119        Status = Status0,
  120        Reply = Reply0
  121    ).
  122
  123get_timeout_option(Params, Timeout, Params1) :-
  124    (   del_dict(timeout, Params, Timeout, Params1)
  125    ->  true
  126    ;   Timeout = infinite,
  127        Params1 = Params
  128    ).
  129
  130compose_url(uri_components(Scheme, Authority, Path, Search, Fragment), Context, Params, URL) :-
  131    atom_concat(Path, Context, NewPath),
  132    dict_pairs(Params, _, Pairs0),
  133    (   nonvar(Search)
  134    ->  uri_query_components(Search, Pairs1),
  135        append(Pairs1, Pairs0, Pairs)
  136    ;   Pairs = Pairs0
  137    ),
  138    uri_query_components(NewSearch, Pairs),
  139    uri_components(URL, uri_components(Scheme, Authority, NewPath, NewSearch, Fragment)).
  140
  141handle_status(Status, Reply, RetryOnStatus, Ignore, Success, Retry) :-
  142    debug(transport, 'status code ~w, reply ~w', [Status, Reply]),
  143    (   once((Status >= 200, Status < 300; memberchk(Status, Ignore)))
  144    ->  Success = true
  145    ;   match_status_and_throw_immediately(Status, Reply),
  146        (   memberchk(Status, RetryOnStatus)
  147        ->  Retry = true
  148        ;   Retry = false
  149        ),
  150        Success = false
  151    ).
  152
  153match_status_and_throw_immediately(Status, Reply) :-
  154    (   memberchk(Status, [400, 401, 403, 404, 409])
  155    ->  throw(error(plasticsearch_exception(Status, Reply)))
  156    ;   true
  157    ).
  158
  159handle_exception(E, RetryOnTimeout, false, Retry) :-
  160    debug(transport, 'Exception ~w', [E]),
  161    (   E = error(socket_error(_), _)
  162    ->  Retry = true
  163    ;   (   E = error(timeout_error(_, _) ,_)
  164        ->  Retry = RetryOnTimeout
  165        ;   Retry = false
  166        )
  167    )