View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        jan@swi-prolog.org
    5    WWW:           https://www.swi-prolog.org
    6    Copyright (c)  2025, SWI-Prolog Solutions b.v.
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(json_rpc_client,
   36          [ json_call/4,          % +Stream, +Goal, -Result, +Options
   37            json_notify/3,        % +Stream, +Goal, +Options
   38            json_batch/5,         % +Stream, +Notifications, +Calls, -Results, +Options
   39            json_full_duplex/2    % +Stream, :Options
   40          ]).   41:- autoload(library(json), [json_write_dict/3, json_read_dict/3]).   42:- autoload(library(option), [option/2]).   43:- use_module(library(debug), [debug/3]).   44:- autoload(library(apply), [maplist/4, maplist/3]).   45:- autoload(library(lists), [append/3, member/2]).   46:- autoload(library(terms), [mapsubterms/3]).   47:- autoload(library(http/http_stream), [stream_range_open/3]).   48
   49:- meta_predicate
   50    json_full_duplex(+, :).

JSON RPC client

This module implements a JSON RPC compliant client. The three predicates require a stream pair (see stream_pair/2) that connects us to a JSON RPC server.

*/

   60:- dynamic
   61    json_result_queue/2,                        % Stream, Queue
   62    failed_id/2.                                % Queue, Id
 json_call(+Stream, +Goal, -Result, +Options) is det
Run Goal on a JSON RPC service identified by Stream and wait for Result. This predicate may be called from multiple threads. As replies come in in arbitrary order, this predicate starts a thread that reads the replies from Stream and informs the calling thread using a Prolog message queue.

If Stream is closed this library terminates the thread and related message queue.

Arguments:
Goal- is a callable term. The functor name is the method. If there is a single argument that is a dict, we invoke a JSON-RPC method using named arguments. If there is a single argument that is a list, use the elements of the list as positional arguments. If there are zero or more than one arguments use these as positional arguments. Examples:
TermMethodTypeJSON (params)

