1:- module(http2_client, [http2_open/3,
    2                         http2_close/1,
    3                         http2_request/4]).

HTTP/2 client

author
- James Cash */
    9:- use_module(library(predicate_options)).   10:- use_module(library(list_util), [split_at/4]).   11:- use_module(library(ssl), [ssl_context/3,
   12                             ssl_negotiate/5,
   13                             cert_accept_any/5]).   14:- use_module(library(socket), [tcp_connect/3,
   15                                tcp_select/3,
   16                                tcp_host_to_address/2]).   17:- use_module(library(url), [parse_url/2]).   18:- use_module(library(record)).   19:- use_module(frames).   20:- use_module(hpack, [lookup_header/3]).   21
   22:- multifile prolog:message//1.   23prolog:message(unknown_frame(Code, In, State)) -->
   24    [ "Unknown HTTP/2 frame ~w: ~w~nState: ~w"-[Code, In, State] ].
   25prolog:message(bad_frame(State, In)) -->
   26    [ "Couldn't read frame from ~w~nState: ~w"-[In, State] ].
   27prolog:message(connection_closed(Error, Data, State)) -->
   28    [ "Connection closed with error code ~w: ~w~nClient state: ~w"-[Error, Data, State] ].
   29prolog:message(worker_died) --> [ "HTTP/2 client worker thread died"-[] ].
   30
   31connection_preface(`PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n`).
   32
   33default_complete_cb(Headers, _Body) :-
   34    debug(http2_client(open), "Complete without callback set ~w", [Headers]).
   35% [TODO] store state of connection, to determine what's valid to recieve/send
   36:- record http2_stream(headers=[],
   37                       data=[],
   38                       done=false,
   39                       complete_cb=default_complete_cb).
   40
   41default_close_cb(Data) :-
   42    debug(http2_client(open), "Connection closed without callback set ~w", [Data]).
   43
   44:- record http2_state(authority=false,
   45                      stream=false,
   46                      settings=settings{header_table_size: 4096,
   47                                        enable_push: 0, %1,
   48                                        max_concurrent_streams: unlimited,
   49                                        initial_window_size: 65535,
   50                                        max_frame_size: 16384,
   51                                        max_header_list_size: unlimited},
   52                      recv_header_table=[],
   53                      recv_header_table_size=4096,
   54                      send_header_table=[],
   55                      send_header_table_size=4096,
   56                      next_stream_id=1,
   57                      last_stream_id=0,
   58                      substreams=streams{},
   59                      close_cb=default_close_cb).
   60
   61:- predicate_options(http2_open/3, 3, [close_cb(callable),
   62                                       pass_to(ssl_context/3)]).   63:- record http2_ctx(stream=false,                    worker_thread_id=false).
 http2_open(+URL, -HTTP2Ctx, +Options) is det
Open Stream as an HTTP/2 connection to URL's host.
Arguments:
Options- passed to ssl_context/3. http2_open-specific options:
close_cb(Callable)
Predicate to be called when the connection receives a GOAWAY frame.
   71http2_open(URL, Http2Ctx, Options) :-
   72    % Open TLS connection
   73    parse_url(URL, [protocol(https),host(Host)|Attrs]),
   74    (memberchk(port(Port), Attrs) ; Port = 443), !,
   75    debug(http2_client(open), "URL ~w -> Host ~w:~w", [URL, Host, Port]),
   76    ssl_context(client, Ctx, [host(Host),
   77                              close_parent(true),
   78                              alpn_protocols([h2]),
   79                              cacert_file(system(root_certificates))
   80                              |Options]),
   81    tcp_host_to_address(Host, Address),
   82    debug(http2_client(open), "Host ~w -> Address ~w", [Host, Address]),
   83    tcp_connect(Address:Port, PlainStreamPair, []),
   84    debug(http2_client(open), "Connected", []),
   85    stream_pair(PlainStreamPair, PlainRead, PlainWrite),
   86    set_stream(PlainRead, buffer(false)),
   87    ssl_negotiate(Ctx, PlainRead, PlainWrite,
   88                  SSLRead, SSLWrite),
   89    debug(http2_client(open), "Negotiated", []),
   90    stream_pair(Stream, SSLRead, SSLWrite),
   91    % HTTP/2 connection starts with preface...
   92    connection_preface(ConnectionPreface),
   93    format(Stream, "~s", [ConnectionPreface]),
   94    % ...then SETTINGS frame
   95    send_frame(Stream, settings_frame([enable_push-0])),
   96    flush_output(Stream),
   97    % XXX: ...then we read a SETTINGS from from server & ACK it
   98    (memberchk(close_cb(CloseCb), Options), ! ; CloseCb = default_close_cb),
   99    make_http2_state([authority(Host),
  100                      stream(Stream),
  101                      close_cb(CloseCb)],
  102                     State),
  103    thread_create(listen_socket(State), WorkerThreadId, [at_exit(warn_worker_died(Stream, CloseCb))]),
  104    make_http2_ctx([stream(Stream), worker_thread_id(WorkerThreadId)],
  105                   Http2Ctx).
  106
  107warn_worker_died(Stream, CloseCb) :-
  108    thread_self(ThreadId),
  109    (thread_property(ThreadId, status(exception(finished)))
  110    -> debug(http2_client(open), "Worker thread exited normally", [])
  111    ;  (print_message(warning, worker_died),
  112        close(Stream),
  113        thread_property(ThreadId, status(Status)),
  114        call(CloseCb, _{cause: Status,
  115                        msg: "Worker thread died"}))).
 http2_close(+Ctx) is det
