View source with raw comments or as raw
    1/*  Prolog Machine Query Interface
    2    Author:        Eric Zinda
    3    E-mail:        ericz@inductorsoftware.com
    4    WWW:           http://www.inductorsoftware.com
    5    Copyright (c)  2021, Eric Zinda
    6    All rights reserved.
    7
    8        Redistribution and use in source and binary forms, with or without
    9    modification, are permitted provided that the following conditions
   10    are met:
   11
   12    1. Redistributions of source code must retain the above copyright
   13       notice, this list of conditions and the following disclaimer.
   14
   15    2. Redistributions in binary form must reproduce the above copyright
   16       notice, this list of conditions and the following disclaimer in
   17       the documentation and/or other materials provided with the
   18       distribution.
   19
   20    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   21    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   22    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   23    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   24    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   25    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   26    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   27    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   28    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   29    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   30    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   31    POSSIBILITY OF SUCH DAMAGE.
   32*/
   33
   34:- module(mqi,
   35          [ mqi_start/0,
   36            mqi_start/1,                % +Options
   37            mqi_stop/1,                 % ?Thread
   38            mqi_version/2               % ?Major_Version, ?Minor_Version
   39          ]).
 mqi_start(+Options:list) is semidet
Starts a Prolog Machine Query Interface ('MQI') using Options. The MQI is normally started automatically by a library built for a particular programming language such as the swiplserver Python library, but starting manually can be useful when debugging Prolog code in some scenarios. See the documentation on "Standalone Mode" for more information.

Once started, the MQI listens for TCP/IP or Unix Domain Socket connections and authenticates them using the password provided (or created depending on options) before processing any messages. The messages processed by the MQI are described below.

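For debugging, the server outputs traces using the debug/3 predicate so that the server operation can be observed by using the debug/1 predicate. Run either of the following goals to see them: `debug(mqi(protocol))` traces connections and the flow of protocol commands; `debug(mqi(query))` traces each query and the answers it produces.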

  110:- use_module(library(socket)).  111:- use_module(library(http/json)).  112:- use_module(library(http/json_convert)).  113:- use_module(library(http/http_stream)).  114:- use_module(library(option)).  115:- use_module(library(term_to_json)).  116:- use_module(library(debug)).  117:- use_module(library(filesex)).  118:- use_module(library(gensym)).  119:- use_module(library(lists)).  120:- use_module(library(main)).  121:- use_module(library(make)).  122:- use_module(library(prolog_source)).  123:- use_module(library(time)).  124:- use_module(library(uuid)).  125
  126% One for every Machine Query Interface running
  127:- dynamic(mqi_thread/3).  128
  129% One for every active connection
  130:- dynamic(mqi_worker_threads/3).  131:- dynamic(mqi_socket/5).  132
  133% Indicates that a query is in progress on the goal thread or hasn't had its results drained
  134% Deleted once the last result from the queue has been drained
  135% Only deleted by the communication thread to avoid race conditions
  136:- dynamic(query_in_progress/1).  137
  138% Indicates to the communication thread that we are in a place
  139% that can be cancelled
  140:- dynamic(safe_to_cancel/1).
 mqi_version(?Major_Version, ?Minor_Version) is det
Provides the major and minor version number of the protocol used by the MQI. The protocol includes the message format and the messages that can be sent and received from the MQI.

Note that the initial version of the MQI did not have a version predicate, so the proper way for callers to check the version is:

use_module(library(mqi)), ( current_predicate(mqi_version/2) -> mqi_version(Major_Version, Minor_Version) ; Major_Version = 0, Minor_Version = 0 )

Major versions are increased when there is a change to the protocol that will likely break clients written to the previous version. Minor versions are increased when there is new functionality that will not break clients written to the old version.

This allows a client written to MQI version 'Client_Major_Version.Client_Minor_Version' to check for non-breaking compatibility like this:

Client_Major_Version = MQI_Major_Version and Client_Minor_Version <= MQI_Minor_Version

Breaking changes (i.e. Major version increments) should be very rare as the goal is to have the broadest adoption possible.

Protocol Version History: 0.0 -> First published version. It had a protocol bug that required the message header of messages sent to the MQI to count Unicode code points instead of bytes. 1.0 -> Breaking change: fixed that protocol bug, so the message header of messages sent to the MQI now contains the byte count instead of the Unicode code point count.

  % Protocol version implemented by this MQI: major 1, minor 0.
  % Increase the major number for changes that will likely break existing
  % clients; increase the minor number for backward-compatible additions.
  174mqi_version(1, 0).
  175
  176
  % mqi_start(+Options)
  %
  % Starts an MQI server configured by Options. In order, it:
  %   1. resolves defaults: pending_connections 5, query_timeout -1 (no limit),
  %      run_server_on_thread true, exit_main_on_failure false,
  %      write_connection_values false. An unix_domain_socket(_) option whose
  %      path is unbound asks for a generated path (unix_domain_socket_path/2);
  %   2. picks the server thread alias (server_thread option, else gensym);
  %   3. picks the password (password option, else a random integer). A
  %      generated password is built as a string, not an atom, so it is not
  %      globally visible. ".\n" is appended to the copy compared against
  %      incoming messages because received messages carry that terminator;
  %   4. binds the socket, optionally prints the connection values on
  %      user_output, optionally redirects output to a file, and starts the
  %      server loop via start_server_thread/5.
  %
  % If binding the socket raises an exception, a directory that was generated
  % for a Unix domain socket is removed before the exception is re-thrown, so a
  % failed start does not leave temporary directories behind. A caller-supplied
  % socket path is never touched by this cleanup.
mqi_start(Options) :-
    Encoding = utf8,
    option(pending_connections(Connection_Count), Options, 5),
    option(query_timeout(Query_Timeout), Options, -1),
    option(port(Port), Options, _),
    option(run_server_on_thread(Run_Server_On_Thread), Options, true),
    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
    option(write_connection_values(Write_Connection_Values), Options, false),
    option(unix_domain_socket(Unix_Domain_Socket_Path_And_File), Options, _),
    (   (   memberchk(unix_domain_socket(_), Options),
            var(Unix_Domain_Socket_Path_And_File)
        )
    ->  unix_domain_socket_path(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File)
    ;   true
    ),
    option(server_thread(Server_Thread_ID), Options, _),
    (   var(Server_Thread_ID)
    ->  gensym(mqi, Server_Thread_ID)
    ;   true
    ),
    option(password(Password), Options, _),
    (   var(Password)
    ->  (   current_prolog_flag(bounded, false)
        ->  uuid(UUID, [format(integer)])
        ;   UUID is random(1<<62)
        ),
        format(string(Password), '~d', [UUID])
    ;   true
    ),
    string_concat(Password, '.\n', Final_Password),
    catch(bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address),
          Bind_Exception,
          (   % Only the directory mqi generated is removed: the second
              % argument is left unbound so a caller-supplied file is kept.
              ignore(delete_unix_domain_socket_file(Unix_Domain_Socket_Path, _)),
              throw(Bind_Exception)
          )),
    send_client_startup_data(Write_Connection_Values, user_output, Unix_Domain_Socket_Path_And_File, Client_Address, Password),
    option(write_output_to_file(File), Options, _),
    (   var(File)
    ->  true
    ;   write_output_to_file(File)
    ),
    Server_Goal = (
                    catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
                    debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
                 ),
    start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File).
  222
  % opt_type(?Flag, ?Option, ?Type)
  % Typed command-line option table. argv_options/3 (library(main)) consults it
  % when mqi_start/0 parses the command line: --Flag=Value is converted
  % according to Type and returned as Option(Value). Every flag maps to the
  % mqi_start/1 option of the same name, except create_unix_domain_socket,
  % which mqi_start/0 removes and turns into unix_domain_socket(_).
  223opt_type(port,                      port,                      natural).
  224opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
  225opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
  226opt_type(password,                  password,                  string).
  227opt_type(pending_connections,       pending_connections,       nonneg).
  228opt_type(query_timeout,             query_timeout,             float).
  229opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
  230opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
  231opt_type(write_connection_values,   write_connection_values,   boolean).
  232opt_type(write_output_to_file,      write_output_to_file,      file(write)).
  233
  % opt_help(?Flag, ?Help)
  % One-line help text per command-line flag, printed by argv_options/3 for
  % `swipl -g mqi_start -- --help`. Defaults quoted in parentheses must stay in
  % sync with the option/3 defaults in mqi_start/1 (pending_connections 5,
  % run_server_on_thread true).
  234opt_help(port,                      "TCP/IP port for clients to connect to").
  235opt_help(create_unix_domain_socket, "Create a Unix domain socket for clients to connect to").
  236opt_help(unix_domain_socket,        "File path for the Unix domain socket").
  237opt_help(password,                  "Connection password").
  238opt_help(pending_connections,       "Max number of queued connections (5)").
  239opt_help(query_timeout,             "Max query runtime in seconds (default infinite)").
  240opt_help(run_server_on_thread,      "Run server in a background thread (true)").
  241opt_help(exit_main_on_failure,      "Exit the process on a failure").
  242opt_help(write_connection_values,   "Print info for clients to connect").
  243opt_help(write_output_to_file,      "Write stdout and stderr to file").
 mqi_start is semidet
Main entry point for running the Machine Query Interface in "Embedded Mode" and designed to be called from the command line. Embedded Mode is used when launching the Machine Query Interface as an embedded part of another language (e.g. Python). Calling mqi_start/0 from Prolog interactively is not recommended, as it depends on Prolog exiting to stop the MQI; use mqi_start/1 for interactive use instead.

To launch embedded mode:

swipl --quiet -g mqi_start -t halt -- --write_connection_values=true

This will start SWI Prolog and invoke the mqi_start/0 predicate and exit the process when that predicate stops. Any command line arguments after the standalone -- will be passed as Options. These are the same Options that mqi_start/1 accepts and are passed to it directly. Some options are expressed differently due to command line limitations, see mqi_start/1 Options for more information.

Any Option values that cause issues during command line parsing (such as values containing spaces) should be enclosed in double quotes, like this:

swipl --quiet -g mqi_start -t halt -- --write_connection_values=true \
                                      --password="HGJ SOWLWW WNDSJD"

For help on the command line options, run:

swipl -g mqi_start -- --help
  % mqi_start
  %
  % Embedded-mode entry point, meant for `swipl -g mqi_start -- <options>`.
  %   - Options are read from the command line with argv_options/3.
  %   - exit_main_on_failure defaults to true here unless given explicitly.
  %   - --create_unix_domain_socket=true becomes unix_domain_socket(_) (a
  %     generated path). The flag exists only because an unbound option value
  %     cannot be written on the command line.
  %   - run_server_on_thread must be absent or true: the main thread has to stay
  %     free to wait for quit_mqi, otherwise a domain_error term is thrown.
  % After the server starts, SIGINT is routed to quit/1, replacing the default
  % handler so a debugger signal from the client language does not put Prolog
  % into debug mode. The main thread then blocks until quit_mqi arrives.
mqi_start :-
    current_prolog_flag(argv, Argv),
    argv_options(Argv, _Positional, Command_Line_Options),
    merge_options(Command_Line_Options, [exit_main_on_failure(true)], With_Defaults),
    select_option(create_unix_domain_socket(Create_Socket), With_Defaults, Remaining, false),
    (   Create_Socket == true
    ->  merge_options(Remaining, [unix_domain_socket(_)], Start_Options)
    ;   Start_Options = Remaining
    ),
    option(run_server_on_thread(On_Thread), Start_Options, true),
    (   On_Thread \== true
    ->  throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
    ;   true
    ),
    mqi_start(Start_Options),
    on_signal(int, _, quit),
    thread_get_message(quit_mqi).
  307
  308
  % quit(+Signal)
  % Posts quit_mqi to the main thread's message queue, which is where
  % mqi_start/0 blocks. mqi_start/0 installs this as the SIGINT handler (the
  % argument is then the signal), and communication_thread/7 calls it when the
  % process should halt. The argument is ignored.
quit(_Signal) :-
    thread_send_message(main, quit_mqi).
 mqi_stop(?Server_Thread_ID:atom) is det
If Server_Thread_ID is a variable, stops all Machine Query Interfaces and associated threads. If Server_Thread_ID is an atom, then only the MQI with that Server_Thread_ID is stopped. Server_Thread_ID can be provided or retrieved using Options in mqi_start/1.

Always succeeds.

  % mqi_stop(?Server_Thread_ID)
  %
  % Stops MQI servers and their connections.
  %   - Server_Thread_ID unbound: every server and every connection is stopped.
  %   - Server_Thread_ID bound: only that server and its connections.
  % Matching servers are handled first: the listening socket is closed, which
  % releases it for reuse in the same session, and the server thread is aborted,
  % so no new connections arrive during the teardown. Then the communication and
  % goal threads of every matching connection are aborted. This happens even when
  % no server record matched, since the server thread could already have died.
  % Only threads of live connections should remain at that point (or a goal
  % thread that has not yet noticed its communication thread is gone), so abort
  % warnings on the console are expected.
mqi_stop(Server_Thread_ID) :-
    % Servers first, so no new connections are accepted while tearing down.
    forall(retract(mqi_thread(Server_Thread_ID, _, Listen_Socket)),
           (   debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
               catch(tcp_close_socket(Listen_Socket), Close_Error, true),
               abortSilentExit(Server_Thread_ID, Abort_Error),
               debug(mqi(protocol),
                     "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)",
                     [Server_Thread_ID, Close_Error, Abort_Error])
           )),
    % Then the worker threads of every matching connection.
    forall(retract(mqi_worker_threads(Server_Thread_ID, Comm_Thread, Goal_Thread)),
           (   abortSilentExit(Comm_Thread, Comm_Error),
               debug(mqi(protocol),
                     "Stopped server: ~w communication thread: ~w, exception(~w)",
                     [Server_Thread_ID, Comm_Thread, Comm_Error]),
               abortSilentExit(Goal_Thread, Goal_Error),
               debug(mqi(protocol),
                     "Stopped server: ~w goal thread: ~w, exception(~w)",
                     [Server_Thread_ID, Goal_Thread, Goal_Error])
           )).
  339
  340
  % start_server_thread(+Run_Server_On_Thread, +Server_Thread_ID, :Server_Goal,
  %                     ?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
  %
  % Runs the server loop Server_Goal.
  %   - Run_Server_On_Thread true: on a new thread aliased Server_Thread_ID,
  %     returning immediately. Whenever that thread exits, the Unix domain
  %     socket file or directory (if any) is removed and detach_if_expected/1
  %     is called.
  %   - otherwise: on the calling thread, blocking until Server_Goal returns,
  %     after which the socket file or directory is removed.
start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    Remove_Socket_File = delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
    (   Run_Server_On_Thread
    ->  Thread_Options = [ alias(Server_Thread_ID),
                           at_exit((Remove_Socket_File, detach_if_expected(Server_Thread_ID)))
                         ],
        thread_create(Server_Goal, _, Thread_Options),
        debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
    ;   call(Server_Goal),
        call(Remove_Socket_File),
        debug(mqi(protocol), "Halting.", [])
    ).
  355
  356
  % delete_unix_domain_socket_file(?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
  %
  % Best-effort cleanup of the file system entries of a Unix domain socket.
  %   - Unix_Domain_Socket_Path bound (mqi generated the location): the whole
  %     directory is deleted. It is expected to contain only the socket file.
  %   - else Unix_Domain_Socket_Path_And_File bound: only that file is deleted.
  %   - neither bound (a TCP/IP server): nothing to do.
  % error(_, _) exceptions raised by the deletion are ignored.
delete_unix_domain_socket_file(Directory, _) :-
    nonvar(Directory),
    !,
    catch(delete_directory_and_contents(Directory), error(_, _), true).
delete_unix_domain_socket_file(_, Socket_File) :-
    nonvar(Socket_File),
    !,
    catch(delete_file(Socket_File), error(_, _), true).
delete_unix_domain_socket_file(_, _).
  368
  % optional_unix_domain_socket(-Socket)
  % Creates a Unix domain socket when this Prolog provides unix_domain_socket/1
  % (checked once, at load time). Otherwise it succeeds and leaves Socket
  % unbound.
  % NOTE(review): in the fallback case bind_socket/5 then calls tcp_bind/2 with
  % an unbound socket, which presumably raises an instantiation error rather
  % than a clear "Unix domain sockets unsupported" error. Confirm.
  369:- if(current_predicate(unix_domain_socket/1)).  370    optional_unix_domain_socket(Socket) :-
  371        unix_domain_socket(Socket).
  372:- else.  373    optional_unix_domain_socket(_).
  374:- endif.  375
  % bind_socket(+Server_Thread_ID, ?Unix_Domain_Socket_Path_And_File, ?Port, -Socket, -Client_Address)
  %
  % Creates and binds the server socket, then records it in mqi_thread/3 so
  % mqi_stop/1 can close it later.
  %   - Unix_Domain_Socket_Path_And_File bound: a Unix domain socket is bound to
  %     that file. Any existing file of that name is deleted first so the name
  %     can be reused. Client_Address is the file name.
  %   - otherwise: a TCP/IP socket with SO_REUSEADDR is bound to 127.0.0.1 only
  %     (never other interfaces, for security). Port may be unbound, in which
  %     case tcp_bind/2 chooses a free port. Client_Address is the port.
  % If configuring or binding raises an exception, the newly created socket is
  % closed before the exception is re-thrown, so no socket handle is leaked.
bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
    (   nonvar(Unix_Domain_Socket_Path_And_File)
    ->  debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
        optional_unix_domain_socket(Socket),
        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
        bind_or_close_socket(Socket, tcp_bind(Socket, Unix_Domain_Socket_Path_And_File)),
        Client_Address = Unix_Domain_Socket_Path_And_File
    ;   tcp_socket(Socket),
        bind_or_close_socket(Socket, ( tcp_setopt(Socket, reuseaddr),
                                       tcp_bind(Socket, '127.0.0.1':Port)
                                     )),
        debug(mqi(protocol), "Using TCP/IP port: ~w", ['127.0.0.1':Port]),
        Client_Address = Port
    ),
    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).

% bind_or_close_socket(+Socket, :Goal)
%
% Runs Goal, the configure/bind steps for Socket. If Goal raises an exception,
% Socket is closed (errors from the close itself are ignored, e.g. when Socket
% was never created) and the original exception is re-thrown.
bind_or_close_socket(Socket, Goal) :-
    catch(Goal, Bind_Error,
          (   catch(tcp_close_socket(Socket), _, true),
              throw(Bind_Error)
          )).
  393
  % send_client_startup_data(+Write_Connection_Values, +Stream, ?Unix_Domain_Socket_Path_And_File, ?Port, +Password)
  %
  % When Write_Connection_Values is true, tells the client-language library
  % how to connect by writing two lines to Stream and flushing it:
  %   line 1: the Unix domain socket file if one is used, else the TCP/IP port;
  %   line 2: the password.
  % Otherwise nothing is written.
send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
    (   call(Write_Connection_Values)
    ->  (   nonvar(Unix_Domain_Socket_Path_And_File)
        ->  format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
        ;   format(Stream, "~d\n", [Port])
        ),
        format(Stream, "~w\n", [Password]),
        flush_output(Stream)
    ;   true
    ).
  407
  408
  % server_thread(+Server_Thread_ID, +Socket, +Address, +Password, +Connection_Count,
  %               +Encoding, +Query_Timeout, +Exit_Main_On_Failure)
  %
  % Body of the MQI server. Puts Socket into listening mode with a backlog of
  % Connection_Count, opens its accept stream, then accepts clients forever,
  % creating a communication/goal thread pair for each via create_connection/6.
  %
  % Listening and opening the accept stream are done once, before the loop.
  % The previous version repeated both for every accepted client, creating a
  % fresh accept stream per connection. The loop ends only by exception, e.g.
  % when mqi_stop/1 closes Socket.
server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Listening on address: ~w", [Address]),
    tcp_listen(Socket, Connection_Count),
    tcp_open_socket(Socket, AcceptFd, _),
    server_accept_loop(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure).

% server_accept_loop(+Server_Thread_ID, +AcceptFd, +Password, +Encoding, +Query_Timeout, +Exit_Main_On_Failure)
%
% Accepts one client per iteration on AcceptFd. The recursive call is the
% last goal, so the loop runs in constant stack space.
server_accept_loop(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
    server_accept_loop(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure).
  418
  419
  % create_connection(+Server_Thread_ID, +AcceptFd, +Password, +Encoding, +Query_Timeout, +Exit_Main_On_Failure)
  %
  % Blocks until a client connects on AcceptFd, then sets up the two threads
  % that serve it:
  %   <Server>_<connN>_goal  runs the user's goals (goal_thread/1);
  %   <Server>_<connN>_comm  talks to the client (communication_thread/7).
  % Both aliases are computed before either thread exists, so each thread can be
  % handed the other's ID. A mutex aliased like the goal thread is created;
  % goal_thread/1 passes its own ID as the mutex to run_cancellable_goal/2.
  % The pair is recorded in mqi_worker_threads/3 for mqi_stop/1. The goal
  % thread is started first so that it exists before the communication thread
  % can queue a goal to it.
create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Waiting for client connection...", []),
    tcp_accept(AcceptFd, Client_Socket, _Peer),
    debug(mqi(protocol), "Client connected", []),
    gensym('conn', Connection_Name),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Name, '_goal'], Goal_Thread),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Name, '_comm'], Comm_Thread),
    mutex_create(Goal_Thread, [alias(Goal_Thread)]),
    assert(mqi_worker_threads(Server_Thread_ID, Comm_Thread, Goal_Thread)),
    Goal_Thread_Options = [alias(Goal_Thread), at_exit(detach_if_expected(Goal_Thread))],
    Comm_Thread_Options = [alias(Comm_Thread), at_exit(detach_if_expected(Comm_Thread))],
    thread_create(goal_thread(Comm_Thread), _, Goal_Thread_Options),
    thread_create(communication_thread(Password, Client_Socket, Encoding, Server_Thread_ID,
                                       Goal_Thread, Query_Timeout, Exit_Main_On_Failure),
                  _,
                  Comm_Thread_Options).
  439
  440
  % goal_thread(+Respond_To_Thread_ID)
  %
  % Body of a connection's goal thread. Waits for a
  % goal(Unexpanded_Goal, Binding_List, Query_Timeout, Find_All) message, runs
  % the goal in module user, reports the outcome to Respond_To_Thread_ID with
  % send_next_result/4, then loops (tail call) for the next goal. Goals always
  % run on this same thread in case the user sets thread-local information.
  %   - Find_All true: all answers are collected with findall/3 and sent as a
  %     single result.
  %   - Find_All false: each answer is sent as soon as it is found, followed by
  %     no_more_results when the goal has no further solutions.
  %   - An exception (including time_limit_exceeded when Query_Timeout \== -1)
  %     is sent instead of answers.
  %
  % The outer findall/3 collects into an unconstrained list. It used to be
  % unified directly with [First|_]. In Find_All=false mode a goal with no
  % solutions then made the whole clause fail, killing this thread without
  % sending no_more_results. The head is now taken only in Find_All=true mode,
  % where the inner findall/3 guarantees exactly one element.
goal_thread(Respond_To_Thread_ID) :-
    thread_self(Self_ID),
    throw_if_testing(Self_ID),
    thread_get_message(Self_ID, goal(Unexpanded_Goal, Binding_List, Query_Timeout, Find_All)),
    expand_goal(Unexpanded_Goal, Goal),
    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, unexpanded: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Unexpanded_Goal, Goal]),
    (   Find_All
    ->  One_Answer_Goal = findall(Binding_List, @(user:Goal, user), Answers)
    ;   One_Answer_Goal = ( @(user:Goal, user),
                            Answers = [Binding_List],
                            send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
                          )
    ),
    All_Answers_Goal = run_cancellable_goal(Self_ID, findall(Answers, One_Answer_Goal, Collected_Answers)),
    (   Query_Timeout == -1
    ->  catch(All_Answers_Goal, Top_Exception, true)
    ;   catch(call_with_time_limit(Query_Timeout, All_Answers_Goal), Top_Exception, true)
    ),
    (   var(Top_Exception)
    ->  (   Find_All
        ->  % The inner findall/3 succeeds exactly once, so this always matches.
            Collected_Answers = [Find_All_Answers | _],
            send_next_result(Respond_To_Thread_ID, Find_All_Answers, _, Find_All)
        ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
        )
    ;   send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
    ),
    goal_thread(Respond_To_Thread_ID).
  474
  475
  % throw_if_testing(+Thread_ID)
  % Test hook for exercising unhandled exceptions outside the "safe to cancel"
  % zone. If a testThrow(Exception) message is waiting in Thread_ID's queue, it
  % is peeked at (left in the queue) and Exception is thrown. Otherwise this
  % succeeds and does nothing.
throw_if_testing(Thread_ID) :-
    thread_peek_message(Thread_ID, testThrow(Injected_Exception)),
    !,
    debug(mqi(query), "TESTING: Throwing test exception: ~w", [Injected_Exception]),
    throw(Injected_Exception).
throw_if_testing(_).
  484
  485
  % run_cancellable_goal(+Mutex_ID, :Goal)
  %
  % Runs Goal inside the calling thread's "safe to cancel" zone. While Goal runs,
  % safe_to_cancel(Self) is asserted, which tells the communication thread it
  % may inject its cancel exception at a point goal_thread/1 is prepared to
  % handle. When Goal exits in any way (success, failure, exception or cut), the
  % fact is erased while holding Mutex_ID. The thread therefore does not leave
  % the zone while the communication thread (presumably holding the same mutex)
  % is in the middle of cancelling it.
run_cancellable_goal(Mutex_ID, Goal) :-
    thread_self(Self_ID),
    Enter_Zone = assert(safe_to_cancel(Self_ID), Zone_Ref),
    Leave_Zone = with_mutex(Mutex_ID, erase(Zone_Ref)),
    setup_call_cleanup(Enter_Zone, Goal, Leave_Zone).
  500
  501
  502% Worker predicate for the communication thread.
  503% Processes messages and sends goals to the goal thread.
  504% Continues processing messages until communication_thread_listen() throws or ends with true/false
  505%
  506% Catches all exceptions from communication_thread_listen so that it can do an orderly shutdown of the goal
  507%   thread if there is a communication failure.
  508%
  509% True means user explicitly called close or there was an exception
  510%   only exit the main thread if there was an exception and we are supposed to Exit_Main_On_Failure
  511%   otherwise just exit the session
  % communication_thread(+Password, +Socket, +Encoding, +Server_Thread_ID,
  %                      +Goal_Thread_ID, +Query_Timeout, +Exit_Main_On_Failure)
  % Body of a connection's communication thread. Runs
  % communication_thread_listen/6 and then decides whether to end only this
  % session or to ask the process to stop:
  %   - listen succeeds (client sent `close`): the goal thread is aborted, the
  %     mqi_worker_threads/3 fact is retracted, and this session's socket is
  %     closed.
  %   - listen throws error(_, _): same cleanup. quit/1 is then called only if
  %     Exit_Main_On_Failure is true, otherwise the socket is closed.
  %   - listen fails (client sent `quit`, or no message could be received):
  %     the cleanup conjunction fails, so neither the goal-thread abort nor the
  %     retract runs, and quit/1 is called.
  % quit/1 only posts quit_mqi to the main thread; mqi_start/0 waits for it.
  % NOTE(review): only error(_, _) exceptions are caught. Others, such as the
  % bare atom password_mismatch thrown by communication_thread_listen/6,
  % escape this predicate, so the goal thread, the worker-thread fact and the
  % socket appear not to be cleaned up on that path. Confirm whether intended.
  512communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
  513    thread_self(Self_ID),
  514    (   (
  515            catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Serve_Exception1, Serve_Exception2), true),
  516            debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [error(Serve_Exception1, Serve_Exception2)]),
  517            abortSilentExit(Goal_Thread_ID, _),
  518            retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
  519        )
      % Halt is a goal term evaluated below: true only for a caught exception
      % when Exit_Main_On_Failure holds.
  520    ->  Halt = (nonvar(Serve_Exception1), Exit_Main_On_Failure)
  521    ;   Halt = true
  522    ),
  523    (   Halt
  524    ->  (   debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, error(Serve_Exception1, Serve_Exception2)]),
  525            quit(_)
  526        )
  527    ;   (   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
  528            catch(tcp_close_socket(Socket), error(_, _), true)
  529        )
  530    ).
  531
  532
  % communication_thread_listen(+Password, +Socket, +Encoding, +Server_Thread_ID,
  %                             +Goal_Thread_ID, +Query_Timeout)
  %
  % Opens the client connection's streams, records them in mqi_socket/5 and
  % switches both to Encoding. It then checks the first message against
  % Password. On a match it replies with this thread's ID, the goal thread's ID
  % and the protocol version, then serves the session with
  % process_mqi_messages/4.
  %   succeeds: the session ended cleanly (client sent `close`).
  %   fails:    the client sent `quit`, or a message could not be read.
  %   throws:   communication/internal failures. On a wrong password, an error
  %             reply is sent and the bare atom password_mismatch is thrown.
communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
    thread_self(Me),
    tcp_open_socket(Socket, In, Out),
    assert(mqi_socket(Server_Thread_ID, Me, Socket, In, Out)),
    set_stream(In, encoding(Encoding)),
    set_stream(Out, encoding(Encoding)),
    read_message(In, Sent_Password),
    (   Password == Sent_Password
    ->  debug(mqi(protocol), "Password matched.", []),
        mqi_version(Major, Minor),
        reply(Out, true([[threads(Me, Goal_Thread_ID), version(Major, Minor)]]))
    ;   debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
        reply_error(Out, password_mismatch),
        throw(password_mismatch)
    ),
    process_mqi_messages(In, Out, Goal_Thread_ID, Query_Timeout),
    debug(mqi(protocol), "Session finished.", []).
  557
  558
  % process_mqi_messages(+Read_Stream, +Write_Stream, +Goal_Thread_ID, +Query_Timeout)
  %
  % Main message loop of a connection. It handles one message at a time with
  % process_mqi_message/5 until the client sends `close` or `quit`, or a message
  % cannot be received. The streams may use any encoding. Goal_Thread_ID must be
  % a thread running goal_thread/1. The recursion is the last goal, so the stack
  % does not grow.
  %   succeeds: `close` received; end the session cleanly.
  %   fails:    `quit` received, or no valid message could be received;
  %             communication_thread/7 then requests a process exit.
  %   throws:   communication failure, or the thread was asked to stop.
process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
    (   Command == close
    ->  debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", [])
    ;   Command == quit
    ->  debug(mqi(protocol), "Command: quit.", []),
        fail
    ;   process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
    ).
  587
  % process_mqi_message(+Read_Stream, +Write_Stream, +Goal_Thread_ID, +Query_Timeout, -Command)
  %
  % Handles exactly one protocol message: receive it, parse it into Command,
  % execute it, and reply on Write_Stream. Command is returned so
  % process_mqi_messages/4 can react to close and quit.
  %   succeeds: a message was received. If it parsed, it was executed, and the
  %             goal's own outcome (success, failure or exception) was sent to
  %             the client. If it did not parse, state_parse_command/4 already
  %             replied with an error and Command is left unbound.
  %   fails:    no valid message could be received.
  %   throws:   only on communication failures or when the thread must exit.
process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
    debug(mqi(protocol), "Waiting for next message ...", []),
    once(state_receive_raw_message(Read_Stream, Message_String)),
    (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
    ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
    ;   true
    ).
  614
  615
  % state_receive_raw_message(+Read_Stream, -Message)
  % Reads one raw protocol message (a string) with read_message/2.
  %   succeeds: a validly framed message was read;
  %   fails:    the message format was invalid;
  %   throws:   communication failure, or the thread was asked to exit.
state_receive_raw_message(Read_Stream, Message) :-
    read_message(Read_Stream, Message),
    debug(mqi(protocol), "Valid message: ~w", [Message]).
  623
  624
  % state_parse_command(+Write_Stream, +Command_String, -Parsed_Command, -Binding_List)
  %
  % Parses a raw message into a term, in the context of module user, so that
  % user operators and term expansion apply. Binding_List receives the
  % Name=Var pairs of the variables in the message. read_term_from_atom/3 is
  % used instead of reading from the socket stream so that a client sending an
  % unfinished term cannot make the thread hang.
  %   succeeds: the command parsed;
  %   fails:    it did not parse, and an error has already been sent to the
  %             client (the syntax error, or couldNotParseCommand);
  %   throws:   only if sending that error reply fails.
state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
    Read_Options = [variable_names(Binding_List), module(user)],
    (   catch(read_term_from_atom(Command_String, Parsed_Command, Read_Options), Parse_Exception, true)
    ->  true
    ;   reply_error(Write_Stream, error(couldNotParseCommand, _)),
        fail
    ),
    (   var(Parse_Exception)
    ->  debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
    ;   reply_error(Write_Stream, Parse_Exception),
        fail
    ).
  646
  647
  % drain_unretrieved_results(+Goal_Thread_ID, +Stream)
  %
  % Shared by the run/2 and run_async/3 commands. Before a new goal is queued,
  % every result still pending from the previous query on Goal_Thread_ID is
  % received and discarded. query_in_progress(Goal_Thread_ID) exists while at
  % least one result remains to be drained. get_next_result in the
  % communication thread deletes it when the last result is drained, which ends
  % the loop.
drain_unretrieved_results(Goal_Thread_ID, Stream) :-
    repeat_until_false((
            query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
        )),
    debug(mqi(protocol), "All previous results drained", []).

% state_process_command(+Stream, +Goal_Thread_ID, +Query_Timeout, +Command, +Binding_List)
%
% Executes one parsed Command and reports its outcome to the client on Stream.
%   true:      the command itself succeeded, failed or threw; that outcome has
%              been sent to the client.
%   exception: only communication or thread failures bubble up.
% See mqi(Options) documentation.
%
% run(Goal, Timeout): drain any previous query, run Goal synchronously with
% all answers collected, and reply with the result.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
    drain_unretrieved_results(Goal_Thread_ID, Stream),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
    reply_with_result(Goal_Thread_ID, Stream, Answers).

% run_async(Goal, Timeout, Find_All): drain any previous query, queue Goal on
% the goal thread, and reply `true([[]])` immediately. Results are fetched
% later with async_result. See mqi(Options) documentation.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run_async/1.", []),
    debug(mqi(query),  "   Goal: ~w", [Goal]),
    drain_unretrieved_results(Goal_Thread_ID, Stream),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
    reply(Stream, true([[]])).
  691
  692
  693% See mqi(Options) documentation for documentation
  694state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
  695    !,
  696    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
  697    (   once((var(Timeout) ; Timeout == -1))
  698    ->  Options = []
  699    ;   Options = [timeout(Timeout)]
  700    ),
  701    (   query_in_progress(Goal_Thread_ID)
  702    ->  (   (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  703                get_next_result(Goal_Thread_ID, Stream, Options, Result)
  704            )
  705        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
  706        ;   reply_error(Stream, result_not_available)
  707        )
  708   ;    (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  709            reply_error(Stream, no_query)
  710        )
  711   ).
  712
  713
  714% See mqi(Options) documentation for documentation
  715% To ensure the goal thread is in a place it is safe to cancel,
  716% we lock a mutex first that the goal thread checks before exiting
  717% the "safe to cancel" zone.
  718% It is not in the safe zone: it either finished
  719% or was never running.
  720state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
  721    !,
  722    debug(mqi(protocol), "Command: cancel_async/0.", []),
  723    with_mutex(Goal_Thread_ID, (
  724        (   safe_to_cancel(Goal_Thread_ID)
  725        ->  (   thread_signal(Goal_Thread_ID, throw(cancel_goal)),
  726                reply(Stream, true([[]]))
  727            )
  728        ;   (   query_in_progress(Goal_Thread_ID)
  729            ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
  730                    reply(Stream, true([[]]))
  731                )
  732            ;   (   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
  733                    reply_error(Stream, no_query)
  734                )
  735            )
  736        )
  737    )).
  738
  739
  740% Used for testing how the system behaves when the goal thread is killed unexpectedly
  741% Needs to run a bogus command `run(true, -1)` to
  742% get the goal thread to process the exception
  743state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
  744    !,
  745    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
  746    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
  747    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(true, -1), Binding_List).
  748
  749
  750state_process_command(Stream, _, _, close, _) :-
  751    !,
  752    reply(Stream, true([[]])).
  753
  754
  755state_process_command(Stream, _, _, quit, _) :-
  756    !,
  757    reply(Stream, true([[]])).
  758
  759
  760%  Send an exception if the command is not known
  761state_process_command(Stream, _, _, Command, _) :-
  762    debug(mqi(protocol), "Unknown command ~w", [Command]),
  763    reply_error(Stream, unknownCommand).
  764
  765
  766% Wait for a result (and put in Answers) from the goal thread, but send a heartbeat message
  767% every so often until it arrives to detect if the socket is broken.
  768% Throws if If the heartbeat failed which will
  769% and then shutdown the communication thread
  770% Tail recurse to not grow the stack
  771heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
  772    (   get_next_result(Goal_Thread_ID, Stream, [timeout(2)], Answers)
  773    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
  774    ;   (   debug(mqi(protocol), "heartbeat...", []),
  775            write_heartbeat(Stream),
  776            heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
  777        )
  778    ).
  779
  780
  781% True if write succeeded, otherwise throws as that
  782% indicates that heartbeat failed because the other
  783% end of the pipe terminated
  784write_heartbeat(Stream) :-
  785    put_char(Stream, '.'),
  786    flush_output(Stream).
  787
  788
  789% Send a goal to the goal thread in its queue
  790%
  791% Remember that we are now running a query using assert.
  792%   This will be retracted once all the answers have been drained.
  793%
  794% If Goal_Thread_ID died, thread_send_message throws and, if we don't respond,
  795%   the client could hang so catch and give them a good message before propagating
  796%   the exception
  797send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
  798    (   var(Timeout)
  799    ->  Timeout = Default_Timeout
  800    ;   true
  801    ),
  802    (   var(Binding_List)
  803    ->  Binding_List = []
  804    ;   true
  805    ),
  806    debug(mqi(query),  "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
  807    assert(query_in_progress(Goal_Thread_ID)),
  808    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
  809    (   var(Send_Message_Exception)
  810    ->  true
  811    ;   (   reply_error(Stream, connection_failed),
  812            throw(Send_Message_Exception)
  813        )
  814    ).
  815
  816
  817% Send a result from the goal thread to the communication thread in its queue
  818send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
  819    (   var(Exception_In_Goal)
  820    ->  (   (   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
  821                Answer == []
  822            )
  823        ->  thread_send_message(Respond_To_Thread_ID, result(false, Find_All))
  824        ;   thread_send_message(Respond_To_Thread_ID, result(true(Answer), Find_All))
  825        )
  826    ;   (   debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
  827            thread_send_message(Respond_To_Thread_ID, result(error(Exception_In_Goal), Find_All))
  828        )
  829    ).
  830
  831
  832% Gets the next result from the goal thread in the communication thread queue,
  833% and retracts query_in_progress/1 when the last result has been sent.
  834% Find_All == true only returns one message, so delete query_in_progress
  835% No matter what it is
  836% \+ Find_All: There may be more than one result. The first one we hit with any exception
  837% (note that no_more_results is also returned as an exception) means we are done
  838get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
  839    (   thread_property(Goal_Thread_ID, status(running))
  840    ->  true
  841    ;   (   reply_error(Stream, connection_failed),
  842            throw(connection_failed)
  843        )
  844    ),
  845    thread_self(Self_ID),
  846    thread_get_message(Self_ID, result(Answers, Find_All), Options),
  847    (   Find_All
  848    ->  (   debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
  849            retractall(query_in_progress(Goal_Thread_ID))
  850        )
  851    ;   (   Answers = error(_)
  852        ->  (   debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
  853                retractall(query_in_progress(Goal_Thread_ID))
  854            )
  855        ;   true
  856        )
  857    ).
  858
  859
  860% reply_with_result predicates are used to consistently return
  861% answers for a query from either run() or run_async()
  862reply_with_result(_, Stream, error(Error)) :-
  863    !,
  864    reply_error(Stream, Error).
  865reply_with_result(_, Stream, Result) :-
  866    !,
  867    reply(Stream, Result).
  868
  869
  870% Reply with a normal term
  871% Convert term to an actual JSON string
  872reply(Stream, Term) :-
  873    debug(mqi(query), "Responding with Term: ~w", [Term]),
  874    term_to_json_string(Term, Json_String),
  875    write_message(Stream, Json_String).
  876
  877
  878% Special handling for exceptions since they can have parts that are not
  879% "serializable". Ensures they they are always returned in an exception/1 term
  880reply_error(Stream, Error_Term) :-
  881    debug(mqi(query), "Responding with exception: ~w", [Error_Term]),
  882    (   error(Error_Value, _) = Error_Term
  883    ->  Response = exception(Error_Value)
  884    ;   (   atom(Error_Term)
  885        ->
  886            Response = exception(Error_Term)
  887        ;   (   compound_name_arity(Error_Term, Name, _),
  888                Response = exception(Name)
  889            )
  890        )
  891    ),
  892    reply(Stream, Response).
  893
  894
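% Messages sent and received are simply strings preceded by their length + ".\n",
% i.e. "<stringlength>.\n<string>", where <stringlength> is the number of *bytes*
% the string occupies in the stream's encoding (not the number of characters).
% The desired encoding must be set on the Stream before calling these predicates.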
  898
  899
  900% Writes the next message.
  901% Throws if there is an unexpected exception
  902write_message(Stream, String) :-
  903    write_string_length(Stream, String),
  904    write(Stream, String),
  905    flush_output(Stream).
  906
  907
  908% Reads the next message.
  909% Throws if there is an unexpected exception or thread has been requested to quit
  910% the length passed must match the actual number of bytes in the stream
  911% in whatever encoding is being used
  912read_message(Stream, String) :-
  913    read_string_length(Stream, Length),
  914    stream_property(Stream, encoding(Encoding)),
  915    setup_call_cleanup(
  916         stream_range_open(Stream, Tmp, [size(Length)]),
  917         ( set_stream(Tmp, encoding(Encoding)),
  918           read_string(Tmp, _, String)
  919         ),
  920         close(Tmp)).
  921
  922
  923% Terminate with '.\n' so we know that's the end of the count
  924write_string_length(Stream, String) :-
  925    stream_property(Stream, encoding(Encoding)),
  926    string_encoding_length(String, Encoding, Length),
  927    format(Stream, "~d.\n", [Length]).
  928
  929
  930% Note: read_term requires ".\n" after the length
  931% ... but does not consume the "\n"
  932read_string_length(Stream, Length) :-
  933    read_term(Stream, Length, []),
  934    get_char(Stream, _).
  935
  936
  937% converts a string to Codes using Encoding
  938string_encoding_length(String, Encoding, Length) :-
  939    setup_call_cleanup(
  940        open_null_stream(Out),
  941        (   set_stream(Out, encoding(Encoding)),
  942            write(Out, String),
  943            byte_count(Out, Length)
  944        ),
  945        close(Out)).
  946
  947
  948% Convert Prolog Term to a Prolog JSON term
  949% Add a final \n so that using netcat to debug works well
  950term_to_json_string(Term, Json_String) :-
  951    term_to_json(Term, Json),
  952    with_output_to(string(Json_String),
  953        (   current_output(Stream),
  954            json_write(Stream, Json),
  955            put(Stream, '\n')
  956        )).
  957
  958
  959% Execute the goal as once() without binding any variables
  960% and keep executing it until it returns false (or throws)
  961repeat_until_false(Goal) :-
  962    (\+ (\+ Goal)), !, repeat_until_false(Goal).
  963repeat_until_false(_).
  964
  965
  966% Used to kill a thread in an "expected" way so it doesn't leave around traces in thread_property/2 afterwards
  967%
  968% If the thread is alive OR it was already aborted (expected cases) then attempt to join
  969%   the thread so that no warnings are sent to the console. Other cases leave the thread for debugging.
  970% There are some fringe cases (like calling external code)
  971%   where the call might not return for a long time.  Do a timeout for those cases.
  972abortSilentExit(Thread_ID, Exception) :-
  973    catch(thread_signal(Thread_ID, abort), error(Exception, _), true),
  974    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
  975% Workaround SWI Prolog bug: https://github.com/SWI-Prolog/swipl-devel/issues/852 by not joining
  976% The workaround just stops joining the aborted thread, so an inert record will be left if thread_property/2 is called.
  977%    ,
  978%    (   once((var(Exception) ; catch(thread_property(Thread_ID, status(exception('$aborted'))), error(_, _), true)))
  979%    ->  (   catch(call_with_time_limit(4, thread_join(Thread_ID)), error(JoinException1, JoinException2), true),
  980%            debug(mqi(protocol), "thread_join attempted because thread: ~w exit was expected, exception: ~w", [Thread_ID, error(JoinException1, JoinException2)])
  981%        )
  982%    ;   true
  983%    ).
  984
  985
  986% Detach a thread that exits with true or false so that it doesn't leave around a record in thread_property/2 afterwards
  987% Don't detach a thread if it exits because of an exception so we can debug using thread_property/2 afterwards
  988%
  989% However, `abort` is an expected exception but detaching a thread that aborts will leave an unwanted
  990% thread_property/2 record *and* print a message to the console. To work around this,
  991% the goal thread is always aborted by the communication thread using abortSilentExit.
  992detach_if_expected(Thread_ID) :-
  993    thread_property(Thread_ID, status(Status)),
  994    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
  995    (   once((Status = true ; Status = false))
  996    ->  (   debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
  997            thread_detach(Thread_ID)
  998        )
  999    ;   true
 1000    ).
 1001
 1002
 1003write_output_to_file(File) :-
 1004    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
 1005    open(File, write, Stream, [buffer(false)]),
 1006    set_prolog_IO(user_input, Stream, Stream).
 1007
 1008
 1009% Creates a Unix Domain Socket file in a secured directory.
 1010% Throws if the directory or file cannot be created in /tmp for any reason
 1011% Requirements for this file are:
 1012%    - The Prolog process will attempt to create and, if Prolog exits cleanly,
 1013%           delete this file when the server closes.  This means the directory
 1014%           must have the appropriate permissions to allow the Prolog process
 1015%           to do so.
 1016%    - For security reasons, the filename should not be predictable and the
 1017%           directory it is contained in should have permissions set so that files
 1018%           created are only accessible to the current user.
 1019%    - The path must be below 92 *bytes* long (including null terminator) to
 1020%           be portable according to the Linux documentation
 1021%
 1022% tmp_file finds the right /tmp directory, even on Mac OS, so the path is small
 1023% Set 700 (rwx------)  permission so it is only accessible by current user
 1024% Create a secure tmp file in the new directory
 1025% {set,current}_prolog_flag is copied to a thread, so no need to use a mutex.
 1026% Close the stream so sockets can use it
 1027unix_domain_socket_path(Created_Directory, File_Path) :-
 1028    tmp_file(udsock, Created_Directory),
 1029    make_directory(Created_Directory),
 1030    catch(  chmod(Created_Directory, urwx),
 1031            Exception,
 1032            (   catch(delete_directory(Created_Directory), error(_, _), true),
 1033                throw(Exception)
 1034            )
 1035    ),
 1036    setup_call_cleanup( (   current_prolog_flag(tmp_dir, Save_Tmp_Dir),
 1037                            set_prolog_flag(tmp_dir, Created_Directory)
 1038                        ),
 1039                        tmp_file_stream(File_Path, Stream, []),
 1040                        set_prolog_flag(tmp_dir, Save_Tmp_Dir)
 1041                      ),
 1042    close(Stream).
 1043
 1044
 1045% Helper for installing the mqi.pl file to the right
 1046% library directory.
 1047% Call using swipl -s mqi.pl -g "mqi:install_to_library('mqi.pl')" -t halt
 1048install_to_library(File) :-
 1049    once(find_library(Path)),
 1050    copy_file(File, Path),
 1051    make.
 1052
 1053
 1054% Find the base library path, i.e. the one that ends in
 1055% "library/"
 1056find_library(Path) :-
 1057    file_alias_path(library, Path),
 1058    atomic_list_concat(Parts, '/', Path),
 1059    reverse(Parts, Parts_Reverse),
 1060    nth0(0, Parts_Reverse, ''),
 1061    nth0(1, Parts_Reverse, Library),
 1062    string_lower(Library, 'library')