View source with raw comments or as raw
    1/*  Prolog Machine Query Interface
    2    Author:        Eric Zinda
    3    E-mail:        ericz@inductorsoftware.com
    4    WWW:           http://www.inductorsoftware.com
    5    Copyright (c)  2021, Eric Zinda
    6    All rights reserved.
    7
    8        Redistribution and use in source and binary forms, with or without
    9    modification, are permitted provided that the following conditions
   10    are met:
   11
   12    1. Redistributions of source code must retain the above copyright
   13       notice, this list of conditions and the following disclaimer.
   14
   15    2. Redistributions in binary form must reproduce the above copyright
   16       notice, this list of conditions and the following disclaimer in
   17       the documentation and/or other materials provided with the
   18       distribution.
   19
   20    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   21    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   22    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   23    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   24    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   25    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   26    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   27    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   28    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   29    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   30    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   31    POSSIBILITY OF SUCH DAMAGE.
   32*/
   33
   34:- module(mqi,
   35          [ mqi_start/0,
   36            mqi_start/1,                % +Options
   37            mqi_stop/1                  % ?Thread
   38          ]).
 mqi_start(+Options:list) is semidet
Starts a Prolog Machine Query Interface ('MQI') using Options. The MQI is normally started automatically by a library built for a particular programming language such as the swiplserver Python library, but starting manually can be useful when debugging Prolog code in some scenarios. See the documentation on "Standalone Mode" for more information.

Once started, the MQI listens for TCP/IP or Unix Domain Socket connections and authenticates them using the password provided (or created depending on options) before processing any messages. The messages processed by the MQI are described below.

For debugging, the server outputs traces using the debug/3 predicate so that the server operation can be observed by using the debug/1 predicate. Run the following commands to see them:

  109:- use_module(library(socket)).  110:- use_module(library(http/json)).  111:- use_module(library(http/json_convert)).  112:- use_module(library(option)).  113:- use_module(library(term_to_json)).  114:- use_module(library(debug)).  115:- use_module(library(filesex)).  116:- use_module(library(gensym)).  117:- use_module(library(lists)).  118:- use_module(library(main)).  119:- use_module(library(make)).  120:- use_module(library(prolog_source)).  121:- use_module(library(time)).  122:- use_module(library(uuid)).  123
  124% One for every Machine Query Interface running
  125:- dynamic(mqi_thread/3).  126
  127% One for every active connection
  128:- dynamic(mqi_worker_threads/3).  129:- dynamic(mqi_socket/5).  130
  131% Indicates that a query is in progress on the goal thread or hasn't had its results drained
  132% Deleted once the last result from the queue has been drained
  133% Only deleted by the communication thread to avoid race conditions
  134:- dynamic(query_in_progress/1).  135
  136% Indicates to the communication thread that we are in a place
  137% that can be cancelled
  138:- dynamic(safe_to_cancel/1).  139
  140
  141% Password is carefully constructed to be a string (not an atom) so that it is not
  142% globally visible
  143% Add ".\n" to the password since it will be added by the message when received
  144mqi_start(Options) :-
  145    Encoding = utf8,
  146    option(pending_connections(Connection_Count), Options, 5),
  147    option(query_timeout(Query_Timeout), Options, -1),
  148    option(port(Port), Options, _),
  149    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
  150    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
  151    option(write_connection_values(Write_Connection_Values), Options, false),
  152    option(unix_domain_socket(Unix_Domain_Socket_Path_And_File), Options, _),
  153    (   (   memberchk(unix_domain_socket(_), Options),
  154            var(Unix_Domain_Socket_Path_And_File)
  155        )
  156    ->  unix_domain_socket_path(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)
  157    ;   true
  158    ),
  159    option(server_thread(Server_Thread_ID), Options, _),
  160    (   var(Server_Thread_ID)
  161    ->  gensym(mqi, Server_Thread_ID)
  162    ;   true
  163    ),
  164    option(password(Password), Options, _),
  165    (   var(Password)
  166    ->  (   current_prolog_flag(bounded, false)
  167        ->  uuid(UUID, [format(integer)])
  168        ;   UUID is random(1<<62)
  169        ),
  170        format(string(Password), '~d', [UUID])
  171    ;   true
  172    ),
  173    string_concat(Password, '.\n', Final_Password),
  174    bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address),
  175    send_client_startup_data(Write_Connection_Values, user_output, Unix_Domain_Socket_Path_And_File, Client_Address, Password),
  176    option(write_output_to_file(File), Options, _),
  177    (   var(File)
  178    ->  true
  179    ;   write_output_to_file(File)
  180    ),
  181    Server_Goal = (
  182                    catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
  183                    debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
  184                 ),
  185    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File).
  186
  187opt_type(port,                      port,                      natural).
  188opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
  189opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
  190opt_type(password,                  password,                  string).
  191opt_type(pending_connections,       pending_connections,       nonneg).
  192opt_type(query_timeout,             query_timeout,             float).
  193opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
  194opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
  195opt_type(write_connection_values,   write_connection_values,   boolean).
  196opt_type(write_output_to_file,      write_output_to_file,      file(write)).
  197
  198opt_help(port,                      "TCP/IP port for clients to connect to").
  199opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
  200opt_help(unix_domain_socket,        "File path for the Unix domain socket").
  201opt_help(password,                  "Connection password").
  202opt_help(pending_connections,       "Max number of queued connections (5)").
  203opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
  204opt_help(run_server_on_thread,      "Run server in a background thread (true)").
  205opt_help(exit_main_on_failure,      "Exit the process on a failure").
  206opt_help(write_connection_values,   "Print info for clients to connect").
  207opt_help(write_output_to_file,      "Write stdout and stderr to file").
 mqi_start is semidet