f(#{a:1,b:2})fnamed{"a":1, "b":2}
f(["a", 42])fpositional["a", 42]
f([#{"a":1}])fpositional[{"a":1}]
f()fpositional[]
f("a", 42)fpositional["a", 42]

Options processed:

   93json_call(Stream, Goal, Result, Options) :-
   94    Goal =.. [Name|Args0],
   95    call_args(Args0, Args),
   96    client_id(Id, Options),
   97    debug(json_rpc, 'Sending request ~p', [Id]),
   98    json_send(Stream,
   99              #{ jsonrpc: "2.0",
  100                 id: Id,
  101                 method: Name,
  102                 params: Args
  103               }, Options),
  104    setup_call_catcher_cleanup(
  105        true,
  106        json_wait_reply(Stream, Id, Result, Options),
  107        Catcher,
  108        client_cleanup(Catcher, Stream, Id)).
  109
  110call_args([Arg], Args), is_dict(Arg) =>
  111    Args = Arg.
  112call_args([Args0], Args), is_list(Args0) =>
  113    Args = Args0.
  114call_args(Args0, Args) =>
  115    Args = Args0.
  116
  117json_wait_reply(Stream, Id, Result, Options) :-
  118    with_mutex(json_rpc_client,
  119               get_json_result_queue(Stream, Queue, Options)),
  120    debug(json_rpc, 'Waiting for reply', []),
  121    (   thread_get_message(Queue, done(Id, Result0), Options)
  122    ->  map_reply(Result0, Result1, Options),
  123        debug(json_rpc, 'Got reply for ~p', [Id]),
  124        (   Result1 = throw(Error)
  125        ->  throw(Error)
  126        ;   Result1 = true(Result)
  127        )
  128    ;   assertz(failed_id(Queue, Id)),
  129        fail
  130    ).
  131
  132map_reply(Reply0, Reply, Options) :-
  133    option(value_string_as(atom), Options),
  134    !,
  135    mapsubterms(map_string, Reply0, Reply).
  136map_reply(Reply, Reply, _).
  137
  138map_string(String, Atom) :-
  139    string(String),
  140    atom_string(Atom,String).
  141
  142client_id(Id, Options) :-
  143    option(id(Id), Options),
  144    !.
  145client_id(Id, _Options) :-
  146    flag(json_client_id, Id, Id+1).
  147
  148client_cleanup(exit, _, _) =>
  149    true.
  150client_cleanup(_, Stream, Id) =>
  151    json_result_queue(Stream, Queue),
  152    assertz(failed_id(Queue, Id)).
 json_notify(+Stream, +Goal, +Options) is det
Run Goal on a JSON RPC service identified by Stream without waiting for the result.
  159json_notify(Stream, Goal, Options) :-
  160    Goal =.. [Name|Args0],
  161    call_args(Args0, Args),
  162    json_send(Stream,
  163              #{ jsonrpc: "2.0",
  164                 method: Name,
  165                 params: Args
  166               }, Options).
 json_batch(+Stream, +Notifications:list, +Calls:list, -Results:list, +Options) is det
Run a batch of notifications and normal calls on the JSON server at the other end of Stream. On success, Results is unified to a list with the same length as Calls. Each element either contains a value, similar to json_call/4 or a term error(Dict), where Dict holds the code, message and optional data field. Note that error(Dict) is not a valid JSON type and this is thus unambiguous. While the JSON RPC standard allows the server to process the messages in any order and allows for concurrent processing, all results are sent in one message and this client ensures the elements of the Results list are in the same order as the Calls list. If the Calls list is empty this predicate does not wait for a reply.
  183json_batch(Stream, Notifications, Calls, Results, Options) :-
  184    maplist(call_to_json_request, Calls, IDs, Requests1),
  185    maplist(call_to_json_notification, Notifications, Requests2),
  186    append(Requests1, Requests2, Batch),
  187    json_send(Stream, Batch, Options),
  188    flush_output(Stream),
  189    (   IDs == []
  190    ->  true
  191    ;   batch_id(IDs, Id),
  192        json_wait_reply(Stream, Id, Results0, Options),
  193        sort(id, <, Results0, Results1),
  194        maplist(batch_result, Results1, Results)
  195    ).
  196
  197call_to_json_request(Goal, Id, Request) :-
  198    Goal =.. [Name|Args],
  199    client_id(Id, []),
  200    Request = #{ jsonrpc: "2.0",
  201                 id: Id,
  202                 method: Name,
  203                 params: Args
  204               }.
  205
  206call_to_json_notification(Goal, Notification) :-
  207    Goal =.. [Name|Args],
  208    Notification = #{ jsonrpc: "2.0",
  209                      method: Name,
  210                      params: Args
  211                    }.
  212
  213batch_id(IDs, Id) :-
  214    sort(IDs, Canonical),
  215    variant_sha1(Canonical, Id).
  216
  217batch_result(Reply, Result), Result0 = Reply.get(result) =>
  218    Result = Result0.
  219batch_result(Reply, Result), Result0 = Reply.get(error) =>
  220    Result = error(Result0).
 json_send(+Stream, +Dict, +Options)
  224json_send(Stream, Dict, Options) :-
  225    option(header(true), Options),
  226    !,
  227    with_output_to(string(Msg),
  228                   json_write_dict(current_output, Dict, Options)),
  229    utf8_length(Msg, Len),
  230    format(Stream,
  231           'Content-Length: ~d\r\n\r\n~s', [Len, Msg]),
  232    flush_output(Stream).
  233json_send(Stream, Dict, Options) :-
  234    with_output_to(Stream,
  235                   json_write_dict(Stream, Dict, Options)),
  236    flush_output(Stream).
  237
  238utf8_length(String, Len) :-
  239    setup_call_cleanup(
  240        open_null_stream(Null),
  241        (   set_stream(Null, encoding(utf8)),
  242            format(Null, '~s', [String]),
  243            flush_output(Null),
  244            byte_count(Null, Len)
  245        ),
  246        close(Null)).
  247
  248                /*******************************
  249                *        INCOMING DATA         *
  250                *******************************/
 json_full_duplex(+Stream, :Options) is det
Start the thread for incoming data and, on requests, dispatch them using library(json_rpc_server) in the module derived from the Options list.
  258json_full_duplex(Stream, Options) :-
  259    with_mutex(json_rpc_client, json_full_duplex_(Stream, Options)).
  260
  261json_full_duplex_(Stream, _) :-
  262    json_result_queue(Stream, _Queue),
  263    !,
  264    permission_error(json, full_duplex, Stream).
  265json_full_duplex_(Stream, M:Options) :-
  266    get_json_result_queue(Stream, _Queue,
  267                          [server_module(M)|Options]).
 get_json_result_queue(+Stream, -Queue, +Options) is det
