% Module declaration: the transport layer exports only its two
% request entry points.
:- module(transport,
          [ perform_request/6,      % request without a body
            perform_request/7       % request with an optional body
          ]).
12:- use_module(library(uri)). 13:- use_module(library(lists)). 14:- use_module(library(http/http_client)). 15:- use_module(library(http/http_open)). 16:- use_module(library(http/http_json)). 17:- use_module(library(http/json)). 18
19:- use_module(connection_pool). 20:- use_module(registry). 21:- use_module(util).
%!  perform_request(+Ps, +Method, +Context, +Params, -Status, -Reply)
%
%   Issue an HTTP request that carries no body. Method is one of
%   `get`, `delete` or `head`; the call fails for any other method.
%   Ps is handed through unchanged to http_operation_with_retry/6,
%   which performs the actual request and unifies Status and Reply.
%
%   The table maps each method to its debug format string and to the
%   HTTP operation that is called. memberchk/2 commits to the first
%   matching row, so no choice point is left behind.
perform_request(Ps, Method, Context, Params, Status, Reply) :-
    memberchk(Method-Format-Operation,
              [ get    - 'GET ~w'    - http_get,
                delete - 'DELETE ~w' - http_delete,
                head   - 'HEAD ~w'   - http_head
              ]),
    debug(transport, Format, [Context]),
    http_operation_with_retry(Ps, Context, Params, Operation, Status, Reply).
39
%!  perform_request(+Ps, +Method, +Context, +Params, ?Body, -Status, -Reply)
%
%   Issue an HTTP request that may carry a body. Supported methods
%   are `get`, `delete`, `delete_ex`, `post` and `put`; the call
%   fails for any other method.
%
%   For `get` and `delete` the body is optional: when Body is
%   unbound the request is delegated to perform_request/6, otherwise
%   a `get` is sent as `post` and a `delete` as `delete_ex`.
%   For the remaining methods Body is converted by wrap_body/2
%   before the operation is handed to http_operation_with_retry/6.
perform_request(Ps, get, Context, Params, Body, Status, Reply) :- !,
    debug(transport, 'GET context ~w body ~w', [Context, Body]),
    (   var(Body)
    ->  perform_request(Ps, get, Context, Params, Status, Reply)
    ;   perform_request(Ps, post, Context, Params, Body, Status, Reply)
    ).
perform_request(Ps, delete, Context, Params, Body, Status, Reply) :- !,
    debug(transport, 'DELETE context ~w body ~w', [Context, Body]),
    (   var(Body)
    ->  perform_request(Ps, delete, Context, Params, Status, Reply)
    ;   perform_request(Ps, delete_ex, Context, Params, Body, Status, Reply)
    ).
perform_request(Ps, Method, Context, Params, Body, Status, Reply) :-
    % Method -> debug format -> functor of the HTTP operation.
    memberchk(Method-Format-Functor,
              [ delete_ex - 'DELETE_EX context ~w body ~w' - http_delete_ex,
                post      - 'POST context ~w body ~w'      - http_post,
                put       - 'PUT context ~w body ~w'       - http_put
              ]),
    wrap_body(Body, WrappedBody),
    debug(transport, Format, [Context, WrappedBody]),
    % Build e.g. http_post(WrappedBody); the URL is inserted later.
    Operation =.. [Functor, WrappedBody],
    http_operation_with_retry(Ps, Context, Params, Operation, Status, Reply).
68
%!  http_head(+URL, -Reply, +Options)
%
%   Call http_get/3 on URL with `method(head)` prepended to Options.
http_head(URL, Reply, Options) :-
    HeadOptions = [method(head)|Options],
    http_get(URL, Reply, HeadOptions).
71
%!  http_delete_ex(+URL, +Data, -Reply, +Options)
%
%   Call http_post/4 on URL with Data as the body and
%   `method(delete)` prepended to Options.
http_delete_ex(URL, Data, Reply, Options) :-
    DeleteOptions = [method(delete)|Options],
    http_post(URL, Data, Reply, DeleteOptions).