Main entry point for running the Machine Query Interface in "Embedded Mode" and designed to be called from the command line. Embedded Mode is used when launching the Machine Query Interface as an embedded part of another language (e.g. Python). Calling mqi_start/0 from Prolog interactively is not recommended as it depends on Prolog exiting to stop the MQI, instead use mqi_start/1 for interactive use.

To launch embedded mode:

swipl --quiet -g mqi_start -t halt -- --write_connection_values=true

This will start SWI Prolog and invoke the mqi_start/0 predicate and exit the process when that predicate stops. Any command line arguments after the standalone -- will be passed as Options. These are the same Options that mqi_start/1 accepts and are passed to it directly. Some options are expressed differently due to command line limitations, see mqi_start/1 Options for more information.

Any Option values that cause issues during command line parsing (such as spaces) should be passed with "" like this:

swipl --quiet -g mqi_start -t halt -- --write_connection_values=true \
                                      --password="HGJ SOWLWW WNDSJD"

For help on commandline options run

swipl -g mqi_start -- --help
  247% Turn off int signal when running in embedded mode so the client language
  248% debugger signal doesn't put Prolog into debug mode
  249% run_server_on_thread must be missing or true (the default) so we can exit
  250% properly
  251% create_unix_domain_socket=true/false is only used as a command line argument
  252% since it doesn't seem possible to pass create_unix_domain_socket=_ on the command line
  253% and have it interpreted as a variable.
  254mqi_start :-
  255    current_prolog_flag(argv, Argv),
  256    argv_options(Argv, _Args, Options),
  257    merge_options(Options, [exit_main_on_failure(true)], Options1),
  258    select_option(create_unix_domain_socket(Create_Unix_Domain_Socket), Options1, Options2, false),
  259    (   Create_Unix_Domain_Socket == true
  260    ->  merge_options(Options2, [unix_domain_socket(_)], FinalOptions)
  261    ;   FinalOptions = Options2
  262    ),
  263    option(run_server_on_thread(Run_Server_On_Thread), FinalOptions, true),
  264    (   Run_Server_On_Thread == true
  265    ->  true
  266    ;   throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
  267    ),
  268    mqi_start(FinalOptions),
  269    on_signal(int, _, quit),
  270    thread_get_message(quit_mqi).
  271
  272
  273quit(_) :-
  274    thread_send_message(main, quit_mqi).
 mqi_stop(?Server_Thread_ID:atom) is det
If Server_Thread_ID is a variable, stops all Machine Query Interfaces and associated threads. If Server_Thread_ID is an atom, then only the MQI with that Server_Thread_ID is stopped. Server_Thread_ID can be provided or retrieved using Options in mqi_start/1.