Find the result queue associated to Stream. If this does not exist, create one, as well as a thread that handles the incoming results and dispatches these to the queue.
  276get_json_result_queue(Stream, Queue, _Options) :-
  277    json_result_queue(Stream, Queue),
  278    !.
  279get_json_result_queue(Stream, Queue, Options) :-
  280    message_queue_create(Queue),
  281    asserta(json_result_queue(Stream, Queue)),
  282    flag(json_rpc_client_dispatcher, N, N+1),
  283    format(atom(Alias), 'json_rpc_client:~w', [N]),
  284    thread_create(
  285        handle_result_loop(Stream, Options),
  286        _Id,
  287        [ detached(true),
  288          alias(Alias),
  289          inherit_from(main),
  290          at_exit(cleanup_client(Stream))
  291        ]).
  292
  293handle_result_loop(Stream, Options) :-
  294    handle_result(Stream, EOF, Options),
  295    (   EOF == true
  296    ->  true
  297    ;   handle_result_loop(Stream, Options)
  298    ).
  299
  300handle_result(Stream, EOF, Options) :-
  301    Error = error(Formal, _),
  302    catch(json_receive(Stream, Reply, Options),
  303          Error,
  304          true),
  305    debug(json_rpc, 'Received ~p', [Reply]),
  306    (   Reply == end_of_file(true)
  307    ->  EOF = true
  308    ;   var(Formal)
  309    ->  handle_reply(Stream, Reply, Options)
  310    ;   handle_error(Error, EOF)
  311    ).
  312
  313json_receive(Stream, Reply, Options) :-
  314    option(header(true), Options),
  315    !,
  316    read_header(Stream, Lines),
  317    (   Lines == []
  318    ->  Reply = end_of_file(true)
  319    ;   header_content_length(Lines, Length),
  320        setup_call_cleanup(
  321            stream_range_open(Stream, Data, [size(Length)]),
  322            json_read_dict(Data,
  323                           Reply,
  324                           Options),
  325            close(Data))
  326    ).
  327json_receive(Stream, Reply, Options) :-
  328    json_read_dict(Stream,
  329                   Reply,
  330                   [ end_of_file(end_of_file(true))
  331                   | Options
  332                   ]).
  333
  334read_header(Stream, Lines) :-
  335    read_string(Stream, "\n", "\r\t ", Sep, Line),
  336    (   (Line == "" ; Sep == -1)
  337    ->  Lines = []
  338    ;   Lines = [Line|Rest],
  339        read_header(Stream, Rest)
  340    ).
  341
  342header_content_length(Lines, Length) :-
  343    member(Line, Lines),
  344    split_string(Line, ":", "\t\s", [Field,Value]),
  345    string_lower(Field, "content-length"),
  346    !,
  347    number_string(Length, Value).
  348
  349handle_reply(Stream, Batch, _Options),
  350    is_list(Batch) =>
  351    maplist(get_dict(id), Batch, IDs),
  352    batch_id(IDs, Id),
  353    json_result_queue(Stream, Queue),
  354    send_done(Queue, Id, true(Batch)).
  355handle_reply(Stream, Reply, _Options),
  356    #{ jsonrpc: "2.0",
  357       result: Result,
  358       id: Id } :< Reply =>
  359    json_result_queue(Stream, Queue),
  360    send_done(Queue, Id, true(Result)).
  361handle_reply(Stream, Reply, _Options),
  362    #{ jsonrpc: "2.0",
  363       error: Error,
  364       id: Id } :< Reply =>
  365    json_result_queue(Stream, Queue),
  366    send_done(Queue, Id, throw(error(json_rpc_error(Error), _))).
  367handle_reply(Stream, Request, Options),
  368    #{ jsonrpc: "2.0",
  369       method: _Method,
  370       params: _Params } :< Request =>
  371    option(server_module(M), Options),
  372    json_rpc_server:json_rpc_dispatch_request(M, Stream, Request, Options).
  373
  374
  375send_done(Queue, Id, _Data) :-
  376    retract(failed_id(Queue, Id)),
  377    !.
  378send_done(Queue, Id, Data) :-
  379    thread_send_message(Queue, done(Id, Data)),
  380    clean_dead_requests(Queue).
  381
  382clean_dead_requests(Queue) :-
  383    forall(failed_id(Queue, Id),
  384           cleanup_dead_id(Queue, Id)).
  385
  386cleanup_dead_id(Queue, Id) :-
  387    (   thread_get_message(Queue, done(Id, _), [timeout(0)])
  388    ->  retract(failed_id(Queue, Id))
  389    ;   true
  390    ).
  391
  392handle_error(error(existence_error(stream, _), _), EOF) =>
  393    EOF = true.
  394handle_error(Error, _EOF) =>
  395    print_message(error, Error).
 cleanup_client(+Stream) is det
Thread exit handler to remove the registration and queue.
  401cleanup_client(Stream) :-
  402    forall(retract(json_result_queue(Stream, Queue)),
  403           do_cleanup(Stream, Queue)).
  404
  405do_cleanup(Stream, Queue) :-
  406    close(Stream, [force(true)]),
  407    message_queue_destroy(Queue)