74
%!  wrap_body(?Body, -WrappedBody) is semidet.
%
%   Convert a request body into the term passed on as HTTP data:
%
%     - a dict becomes json(Body);
%     - a string or an atom becomes codes(Body);
%     - an unbound Body becomes codes('').
%
%   Fails for any other kind of Body.
wrap_body(Body, WrappedBody) :-
    (   is_dict(Body)
    ->  WrappedBody = json(Body)
    ;   (   string(Body)
        ;   atom(Body)
        )
    ->  WrappedBody = codes(Body)
    ;   var(Body),
        WrappedBody = codes('')
    ).
86
%!  http_operation_with_retry(+Ps, +Context, +Params, +Operation, -Status, -Reply)
%
%   Collect the retry settings and per-request parameters, then run
%   Operation through http_operation_with_retry0/11.
%
%   The settings `max_retries`, `retry_on_timeout` and
%   `retry_on_status` are looked up in the list that options/2
%   returns for Ps; the call fails if any of them is missing.
%   `timeout` (default `infinite`) and `ignore` (default `[]`) are
%   taken from Params with extract_param/5.
%   NOTE(review): extract_param/5 is presumably expected to return
%   Params without the extracted key - confirm against util.
http_operation_with_retry(Ps, Context, Params, Operation, Status, Reply) :-
    options(Ps, ClientOptions),
    memberchk(max_retries(Attempts), ClientOptions),
    memberchk(retry_on_timeout(OnTimeout), ClientOptions),
    memberchk(retry_on_status(OnStatus), ClientOptions),
    extract_param(Params, Params1, timeout, Timeout, infinite),
    extract_param(Params1, QueryParams, ignore, Ignore, []),
    http_operation_with_retry0(Ps, Context, QueryParams, Operation,
                               OnStatus, OnTimeout, Attempts,
                               Timeout, Ignore, Status, Reply).
95
%!  http_operation_with_retry0(+Ps, +Context, +Params, +Operation,
%!                             +RetryOnStatus, +RetryOnTimeout, +MaxRetries,
%!                             +Timeout, +Ignore, -Status, -Reply)
%
%   Run Operation once against a connection obtained from
%   get_connection/2 and retry on failure while retries remain.
%
%   Operation is a callable without its URL (e.g. `http_get` or
%   `http_post(Data)`). The composed URL is inserted as its first
%   argument, and the reply variable and the HTTP option list are
%   appended by call/3.
%
%   Outcome handling:
%
%     - success (as decided by handle_status/6): the connection is
%       marked alive and Status and Reply are unified with the result;
%     - retryable failure: the connection is marked dead and, while
%       MaxRetries > 0, the request is repeated with one retry fewer;
%     - otherwise error(plasticsearch_exception(Status, Reply)) is
%       thrown. If the operation itself raised an exception, Status
%       and Reply in that term are unbound, because catch/3 undoes
%       the bindings made by the failed call.
%
%   The predicate fails if the operation fails without raising.
http_operation_with_retry0(Ps, Context, Params, Operation, RetryOnStatus, RetryOnTimeout, MaxRetries, Timeout, Ignore, Status, Reply) :-
    get_connection(Ps, Connection),
    compose_url(Connection, Context, Params, URL),
    % Insert the URL as the first argument of the operation.
    Operation =.. [Name|Args],
    Operation1 =.. [Name, URL|Args],
    (   catch(call(Operation1, Reply0,
                   [ status_code(Status0),
                     timeout(Timeout),
                     json_object(dict)
                   ]),
              E, true)
    ->  (   var(E)
        ->  handle_status(Status0, Reply0, RetryOnStatus, Ignore, Success, Retry)
        ;   handle_exception(E, RetryOnTimeout, Success, Retry)
        )
    ),
    (   \+ Success
    ->  (   Retry
        ->  mark_dead(Ps, Connection),
            (   MaxRetries > 0
            ->  debug(transport, 'retrying... count ~w', [MaxRetries]),
                % Evaluate the counter so it stays a plain integer
                % instead of growing into a term such as 3-1-1.
                RetriesLeft is MaxRetries - 1,
                http_operation_with_retry0(Ps, Context, Params, Operation,
                                           RetryOnStatus, RetryOnTimeout,
                                           RetriesLeft, Timeout, Ignore,
                                           Status, Reply)
            ;   throw(error(plasticsearch_exception(Status0, Reply0)))
            )
        ;   throw(error(plasticsearch_exception(Status0, Reply0)))
        )
    ;   mark_alive(Ps, Connection),
        Status = Status0,
        Reply = Reply0
    ).