Close the given stream.
  120http2_close(Http2Ctx) :-
  121    http2_ctx_worker_thread_id(Http2Ctx, ThreadId),
  122    thread_send_message(ThreadId, done).
  123
  124:- meta_predicate http2_request(+, +, +, 2).
 http2_request(+Stream, +Headers, +Body, :Response) is det
Send an HTTP/2 request using the previously-opened HTTP/2 connection Stream.
See also
- http2_open/3
  130http2_request(Ctx, Headers, Body, ResponseCb) :-
  131    debug(http2_client(request), "Sending request ~w", [Ctx]),
  132    http2_ctx_worker_thread_id(Ctx, WorkerId),
  133    Msg = request{headers: Headers,
  134                  body: Body,
  135                  on_complete: ResponseCb},
  136    thread_send_message(WorkerId, Msg).
  137
  138% Worker thread
  139
  140listen_socket(State0) :-
  141    http2_state_stream(State0, Stream),
  142    stream_to_lazy_list(Stream, StreamList),
  143    listen_socket(State0, StreamList).
  144listen_socket(State0, StreamList0) :-
  145    thread_self(ThreadId),
  146    (thread_get_message(ThreadId, Msg, [timeout(0)])
  147    -> (debug(http2_client(request), "Client msg ~k", [Msg]),
  148        handle_client_request(Msg, State0, State1),
  149        debug(http2_client(request), "Msg sent new state ~w", [State1]))
  150    ;  State1 = State0), !,
  151
  152    http2_state_stream(State1, Stream),
  153    tcp_select([Stream], Input, 0),
  154    (( Input = [Stream] ; \+ attvar(StreamList0) )
  155    -> (debug(http2_client(response), "Data available", []),
  156        read_frame(State1, StreamList0, State2, StreamList1),
  157        debug(http2_client(response), "Read data, rest ~w", [StreamList1]))
  158    ; (State1 = State2, StreamList1 = StreamList0)),
  159
  160    listen_socket(State2, StreamList1).
  161
  162worker_shutdown(State, Cause) :-
  163    http2_state_stream(State, Stream),
  164    close(Stream),
  165    debug(http2_client(open), "...closed", []),
  166
  167    http2_state_close_cb(State, CloseCb),
  168    http2_state_last_stream_id(State, LastStreamId),
  169    call(CloseCb, _{last_stream_id: LastStreamId,
  170                    cause: Cause}),
  171    throw(finished).
  172
  173% Worker thread - sending requests
  174
  175handle_client_request(done, State, _) :-
  176    http2_state_stream(State, Stream),
  177    http2_state_last_stream_id(State, LastId),
  178    debug(http2_client(open), "Closing connection ~w...", [LastId]),
  179    send_frame(Stream, goaway_frame(LastId, 0, [])),
  180    flush_output(Stream),
  181    worker_shutdown(State, "Client closed").
  182handle_client_request(Msg, State0, State4) :-
  183    Msg = request{headers: Headers_,
  184                  body: Body,
  185                  on_complete: ResponseCb},
  186    http2_state_authority(State0, Authority),
  187    Headers = [':authority'-Authority,':scheme'-https|Headers_],
  188    http2_state_next_stream_id(State0, Ident),
  189    stream_info(State0, Ident, StreamInfo0),
  190    set_complete_cb_of_http2_stream(ResponseCb, StreamInfo0, StreamInfo1),
  191    update_state_substream(Ident, StreamInfo1, State0, State1),
  192    NextIdent is Ident + 2,
  193    set_next_stream_id_of_http2_state(NextIdent, State1, State2),
  194    debug(http2_client(request), "Sending headers ~w ~w", [Headers, State1]),
  195    (Body = [] -> HeadersEnd = true ; HeadersEnd = false),
  196    send_request_headers(Headers, Ident, HeadersEnd, State2, State3),
  197    send_request_body(Body, Ident, State3, State4),
  198    http2_state_stream(State4, Stream),
  199    flush_output(Stream).
  200
  201send_request_headers(Headers_, Ident, EndStream, State0, State1) :-
  202    http2_state_send_header_table(State0, Table0),
  203    wrapped_headers(Table0, Headers_, Headers),
  204    http2_state_send_header_table_size(State0, TableSize),
  205    http2_state_stream(State0, Stream),
  206    http2_state_settings(State0, Settings),
  207    MaxSize = Settings.max_frame_size,
  208    send_frame(Stream,
  209               header_frames(MaxSize,
  210                             Ident, Headers, TableSize-Table0-TableSize1-Table1,
  211                             [end_headers(true), end_stream(EndStream)])),
  212    debug(http2_client(request), "Sent headers", []),
  213    set_http2_state_fields(
  214        [send_header_table(Table1),
  215         send_header_table_size(TableSize1)],
  216        State0, State1).
  217
  218send_request_body([], _, State, State) :- !.
  219send_request_body(Body, Ident, State0, State0) :-
  220    % [TODO] make end_stream configurable? If wanting to do streaming
  221    % or something?
  222    http2_state_stream(State0, Stream),
  223    http2_state_settings(State0, Settings),
  224    MaxSize = Settings.max_frame_size,
  225    send_body_parts(Stream, Ident, MaxSize, Body).
  226
  227send_body_parts(_, _, _, []) :- !.
  228send_body_parts(Stream, Ident, MaxSize, Body) :-
  229    length(Body, BodyL),
  230    BodyL =< MaxSize, !,
  231    send_frame(Stream,
  232              data_frame(Ident, Body, [end_stream(true)])).
  233send_body_parts(Stream, Ident, MaxSize, Body) :-
  234    split_at(MaxSize, Body, ToSend, Rest),
  235    send_frame(Stream, data_frame(Ident, ToSend, [])),
  236    send_body_parts(Stream, Ident, MaxSize, Rest).
  237
  238wrapped_headers(_, [], []) :- !.
  239wrapped_headers(Table, [K-V|RestH], [indexed(K-V)|RestW]) :-
  240    lookup_header(Table, K-V, _), !,
  241    wrapped_headers(Table, RestH, RestW).
  242wrapped_headers(Table, [K-V|RestH], [literal_inc(K-V)|RestW]) :-
  243    !, wrapped_headers(Table, RestH, RestW).
  244wrapped_headers(Table, [KV|RestH], [KV|RestW]) :-
  245    wrapped_headers(Table, RestH, RestW).
  246
  247% Worker thread - recieving data from server
  248
  249read_frame(State0, In, State2, Rest) :-
  250    phrase(frames:frame(Type, Flags, Ident, Payload),
  251           In, Rest), !,
  252    debug(http2_client(response), "Read frame type ~w", [Type]),
  253    phrase(frames:frame(Type, Flags, Ident, Payload), Bytes),
  254    http2_state_last_stream_id(State0, LastIdent),
  255    NewLastIdent is max(LastIdent, Ident),
  256    set_last_stream_id_of_http2_state(NewLastIdent, State0, State1),
  257    debug(http2_client(response), "Update last seen frame ~w", [NewLastIdent]),
  258    handle_frame(Type, Ident, State1, Bytes, State2),
  259    debug(http2_client(response), "Handled frame", []).
  260read_frame(State, In, _, _) :-
  261    print_message(warning, bad_frame(State, In)),
  262    !, fail.
  263
  264handle_frame(0x0, _, State0, In, State2) :- % data frame
  265    phrase(data_frame(Ident, Data, [end_stream(End)]), In), !,
  266    length(Data, DataL),
  267    debug(http2_client(response), "Data on stream ~w # = ~w end? ~w", [Ident, DataL, End]),
  268    stream_info(State0, Ident, StreamInfo0),
  269    http2_stream_data(StreamInfo0, OldData),
  270    append(OldData, Data, NewData),
  271    set_http2_stream_fields([data(NewData), done(End)],
  272                            StreamInfo0, StreamInfo1),
  273    update_state_substream(Ident, StreamInfo1, State0, State1),
  274    (End -> complete_client(Ident, State1, State2) ; State2 = State1).
  275handle_frame(0x1, Ident, State0, In, State3) :- % headers frame
  276    http2_state_recv_header_table(State0, HeaderTable0),
  277    http2_state_recv_header_table_size(State0, TableSize),
  278    phrase(header_frame(Ident,
  279                        Headers,
  280                        TableSize-HeaderTable0-TableSize1-HeaderTable1,
  281                        % Ignoring priority
  282                        [end_stream(EndStream),
  283                         end_headers(EndHeaders)]),
  284          In), !,
  285    debug(http2_client(response), "Header frame ~w", [Headers]),
  286    stream_info(State0, Ident, StreamInfo),
  287    http2_stream_headers(StreamInfo, PreviousHeaders),
  288    append(PreviousHeaders, Headers, NewHeaders),
  289    debug(http2_client(response), "NEW HEADERS ~w", [NewHeaders]),
  290    set_http2_stream_fields([done(EndStream),
  291                             headers(NewHeaders)],
  292                            StreamInfo, StreamInfo1),
  293    update_state_substream(Ident, StreamInfo1, State0, State1),
  294    set_http2_state_fields([recv_header_table(HeaderTable1),
  295                            recv_header_table_size(TableSize1)],
  296                           State1, State2),
  297    ((EndStream, EndHeaders)
  298    -> complete_client(Ident, State2, State3)
  299    ;  State3 = State2).
  300handle_frame(0x2, _Ident, State0, _In, State0). % priority frame
  301handle_frame(0x3, Ident, State0, In, State2) :- % rst frame
  302    phrase(rst_frame(Ident, ErrCode), In), !,
  303    debug(http2_client(response), "Rst frame ~w ~w", [Ident, ErrCode]),
  304    stream_info(State0, Ident, StreamInfo0),
  305    set_done_of_http2_stream(true, StreamInfo0, StreamInfo1),
  306    update_state_substream(Ident, StreamInfo1, State0, State1),
  307    complete_client(Ident, State1, State2).
  308handle_frame(0x4, _, State0, In, State0) :- % settings ack frame
  309    phrase(settings_ack_frame, In), !.
  310handle_frame(0x4, _, State0, In, State1) :- % settings frame
  311    debug(http2_client(response), "read settings ~w", [In]),
  312    phrase(settings_frame(UpdateSettings), In), !,
  313    debug(http2_client(response), "Settings ~w", [UpdateSettings]),
  314    http2_state_settings(State0, Settings),
  315    update_settings(Settings, UpdateSettings, NewSettings),
  316    NewTableSize = NewSettings.header_table_size,
  317    http2_state_recv_header_table(State0, OldTable),
  318    hpack:keep_fitting(NewTableSize, OldTable, NewTable),
  319    set_http2_state_fields([settings(NewSettings),
  320                            recv_header_table(NewTable),
  321                            recv_header_table_size(NewTableSize)],
  322                           State0, State1),
  323    % [TODO] validate new size
  324    % send ACK
  325    http2_state_stream(State1, Stream),
  326    send_frame(Stream, settings_ack_frame), flush_output(Stream).
  327handle_frame(0x5, Ident, State0, In, State2) :- % push promise frame
  328    http2_state_recv_header_table(State0, TableIn),
  329    http2_state_recv_header_table_size(State0, TableSize),
  330    phrase(push_promise_frame(Ident, NewIdent, TableSize-TableIn-TableSizeOut-TableOut-Headers,
  331                              [end_headers(_EndHeaders)]),
  332          In), !,
  333    debug(http2_client(response), "Push promise Stream ~w headers ~w", [NewIdent, Headers]),
  334
  335    stream_info(State0, NewIdent, StreamInfo0),
  336    set_headers_of_http2_stream(Headers, StreamInfo0, StreamInfo1),
  337    update_state_substream(NewIdent, StreamInfo1, State0, State1),
  338
  339    http2_state_last_stream_id(State1, LastStreamId),
  340    NewLastId is max(LastStreamId, NewIdent),
  341    set_http2_state_fields([last_stream_id(NewLastId),
  342                            recv_header_table(TableOut),
  343                            recv_header_table_size(TableSizeOut)],
  344                           State1, State2).
  345handle_frame(0x6, _, State, In, State) :- % ping frame
  346    phrase(ping_frame(_, Ack), In), !,
  347    (Ack
  348    ; (http2_state_stream(State, Stream),
  349       send_frame(Stream, ping_frame(`12345678`, true)))).
  350handle_frame(0x7, _, State0, In, State0) :- % goaway frame
  351    phrase(goaway_frame(LastStreamId, Error, Data), In),
  352    debug(http2_client(response), "GOAWAY frame: ~w ~w ~w", [LastStreamId, Error, Data]),
  353    (Error = 0 ; print_message(warning, connection_closed(Error, Data, State0))),
  354    worker_shutdown(State0, _{msg: "goaway frame",
  355                              error: Error,
  356                              data: Data}).
  357handle_frame(0x8, _, State0, In, State0) :- % window frame
  358    phrase(window_update_frame(Ident, Increment), In), !,
  359    debug(http2_client(response), "window frame ~w ~w", [Ident, Increment]),
  360    % [TODO] update flow control state for the stream
  361    true.
  362handle_frame(0x9, Ident, State0, In, State3) :- % continuation frame
  363    http2_state_recv_header_table(State0, HeaderTable0),
  364    http2_state_recv_header_table_size(State0, TableSize),
  365    phrase(continuation_frame(Ident,
  366                              TableSize-HeaderTable0-TableSizeOut-HeaderTable1-Headers,
  367                              EndHeaders),
  368          In),
  369    stream_info(State0, Ident, StreamInfo),
  370    http2_stream_headers(StreamInfo, PreviousHeaders),
  371    append(PreviousHeaders, Headers, NewHeaders),
  372    % Handle EndHeaders? (end of headers = means end of stream if the
  373    % previous header frame was end-of-stream but not end-of-headers)
  374    set_headers_of_http2_stream(NewHeaders, StreamInfo, StreamInfo1),
  375    update_state_substream(Ident, StreamInfo1, State0, State1),
  376    set_http2_state_fields([recv_header_table(HeaderTable1),
  377                            recv_header_table_size(TableSizeOut)],
  378                           State1, State2),
  379    http2_stream_done(StreamInfo1, StreamDone),
  380    ((StreamDone, EndHeaders)
  381    -> complete_client(Ident, State2, State3)
  382    ;  State3 = State2).
  383handle_frame(Code, _, State, In, State) :-
  384    print_message(warning, unknown_frame(Code, In, State)),
  385    !, fail.
  386
  387% Worker thread - Completing a request
  388
  389complete_client(Ident, State0, State1) :-
  390    stream_info(State0, Ident, StreamInfo),
  391    notify_client_done(StreamInfo),
  392    remove_state_substream(Ident, State0, State1).
  393
  394notify_client_done(StreamInfo) :-
  395    http2_stream_complete_cb(StreamInfo, Cb),
  396    http2_stream_headers(StreamInfo, Headers),
  397    http2_stream_data(StreamInfo, Body),
  398    catch(call(Cb, Headers, Body),
  399          Err,
  400          debug(http2_client(request), "Error invoking cb ~w", [Err])).
  401
  402% Other helper predicates
  403
  404:- meta_predicate send_frame(+, :).  405send_frame(Stream, Frame) :-
  406    debug(http2_client(request), "sending frame ~w", [Frame]),
  407    phrase(Frame, FrameCodes), !,
  408    format(Stream, "~s", [FrameCodes]).
  409
  410update_settings(New, [], New).
  411update_settings(Old, [K-V|Rest], New) :-
  412    put_dict(K, Old, V, Update),
  413    update_settings(Update, Rest, New).
  414
  415stream_info(State, Ident, Stream) :-
  416    http2_state_substreams(State, Streams),
  417    (get_dict(Ident, Streams, Stream) ; make_http2_stream([], Stream)).
  418
  419update_state_substream(Ident, StreamInfo, State0, State1) :-
  420    http2_state_substreams(State0, Streams0),
  421    put_dict(Ident, Streams0, StreamInfo, Streams1),
  422    set_substreams_of_http2_state(Streams1, State0, State1).
  423
  424remove_state_substream(Ident, State0, State1) :-
  425    http2_state_substreams(State0, Streams0),
  426    del_dict(Ident, Streams0, _, Streams1),
  427    set_substreams_of_http2_state(Streams1, State0, State1)