Always succeeds.

  283% tcp_close_socket(Socket) will shut down the server thread cleanly so the socket is released and can be used again in the same session
  284% Closes down any pending connections using abort even if there were no matching server threads since the server thread could have died.
  285% At this point only threads associated with live connections (or potentially a goal thread that hasn't detected its missing communication thread)
  286% should be left so seeing abort warning messages in the console seems OK
  287mqi_stop(Server_Thread_ID) :-
  288    % First shut down any matching servers to stop new connections
  289    forall(retract(mqi_thread(Server_Thread_ID, _, Socket)),
  290        (
  291            debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
  292            catch(tcp_close_socket(Socket), Socket_Exception, true),
  293            abortSilentExit(Server_Thread_ID, Server_Thread_Exception),
  294            debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Socket_Exception, Server_Thread_Exception])
  295        )),
  296    forall(retract(mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)),
  297        (
  298            abortSilentExit(Communication_Thread_ID, CommunicationException),
  299            debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Communication_Thread_ID, CommunicationException]),
  300            abortSilentExit(Goal_Thread_ID, Goal_Exception),
  301            debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread_ID, Goal_Exception])
  302        )).
  303
  304
  305start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
  306    (   Run_Server_On_Thread
  307    ->  (   thread_create(Server_Goal, _, [ alias(Server_Thread_ID),
  308                                            at_exit((delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
  309                                                     detach_if_expected(Server_Thread_ID)
  310                                                    ))
  311                                          ]),
  312            debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
  313        )
  314    ;   (   Server_Goal,
  315            delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
  316            debug(mqi(protocol), "Halting.", [])
  317        )
  318    ).
  319
  320
  321% Unix domain sockets create a file that needs to be cleaned up
  322% If mqi generated it, there is also a directory that needs to be cleaned up
  323%   that will only contain that file
  324delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
  325    (   nonvar(Unix_Domain_Socket_Path)
  326    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
  327    ;   (   nonvar(Unix_Domain_Socket_Path_And_File)
  328        ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
  329        ;   true
  330        )
  331    ).
  332
  333:- if(current_predicate(unix_domain_socket/1)).  334    optional_unix_domain_socket(Socket) :-
  335        unix_domain_socket(Socket).
  336:- else.  337    optional_unix_domain_socket(_).
  338:- endif.  339
  340% Always bind only to localhost for security reasons
  341% Delete the socket file in case it is already around so that the same name can be reused
  342bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
  343    (   nonvar(Unix_Domain_Socket_Path_And_File)
  344    ->  debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
  345        optional_unix_domain_socket(Socket),
  346        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
  347        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
  348        Client_Address = Unix_Domain_Socket_Path_And_File
  349    ;   (   tcp_socket(Socket),
  350            tcp_setopt(Socket, reuseaddr),
  351            tcp_bind(Socket, '127.0.0.1':Port),
  352            debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
  353            Client_Address = Port
  354        )
  355    ),
  356    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).
  357
  358% Communicates the used port and password to the client via STDOUT so the client
  359% language library can use them to connect
  360send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
  361    (   Write_Connection_Values
  362    ->  (   (  var(Unix_Domain_Socket_Path_And_File)
  363            ->  format(Stream, "~d\n", [Port])
  364            ;   format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
  365            ),
  366            format(Stream, "~w\n", [Password]),
  367            flush_output(Stream)
  368        )
  369    ;   true
  370    ).
  371
  372
  373% Server thread worker predicate
  374% Listen for connections and create a connection for each in its own communication thread
  375% Uses tail recursion to ensure the stack doesn't grow
  376server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
  377    debug(mqi(protocol), "Listening on address: ~w", [Address]),
  378    tcp_listen(Socket, Connection_Count),
  379    tcp_open_socket(Socket, AcceptFd, _),
  380    create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
  381    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).
  382
  383
  384% Wait for the next connection and create communication and goal threads to support it
  385% Create known IDs for the threads so we can pass them along before the threads are created
  386% First create the goal thread to avoid a race condition where the communication
  387% thread tries to queue a goal before it is created
  388create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
  389    debug(mqi(protocol), "Waiting for client connection...", []),
  390    tcp_accept(AcceptFd, Socket, _Peer),
  391    debug(mqi(protocol), "Client connected", []),
  392    gensym('conn', Connection_Base),
  393    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_comm'], Thread_Alias),
  394    atomic_list_concat([Server_Thread_ID, "_", Connection_Base, '_goal'], Goal_Alias),
  395    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
  396    assert(mqi_worker_threads(Server_Thread_ID, Thread_Alias, Goal_Alias)),
  397    thread_create(goal_thread(Thread_Alias),
  398        _,
  399        [alias(Goal_Alias), at_exit(detach_if_expected(Goal_Alias))]),
  400    thread_create(communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
  401        _,
  402        [alias(Thread_Alias), at_exit(detach_if_expected(Thread_Alias))]).
  403
  404
  405% The worker predicate for the Goal thread.
  406% Looks for a message from the connection thread, processes it, then recurses.
  407%
  408% Goals always run in the same thread in case the user is setting thread local information.
  409% For each answer to the user's query (including an exception), the goal thread will queue a message
  410% to the communication thread of the form result(Answer, Find_All), where Find_All == true if the user wants all answers at once
  411% Tail recurse to avoid growing the stack
  412goal_thread(Respond_To_Thread_ID) :-
  413    thread_self(Self_ID),
  414    throw_if_testing(Self_ID),
  415    thread_get_message(Self_ID, goal(Goal, Binding_List, Query_Timeout, Find_All)),
  416    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Goal]),
  417    (   Find_All
  418    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
  419    ;
  420        One_Answer_Goal = ( @(user:Goal, user),
  421                            Answers = [Binding_List],
  422                            send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
  423                          )
  424    ),
  425    All_Answers_Goal = run_cancellable_goal(Self_ID, findall(Answers, One_Answer_Goal, [Find_All_Answers | _])),
  426    (   Query_Timeout == -1
  427    ->  catch(All_Answers_Goal, Top_Exception, true)
  428    ;   catch(call_with_time_limit(Query_Timeout, All_Answers_Goal), Top_Exception, true)
  429    ),
  430    (
  431        var(Top_Exception)
  432    ->  (
  433            Find_All
  434        ->
  435            send_next_result(Respond_To_Thread_ID, Find_All_Answers, _, Find_All)
  436        ;
  437            send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
  438        )
  439    ;
  440        send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
  441    ),
  442    goal_thread(Respond_To_Thread_ID).
  443
  444
  445% Used only for testing unhandled exceptions outside of the "safe zone"
  446throw_if_testing(Self_ID) :-
  447    (   thread_peek_message(Self_ID, testThrow(Test_Exception))
  448    ->  (   debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
  449            throw(Test_Exception)
  450        )
  451    ;   true
  452    ).
  453
  454
  455% run_cancellable_goal handles the communication
  456% to ensure the cancel exception from the communication thread
  457% is injected at a place we are prepared to handle in the goal_thread
  458% Before the goal is run, sets a fact to indicate we are in the "safe to cancel"
  459% zone for the communication thread.
  460% Then it doesn't exit this "safe to cancel" zone if the
  461% communication thread is about to cancel
  462run_cancellable_goal(Mutex_ID, Goal) :-
  463    thread_self(Self_ID),
  464    setup_call_cleanup(
  465        assert(safe_to_cancel(Self_ID), Assertion),
  466        Goal,
  467        with_mutex(Mutex_ID, erase(Assertion))
  468    ).
  469
  470
  471% Worker predicate for the communication thread.
  472% Processes messages and sends goals to the goal thread.
  473% Continues processing messages until communication_thread_listen() throws or ends with true/false
  474%
  475% Catches all exceptions from communication_thread_listen so that it can do an orderly shutdown of the goal
  476%   thread if there is a communication failure.
  477%
  478% True means user explicitly called close or there was an exception
  479%   only exit the main thread if there was an exception and we are supposed to Exit_Main_On_Failure
  480%   otherwise just exit the session
  481communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
  482    thread_self(Self_ID),
  483    (   (
  484            catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Serve_Exception1, Serve_Exception2), true),
  485            debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Serve_Exception1, Serve_Exception2)]),
  486            abortSilentExit(Goal_Thread_ID, _),
  487            retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
  488        )
  489    ->  Halt = (nonvar(Serve_Exception1), Exit_Main_On_Failure)
  490    ;   Halt = true
  491    ),
  492    (   Halt
  493    ->  (   debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Serve_Exception1, Serve_Exception2)]),
  494            quit(_)
  495        )
  496    ;   (   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
  497            catch(tcp_close_socket(Socket), error(_, _), true)
  498        )
  499    ).
  500
  501
  502% Open socket and begin processing the streams for a connection using the Encoding if the password matches
  503% true: session ended
  504% exception: communication failure or an internal failure (like a thread threw or shutdown unexpectedly)
  505% false: halt
  506communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
  507    tcp_open_socket(Socket, Read_Stream, Write_Stream),
  508    thread_self(Communication_Thread_ID),
  509    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
  510    set_stream(Read_Stream, encoding(Encoding)),
  511    set_stream(Write_Stream, encoding(Encoding)),
  512    read_message(Read_Stream, Sent_Password),
  513    (   Password == Sent_Password
  514    ->  (   debug(mqi(protocol), "Password matched.", []),
  515            thread_self(Self_ID),
  516            reply(Write_Stream, true([[threads(Self_ID, Goal_Thread_ID)]]))
  517        )
  518    ;   (   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
  519            reply_error(Write_Stream, password_mismatch),
  520            throw(password_mismatch)
  521        )
  522    ),
  523    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
  524    debug(mqi(protocol), "Session finished.", []).
  525
  526
  527% process_mqi_messages implements the main interface to the Machine Query Interface.
  528% Continuously reads a Machine Query Interface message from Read_Stream and writes a response to Write_Stream,
  529% until the connection fails or a `quit` or `close` message is sent.
  530%
  531% Read_Stream and Write_Stream can be any valid stream using any encoding.
  532%
  533% Goal_Thread_ID must be the threadID of a thread started on the goal_thread predicate
  534%
  535% uses tail recursion to ensure the stack doesn't grow
  536%
  537% true: indicates we should terminate the session (clean termination)
  538% false: indicates we should exit the process if running in embedded mode
  539% exception: indicates we should terminate the session (communication failure termination) or
  540%    thread was asked to halt
  541process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
  542    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
  543    (   Command == close
  544    ->  (   debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", []),
  545            true
  546        )
  547    ;   (   Command == quit
  548        ->  (   debug(mqi(protocol), "Command: quit.", []),
  549                false
  550            )
  551        ;
  552            process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
  553        )
  554    ).
  555
  556% process_mqi_message manages the protocol for the connection: receive message, parse it, process it.
  557% - Reads a single message from Read_Stream.
  558% - Processes it and issues a response on Write_Stream.
  559% - The message will be unified with Command to allow the caller to handle it.
  560%
  561% Read_Stream and Write_Stream can be any valid stream using any encoding.
  562%
  563% True if the message understood. A response will always be sent.
  564% False if the message was malformed.
  565% Exceptions will be thrown by the underlying stream if there are communication failures writing to Write_Stream or the thread was asked to exit.
  566%
  567% state_* predicates manage the state transitions of the protocol
  568% They only bubble up exceptions if there is a communication failure
  569%
  570% state_process_command will never return false
  571% since errors should be sent to the client
  572% It can throw if there are communication failures, though.
  573process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
  574    debug(mqi(protocol), "Waiting for next message ...", []),
  575    (   state_receive_raw_message(Read_Stream, Message_String)
  576    ->  (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
  577        ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
  578        ;   true
  579        )
  580    ;   false
  581    ).
  582
  583
  584% state_receive_raw_message: receive a raw message, which is simply a string
  585%   true: valid message received
  586%   false: invalid message format
  587%   exception: communication failure OR thread asked to exit
  588state_receive_raw_message(Read, Command_String) :-
  589    read_message(Read, Command_String),
  590    debug(mqi(protocol), "Valid message: ~w", [Command_String]).
  591
  592
  593% state_parse_command: attempt to parse the message string into a valid command
  594%
  595% Use read_term_from_atom instead of read_term(stream) so that we don't hang
  596% indefinitely if the caller didn't properly finish the term
  597% parse in the context of module 'user' to properly bind operators, do term expansion, etc
  598%
  599%   true: command could be parsed
  600%   false: command cannot be parsed.  An error is sent to the client in this case
  601%   exception: communication failure on sending a reply
  602state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
  603    (   catch(read_term_from_atom(Command_String, Parsed_Command, [variable_names(Binding_List), module(user)]), Parse_Exception, true)
  604    ->  (   var(Parse_Exception)
  605        ->  debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
  606        ;   (   reply_error(Write_Stream, Parse_Exception),
  607                fail
  608            )
  609        )
  610    ;   (   reply_error(Write_Stream, error(couldNotParseCommand, _)),
  611            fail
  612        )
  613    ).
  614
  615
  616% state_process_command(): execute the requested Command
  617%
  618% First wait until we have removed all results from any previous query.
  619% If query_in_progress(Goal_Thread_ID) exists then there is at least one
  620% more result to drain, by definition. Because the predicate is
  621% deleted by get_next_result in the communication thread when the last result is drained
  622%
  623%   true: if the command itself succeeded, failed or threw an exception.
  624%         In that case, the outcome is sent to the client
  625%   exception: only communication or thread failures are allowed to bubble up
  626% See mqi(Options) documentation
  627state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
  628    !,
  629    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
  630    repeat_until_false((
  631            query_in_progress(Goal_Thread_ID),
  632            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
  633            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
  634            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
  635            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
  636        )),
  637    debug(mqi(protocol), "All previous results drained", []),
  638    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
  639    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
  640    reply_with_result(Goal_Thread_ID, Stream, Answers).
  641
  642
  643% See mqi(Options) documentation for documentation
  644% See notes in run(Goal, Timeout) re: draining previous query
  645state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
  646    !,
  647    debug(mqi(protocol), "Command: run_async/1.", []),
  648    debug(mqi(query),  "   Goal: ~w", [Goal]),
  649    repeat_until_false((
  650            query_in_progress(Goal_Thread_ID),
  651            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
  652            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
  653            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
  654            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
  655            )),
  656    debug(mqi(protocol), "All previous results drained", []),
  657    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
  658    reply(Stream, true([[]])).
  659
  660
  661% See mqi(Options) documentation for documentation
  662state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
  663    !,
  664    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
  665    (   once((var(Timeout) ; Timeout == -1))
  666    ->  Options = []
  667    ;   Options = [timeout(Timeout)]
  668    ),
  669    (   query_in_progress(Goal_Thread_ID)
  670    ->  (   (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  671                get_next_result(Goal_Thread_ID, Stream, Options, Result)
  672            )
  673        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
  674        ;   reply_error(Stream, result_not_available)
  675        )
  676   ;    (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  677            reply_error(Stream, no_query)
  678        )
  679   ).
  680
  681
  682% See mqi(Options) documentation for documentation
  683% To ensure the goal thread is in a place it is safe to cancel,
  684% we lock a mutex first that the goal thread checks before exiting
  685% the "safe to cancel" zone.
  686% It is not in the safe zone: it either finished
  687% or was never running.
  688state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
  689    !,
  690    debug(mqi(protocol), "Command: cancel_async/0.", []),
  691    with_mutex(Goal_Thread_ID, (
  692        (   safe_to_cancel(Goal_Thread_ID)
  693        ->  (   thread_signal(Goal_Thread_ID, throw(cancel_goal)),
  694                reply(Stream, true([[]]))
  695            )
  696        ;   (   query_in_progress(Goal_Thread_ID)
  697            ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  698                    reply(Stream, true([[]]))
  699                )
  700            ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  701                    reply_error(Stream, no_query)
  702                )
  703            )
  704        )
  705    )).
  706
  707
  708% Used for testing how the system behaves when the goal thread is killed unexpectedly
  709% Needs to run a bogus command `run(true, -1)` to
  710% get the goal thread to process the exception
  711state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
  712    !,
  713    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
  714    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
  715    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).
  716
  717
  718state_process_command(Stream, _, _, close, _) :-
  719    !,
  720    reply(Stream, true([[]])).
  721
  722
  723state_process_command(Stream, _, _, quit, _) :-
  724    !,
  725    reply(Stream, true([[]])).
  726
  727
  728%  Send an exception if the command is not known
  729state_process_command(Stream, _, _, Command, _) :-
  730    debug(mqi(protocol), "Unknown command ~w", [Command]),
  731    reply_error(Stream, unknownCommand).
  732
  733
  734% Wait for a result (and put in Answers) from the goal thread, but send a heartbeat message
  735% every so often until it arrives to detect if the socket is broken.
  736% Throws if If the heartbeat failed which will
  737% and then shutdown the communication thread
  738% Tail recurse to not grow the stack
  739heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
  740    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
  741    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
  742    ;   (   debug(mqi(protocol), "heartbeat...", []),
  743            write_heartbeat(Stream),
  744            heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
  745        )
  746    ).
  747
  748
  749% True if write succeeded, otherwise throws as that
  750% indicates that heartbeat failed because the other
  751% end of the pipe terminated
  752write_heartbeat(Stream) :-
  753    put_char(Stream, '.'),
  754    flush_output(Stream).
  755
  756
  757% Send a goal to the goal thread in its queue
  758%
  759% Remember that we are now running a query using assert.
  760%   This will be retracted once all the answers have been drained.
  761%
  762% If Goal_Thread_ID died, thread_send_message throws and, if we don't respond,
  763%   the client could hang so catch and give them a good message before propagating
  764%   the exception
  765send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
  766    (   var(Timeout)
  767    ->  Timeout = Default_Timeout
  768    ;   true
  769    ),
  770    (   var(Binding_List)
  771    ->  Binding_List = []
  772    ;   true
  773    ),
  774    debug(mqi(query),  "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
  775    assert(query_in_progress(Goal_Thread_ID)),
  776    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
  777    (   var(Send_Message_Exception)
  778    ->  true
  779    ;   (   reply_error(Stream, connection_failed),
  780            throw(Send_Message_Exception)
  781        )
  782    ).
  783
  784
  785% Send a result from the goal thread to the communication thread in its queue
  786send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
  787    (   var(Exception_In_Goal)
  788    ->  (   (   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
  789                Answer == []
  790            )
  791        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
  792        ;   thread_send_message(Respond_To_Thread_ID, result(true(Answer), Find_All))
  793        )
  794    ;   (   debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
  795            thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
  796        )
  797    ).
  798
  799
  800% Gets the next result from the goal thread in the communication thread queue,
  801% and retracts query_in_progress/1 when the last result has been sent.
  802% Find_All == true only returns one message, so delete query_in_progress
  803% No matter what it is
  804% \+ Find_All: There may be more than one result. The first one we hit with any exception
  805% (note that no_more_results is also returned as an exception) means we are done
  806get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
  807    (   thread_property(Goal_Thread_ID, status(running))
  808    ->  true
  809    ;   (   reply_error(Stream, connection_failed),
  810            throw(connection_failed)
  811        )
  812    ),
  813    thread_self(Self_ID),
  814    thread_get_message(Self_ID, result(Answers, Find_All), Options),
  815    (   Find_All
  816    ->  (   debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
  817            retractall(query_in_progress(Goal_Thread_ID))
  818        )
  819    ;   (   Answers = error(_)
  820        ->  (   debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
  821                retractall(query_in_progress(Goal_Thread_ID))
  822            )
  823        ;   true
  824        )
  825    ).
  826
  827
  828% reply_with_result predicates are used to consistently return
  829% answers for a query from either run() or run_async()
  830reply_with_result(_, Stream, error(Error)) :-
  831    !,
  832    reply_error(Stream, Error).
  833reply_with_result(_, Stream, Result) :-
  834    !,
  835    reply(Stream, Result).
  836
  837
  838% Reply with a normal term
  839% Convert term to an actual JSON string
  840reply(Stream, Term) :-
  841    debug(mqi(query), "Responding with Term: ~w", [Term]),
  842    term_to_json_string(Term, Json_String),
  843    write_message(Stream, Json_String).
  844
  845
  846% Special handling for exceptions since they can have parts that are not
  847% "serializable". Ensures they they are always returned in an exception/1 term
  848reply_error(Stream, Error_Term) :-
  849    (   error(Error_Value, _) = Error_Term
  850    ->  Response = exception(Error_Value)
  851    ;   (   atom(Error_Term)
  852        ->
  853            Response = exception(Error_Term)
  854        ;   (   compound_name_arity(Error_Term, Name, _),
  855                Response = exception(Name)
  856            )
  857        )
  858    ),
  859    reply(Stream, Response).
  860
  861
  862% Send and receive messages are simply strings preceded by their length + ".\n"
  863% i.e. "<stringlength>.\n<string>"
  864% The desired encoding must be set on the Stream before calling this predicate
  865
  866
  867% Writes the next message.
  868% Throws if there is an unexpected exception
  869write_message(Stream, String) :-
  870    write_string_length(Stream, String),
  871    write(Stream, String),
  872    flush_output(Stream).
  873
  874
  875% Reads the next message.
  876% Throws if there is an unexpected exception or thread has been requested to quit
  877% the length passed must match the actual number of bytes in the stream
  878% in whatever encoding is being used
  879read_message(Stream, String) :-
  880    read_string_length(Stream, Length),
  881    read_string(Stream, Length, String).
  882
  883
  884% Terminate with '.\n' so we know that's the end of the count
  885write_string_length(Stream, String) :-
  886    stream_property(Stream, encoding(Encoding)),
  887    string_encoding_length(String, Encoding, Length),
  888    format(Stream, "~d.\n", [Length]).
  889
  890
  891% Note: read_term requires ".\n" after the length
  892% ... but does not consume the "\n"
  893read_string_length(Stream, Length) :-
  894    read_term(Stream, Length, []),
  895    get_char(Stream, _).
  896
  897
  898% converts a string to Codes using Encoding
  899string_encoding_length(String, Encoding, Length) :-
  900    setup_call_cleanup(
  901        open_null_stream(Out),
  902        (   set_stream(Out, encoding(Encoding)),
  903            write(Out, String),
  904            byte_count(Out, Length)
  905        ),
  906        close(Out)).
  907
  908
  909% Convert Prolog Term to a Prolog JSON term
  910% Add a final \n so that using netcat to debug works well
  911term_to_json_string(Term, Json_String) :-
  912    term_to_json(Term, Json),
  913    with_output_to(string(Json_String),
  914        (   current_output(Stream),
  915            json_write(Stream, Json),
  916            put(Stream, '\n')
  917        )).
  918
  919
  920% Execute the goal as once() without binding any variables
  921% and keep executing it until it returns false (or throws)
  922repeat_until_false(Goal) :-
  923    (\+ (\+ Goal)), !, repeat_until_false(Goal).
  924repeat_until_false(_).
  925
  926
  927% Used to kill a thread in an "expected" way so it doesn't leave around traces in thread_property/2 afterwards
  928%
  929% If the thread is alive OR it was already aborted (expected cases) then attempt to join
  930%   the thread so that no warnings are sent to the console. Other cases leave the thread for debugging.
  931% There are some fringe cases (like calling external code)
  932%   where the call might not return for a long time.  Do a timeout for those cases.
  933abortSilentExit(Thread_ID, Exception) :-
  934    catch(thread_signal(Thread_ID, abort), error(Exception, _), true),
  935    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
  936% Workaround SWI Prolog bug: https://github.com/SWI-Prolog/swipl-devel/issues/852 by not joining
  937% The workaround just stops joining the aborted thread, so an inert record will be left if thread_property/2 is called.
  938%    ,
  939%    (   once((var(Exception) ; catch(thread_property(Thread_ID, status(exception('$aborted'))), error(_, _), true)))
  940%    ->  (   catch(call_with_time_limit(4, thread_join(Thread_ID)), error(JoinException1, JoinException2), true),
  941%            debug(mqi(protocol), "thread_join attempted because thread: ~w exit was expected, exception: ~w", [Thread_ID, error(JoinException1, JoinException2)])
  942%        )
  943%    ;   true
  944%    ).
  945
  946
  947% Detach a thread that exits with true or false so that it doesn't leave around a record in thread_property/2 afterwards
  948% Don't detach a thread if it exits because of an exception so we can debug using thread_property/2 afterwards
  949%
  950% However, `abort` is an expected exception but detaching a thread that aborts will leave an unwanted
  951% thread_property/2 record *and* print a message to the console. To work around this,
  952% the goal thread is always aborted by the communication thread using abortSilentExit.
  953detach_if_expected(Thread_ID) :-
  954    thread_property(Thread_ID, status(Status)),
  955    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
  956    (   once((Status = true ; Status = false))
  957    ->  (   debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
  958            thread_detach(Thread_ID)
  959        )
  960    ;   true
  961    ).
  962
  963
  964write_output_to_file(File) :-
  965    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
  966    open(File, write, Stream, [buffer(false)]),
  967    set_prolog_IO(user_input, Stream, Stream).
  968
  969
  970% Creates a Unix Domain Socket file in a secured directory.
  971% Throws if the directory or file cannot be created in /tmp for any reason
  972% Requirements for this file are:
  973%    - The Prolog process will attempt to create and, if Prolog exits cleanly,
  974%           delete this file when the server closes.  This means the directory
  975%           must have the appropriate permissions to allow the Prolog process
  976%           to do so.
  977%    - For security reasons, the filename should not be predictable and the
  978%           directory it is contained in should have permissions set so that files
  979%           created are only accessible to the current user.
  980%    - The path must be below 92 *bytes* long (including null terminator) to
  981%           be portable according to the Linux documentation
  982%
  983% tmp_file finds the right /tmp directory, even on Mac OS, so the path is small
  984% Set 700 (rwx------)  permission so it is only accessible by current user
  985% Create a secure tmp file in the new directory
  986% {set,current}_prolog_flag is copied to a thread, so no need to use a mutex.
  987% Close the stream so sockets can use it
  988unix_domain_socket_path(Created_Directory, File_Path) :-
  989    tmp_file(udsock, Created_Directory),
  990    make_directory(Created_Directory),
  991    catch(  chmod(Created_Directory, urwx),
  992            Exception,
  993            (   catch(delete_directory(Created_Directory), error(_, _), true),
  994                throw(Exception)
  995            )
  996    ),
  997    setup_call_cleanup( (   current_prolog_flag(tmp_dir, Save_Tmp_Dir),
  998                            set_prolog_flag(tmp_dir, Created_Directory)
  999                        ),
 1000                        tmp_file_stream(File_Path, Stream, []),
 1001                        set_prolog_flag(tmp_dir, Save_Tmp_Dir)
 1002                      ),
 1003    close(Stream).
 1004
 1005
 1006% Helper for installing the mqi.pl file to the right
 1007% library directory.
 1008% Call using swipl -s mqi.pl -g "mqi:install_to_library('mqi.pl')" -t halt
 1009install_to_library(File) :-
 1010    once(find_library(Path)),
 1011    copy_file(File, Path),
 1012    make.
 1013
 1014
 1015% Find the base library path, i.e. the one that ends in
 1016% "library/"
 1017find_library(Path) :-
 1018    file_alias_path(library, Path),
 1019    atomic_list_concat(Parts, '/', Path),
 1020    reverse(Parts, Parts_Reverse),
 1021    nth0(0, Parts_Reverse, ''),
 1022    nth0(1, Parts_Reverse, Library),
 1023    string_lower(Library, 'library')