122
%!  get_timeout_option(+Params, -Timeout, -Params1) is semidet.
%
%   Take the `timeout` key out of the dict Params. If the key is
%   present, Timeout is its value and Params1 is Params without it.
%   Otherwise Timeout is `infinite` and Params1 is Params itself.
get_timeout_option(Params, Timeout, Params1) :-
    del_dict(timeout, Params, Timeout, Params1),
    !.
get_timeout_option(Params, infinite, Params).
129
%!  compose_url(+Connection, +Context, +Params, -URL) is det.
%
%   Build the request URL for a connection.
%
%   Connection is a uri_components/5 term. Context is appended to
%   the connection path, and the key/value pairs of the dict Params
%   are appended to any query string the connection already has.
%   Scheme, authority and fragment are passed through unchanged.
%
%   When there are no query pairs at all the search component is
%   left unbound, so the resulting URL has no dangling `?`.
compose_url(uri_components(Scheme, Authority, Path, Search, Fragment), Context, Params, URL) :-
    atom_concat(Path, Context, NewPath),
    dict_pairs(Params, _, Pairs0),
    (   nonvar(Search)
    ->  uri_query_components(Search, Pairs1),
        append(Pairs1, Pairs0, Pairs)
    ;   Pairs = Pairs0
    ),
    (   Pairs == []
    ->  true                    % no query string: keep NewSearch unbound
    ;   uri_query_components(NewSearch, Pairs)
    ),
    uri_components(URL, uri_components(Scheme, Authority, NewPath, NewSearch, Fragment)).
140
%!  handle_status(+Status, +Reply, +RetryOnStatus, +Ignore, -Success, -Retry)
%
%   Classify an HTTP response.
%
%   Success is `true` when Status is in the range 200..299 or is a
%   member of Ignore; Retry is left unbound in that case. Otherwise
%   match_status_and_throw_immediately/2 gets the chance to raise,
%   Success is `false`, and Retry is `true` exactly when Status is a
%   member of RetryOnStatus.
handle_status(Status, Reply, RetryOnStatus, Ignore, Success, Retry) :-
    debug(transport, 'status code ~w, reply ~w', [Status, Reply]),
    (   (   200 =< Status,
            Status < 300
        ;   memberchk(Status, Ignore)
        )
    ->  Success = true
    ;   match_status_and_throw_immediately(Status, Reply),
        Success = false,
        (   memberchk(Status, RetryOnStatus)
        ->  Retry = true
        ;   Retry = false
        )
    ).
152
%!  match_status_and_throw_immediately(+Status, +Reply) is det.
%
%   Throw error(plasticsearch_exception(Status, Reply)) when Status
%   is one of 400, 401, 403, 404 or 409. Succeeds without doing
%   anything for every other status.
match_status_and_throw_immediately(Status, Reply) :-
    memberchk(Status, [400, 401, 403, 404, 409]),
    !,
    throw(error(plasticsearch_exception(Status, Reply))).
match_status_and_throw_immediately(_, _).
158
%!  handle_exception(+E, +RetryOnTimeout, -Success, -Retry) is det.
%
%   Classify an exception raised by an HTTP operation. Success is
%   always `false`. Retry is:
%
%     - `true` for a socket error, in either the socket_error/1 or
%       the socket_error(Code, Message) form;
%     - RetryOnTimeout for error(timeout_error(_, _), _);
%     - `false` for any other exception.
handle_exception(E, RetryOnTimeout, false, Retry) :-
    debug(transport, 'Exception ~w', [E]),
    (   (   E = error(socket_error(_), _)
        ;   E = error(socket_error(_, _), _)
        )
    ->  Retry = true
    ;   E = error(timeout_error(_, _), _)
    ->  Retry = RetryOnTimeout
    ;   Retry = false
    ).
% Transport related logic.