View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2008-2016, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(thread_pool,
   37          [ thread_pool_create/3,       % +Pool, +Size, +Options
   38            thread_pool_destroy/1,      % +Pool
   39            thread_create_in_pool/4,    % +Pool, :Goal, -Id, +Options
   40
   41            current_thread_pool/1,      % ?Pool
   42            thread_pool_property/2      % ?Pool, ?Property
   43          ]).   44:- use_module(library(debug),[debug/3]).   45:- autoload(library(error),[must_be/2,type_error/2]).   46:- autoload(library(lists),[member/2,delete/3]).   47:- autoload(library(option),
   48	    [meta_options/3,select_option/4,merge_options/3,option/3]).   49:- autoload(library(rbtrees),
   50	    [ rb_new/1,
   51	      rb_insert_new/4,
   52	      rb_delete/3,
   53	      rb_keys/2,
   54	      rb_lookup/3,
   55	      rb_update/4
   56	    ]).

Resource bounded thread management

The module library(thread_pool) manages threads in pools. A pool defines properties of its member threads and the maximum number of threads that can coexist in the pool. The call thread_create_in_pool/4 allocates a thread in the pool, just like thread_create/3. If the pool is fully allocated it can be asked to wait or raise an error.

The library has been designed to deal with server applications that receive a variety of requests, such as HTTP servers. Simply starting a thread for each request is a bit too simple minded for such servers:

Using this library, one can define a pool for each set of tasks with comparable characteristics and create threads in this pool. Unlike the worker-pool model, threads are not started immediately. Depending on the design, both approaches can be attractive.

The library is implemented by means of a manager thread with the fixed thread id __thread_pool_manager. All state is maintained in this manager thread, which receives and processes requests to create and destroy pools, create threads in a pool and handle messages from terminated threads. Thread pools are not saved in a saved state and must therefore be recreated using the initialization/1 directive or otherwise during startup of the application.

See also
- http_handler/3 and http_spawn/2. */
   93:- meta_predicate
   94    thread_create_in_pool(+, 0, -, :).   95:- predicate_options(thread_create_in_pool/4, 4,
   96                     [ wait(boolean),
   97                       pass_to(system:thread_create/3, 3)
   98                     ]).   99
  100:- multifile
  101    create_pool/1.
 thread_pool_create(+Pool, +Size, +Options) is det
Create a pool of threads. A pool of threads is a declaration for creating threads with shared properties (stack sizes) and a limited number of threads. Threads are created using thread_create_in_pool/4. If all threads in the pool are in use, the behaviour depends on the wait option of thread_create_in_pool/4 and the backlog option described below. Options are passed to thread_create/3, except for
backlog(+MaxBackLog)
Maximum number of requests that can be suspended. Default is infinite. Otherwise it must be a non-negative integer. Using backlog(0) will never delay thread creation for this pool.

The pooling mechanism does not interact with the detached state of a thread. Threads can be created both detached and normal and must be joined using thread_join/2 if they are not detached.

  124thread_pool_create(Name, Size, Options) :-
  125    must_be(list, Options),
  126    pool_manager(Manager),
  127    thread_self(Me),
  128    thread_send_message(Manager, create_pool(Name, Size, Options, Me)),
  129    wait_reply.
 thread_pool_destroy(+Name) is det
Destroy the thread pool named Name.
Errors
- existence_error(thread_pool, Name).
  137thread_pool_destroy(Name) :-
  138    pool_manager(Manager),
  139    thread_self(Me),
  140    thread_send_message(Manager, destroy_pool(Name, Me)),
  141    wait_reply.
 current_thread_pool(?Name) is nondet
True if Name refers to a defined thread pool.
  148current_thread_pool(Name) :-
  149    pool_manager(Manager),
  150    thread_self(Me),
  151    thread_send_message(Manager, current_pools(Me)),
  152    wait_reply(Pools),
  153    (   atom(Name)
  154    ->  memberchk(Name, Pools)
  155    ;   member(Name, Pools)
  156    ).
 thread_pool_property(?Name, ?Property) is nondet
True if Property is a property of thread pool Name. Defined properties are:
options(Options)
Thread creation options for this pool
free(Size)
Number of free slots on this pool
size(Size)
Total number of slots on this pool
members(ListOfIDs)
ListOfIDs is the list or threads running in this pool
running(Running)
Number of running threads in this pool
backlog(Size)
Number of delayed thread creations on this pool
  176thread_pool_property(Name, Property) :-
  177    current_thread_pool(Name),
  178    pool_manager(Manager),
  179    thread_self(Me),
  180    thread_send_message(Manager, pool_properties(Me, Name, Property)),
  181    wait_reply(Props),
  182    (   nonvar(Property)
  183    ->  memberchk(Property, Props)
  184    ;   member(Property, Props)
  185    ).
 thread_create_in_pool(+Pool, :Goal, -Id, +Options) is det
Create a thread in Pool. Options overrule default thread creation options associated to the pool. In addition, the following option is defined:
wait(+Boolean)
If true (default) and the pool is full, wait until a member of the pool completes. If false, throw a resource_error.
Errors
- resource_error(threads_in_pool(Pool)) is raised if wait is false or the backlog limit has been reached.
- existence_error(thread_pool, Pool) if Pool does not exist.
  204thread_create_in_pool(Pool, Goal, Id, QOptions) :-
  205    meta_options(is_meta, QOptions, Options),
  206    catch(thread_create_in_pool_(Pool, Goal, Id, Options),
  207          Error, true),
  208    (   var(Error)
  209    ->  true
  210    ;   Error = error(existence_error(thread_pool, Pool), _),
  211        create_pool_lazily(Pool)
  212    ->  thread_create_in_pool_(Pool, Goal, Id, Options)
  213    ;   throw(Error)
  214    ).
  215
  216thread_create_in_pool_(Pool, Goal, Id, Options) :-
  217    select_option(wait(Wait), Options, ThreadOptions, true),
  218    pool_manager(Manager),
  219    thread_self(Me),
  220    thread_send_message(Manager,
  221                        create(Pool, Goal, Me, Wait, Id, ThreadOptions)),
  222    wait_reply(Id).
  223
  224is_meta(at_exit).
 create_pool_lazily(+Pool) is semidet
Call the hook create_pool/1 to create the pool lazily.
  231create_pool_lazily(Pool) :-
  232    with_mutex(Pool,
  233               ( mutex_destroy(Pool),
  234                 create_pool_sync(Pool)
  235               )).
  236
  237create_pool_sync(Pool) :-
  238    current_thread_pool(Pool),
  239    !.
  240create_pool_sync(Pool) :-
  241    create_pool(Pool).
  242
  243
  244                 /*******************************
  245                 *        START MANAGER         *
  246                 *******************************/
 pool_manager(-ThreadID) is det
ThreadID is the thread (alias) identifier of the manager. Starts the manager if it is not running.
  253pool_manager(TID) :-
  254    TID = '__thread_pool_manager',
  255    (   thread_running(TID)
  256    ->  true
  257    ;   with_mutex('__thread_pool', create_pool_manager(TID))
  258    ).
  259
  260thread_running(Thread) :-
  261    catch(thread_property(Thread, status(Status)),
  262          E, true),
  263    (   var(E)
  264    ->  (   Status == running
  265        ->  true
  266        ;   thread_join(Thread, _),
  267            print_message(warning, thread_pool(manager_died(Status))),
  268            fail
  269        )
  270    ;   E = error(existence_error(thread, Thread), _)
  271    ->  fail
  272    ;   throw(E)
  273    ).
  274
  275create_pool_manager(Thread) :-
  276    thread_running(Thread),
  277    !.
  278create_pool_manager(Thread) :-
  279    thread_create(pool_manager_main, _,
  280                  [ alias(Thread),
  281                    inherit_from(main)
  282                  ]).
  283
  284
  285pool_manager_main :-
  286    rb_new(State0),
  287    manage_thread_pool(State0).
  288
  289
  290                 /*******************************
  291                 *        MANAGER LOGIC         *
  292                 *******************************/
 manage_thread_pool(+State)
  296manage_thread_pool(State0) :-
  297    thread_get_message(Message),
  298    (   update_thread_pool(Message, State0, State)
  299    ->  debug(thread_pool(state), 'Message ~p --> ~p', [Message, State]),
  300        manage_thread_pool(State)
  301    ;   format(user_error, 'Update failed: ~p~n', [Message])
  302    ).
  303
  304
  305update_thread_pool(create_pool(Name, Size, Options, For), State0, State) :-
  306    !,
  307    (   rb_insert_new(State0,
  308                      Name, tpool(Options, Size, Size, WP, WP, []),
  309                      State)
  310    ->  thread_send_message(For, thread_pool(true))
  311    ;   reply_error(For, permission_error(create, thread_pool, Name)),
  312        State = State0
  313    ).
  314update_thread_pool(destroy_pool(Name, For), State0, State) :-
  315    !,
  316    (   rb_delete(State0, Name, State)
  317    ->  thread_send_message(For, thread_pool(true))
  318    ;   reply_error(For, existence_error(thread_pool, Name)),
  319        State = State0
  320    ).
  321update_thread_pool(current_pools(For), State, State) :-
  322    !,
  323    rb_keys(State, Keys),
  324    debug(thread_pool(current), 'Reply to ~w: ~p', [For, Keys]),
  325    reply(For, Keys).
  326update_thread_pool(pool_properties(For, Name, P), State, State) :-
  327    !,
  328    (   rb_lookup(Name, Pool, State)
  329    ->  findall(P, pool_property(P, Pool), List),
  330        reply(For, List)
  331    ;   reply_error(For, existence_error(thread_pool, Name))
  332    ).
  333update_thread_pool(Message, State0, State) :-
  334    arg(1, Message, Name),
  335    (   rb_lookup(Name, Pool0, State0)
  336    ->  update_pool(Message, Pool0, Pool),
  337        rb_update(State0, Name, Pool, State)
  338    ;   State = State0,
  339        (   Message = create(Name, _, For, _, _, _)
  340        ->  reply_error(For, existence_error(thread_pool, Name))
  341        ;   true
  342        )
  343    ).
  344
  345pool_property(options(Options),
  346              tpool(Options, _Free, _Size, _WP, _WPT, _Members)).
  347pool_property(backlog(Size),
  348              tpool(_, _Free, _Size, WP, WPT, _Members)) :-
  349    diff_list_length(WP, WPT, Size).
  350pool_property(free(Free),
  351              tpool(_, Free, _Size, _, _, _)).
  352pool_property(size(Size),
  353              tpool(_, _Free, Size, _, _, _)).
  354pool_property(running(Count),
  355              tpool(_, Free, Size, _, _, _)) :-
  356    Count is Size - Free.
  357pool_property(members(IDList),
  358              tpool(_, _, _, _, _, IDList)).
  359
  360diff_list_length(List, Tail, Size) :-
  361    '$skip_list'(Length, List, Rest),
  362    (   Rest == Tail
  363    ->  Size = Length
  364    ;   type_error(difference_list, List/Tail)
  365    ).
 update_pool(+Message, +Pool0, -Pool) is det
Deal with create requests and completion messages on a given pool. There are two messages:
create(PoolName, Goal, ForThread, Wait, Id, Options)
Create a new thread on behalf of ForThread. There are two cases:
  • Free slots: create the thread
  • No free slots: error or add to waiting
exitted(PoolName, Thread)
A thread completed. If there is a request waiting, create a new one.
  382update_pool(create(Name, Goal, For, _, Id, MyOptions),
  383            tpool(Options, Free0, Size, WP, WPT, Members0),
  384            tpool(Options, Free, Size, WP, WPT, Members)) :-
  385    succ(Free, Free0),
  386    !,
  387    merge_options(MyOptions, Options, ThreadOptions),
  388    select_option(at_exit(AtExit), ThreadOptions, ThreadOptions1, true),
  389    catch(thread_create(Goal, Id,
  390                        [ at_exit(worker_exitted(Name, Id, AtExit))
  391                        | ThreadOptions1
  392                        ]),
  393          E, true),
  394    (   var(E)
  395    ->  Members = [Id|Members0],
  396        reply(For, Id)
  397    ;   reply_error(For, E),
  398        Members = Members0
  399    ).
  400update_pool(Create,
  401            tpool(Options, 0, Size, WP, WPT0, Members),
  402            tpool(Options, 0, Size, WP, WPT, Members)) :-
  403    Create = create(Name, _Goal, For, Wait, _, _Options),
  404    !,
  405    option(backlog(BackLog), Options, infinite),
  406    (   can_delay(Wait, BackLog, WP, WPT0)
  407    ->  WPT0 = [Create|WPT],
  408        debug(thread_pool, 'Delaying ~p', [Create])
  409    ;   WPT = WPT0,
  410        reply_error(For, resource_error(threads_in_pool(Name)))
  411    ).
  412update_pool(exitted(_Name, Id),
  413            tpool(Options, Free0, Size, WP0, WPT, Members0),
  414            Pool) :-
  415    succ(Free0, Free),
  416    delete(Members0, Id, Members1),
  417    Pool1 = tpool(Options, Free, Size, WP, WPT, Members1),
  418    (   WP0 == WPT
  419    ->  WP = WP0,
  420        Pool = Pool1
  421    ;   WP0 = [Waiting|WP],
  422        debug(thread_pool, 'Start delayed ~p', [Waiting]),
  423        update_pool(Waiting, Pool1, Pool)
  424    ).
  425
  426
  427can_delay(true, infinite, _, _) :- !.
  428can_delay(true, BackLog, WP, WPT) :-
  429    diff_list_length(WP, WPT, Size),
  430    BackLog > Size.
 worker_exitted(+PoolName, +WorkerId, :AtExit)
It is possible that '__thread_pool_manager' no longer exists while closing down the process because the manager was killed before the worker.
To be done
- Find a way to discover that we are terminating Prolog.
  440:- public
  441    worker_exitted/3.  442
  443worker_exitted(Name, Id, AtExit) :-
  444    catch(thread_send_message('__thread_pool_manager', exitted(Name, Id)),
  445          _, true),
  446    call(AtExit).
  447
  448
  449                 /*******************************
  450                 *             UTIL             *
  451                 *******************************/
  452
  453reply(To, Term) :-
  454    thread_send_message(To, thread_pool(true(Term))).
  455
  456reply_error(To, Error) :-
  457    thread_send_message(To, thread_pool(error(Error, _))).
  458
  459wait_reply :-
  460    thread_get_message(thread_pool(Result)),
  461    (   Result == true
  462    ->  true
  463    ;   Result == fail
  464    ->  fail
  465    ;   throw(Result)
  466    ).
  467
  468wait_reply(Value) :-
  469    thread_get_message(thread_pool(Reply)),
  470    (   Reply = true(Value0)
  471    ->  Value = Value0
  472    ;   Reply == fail
  473    ->  fail
  474    ;   throw(Reply)
  475    ).
  476
  477
  478                 /*******************************
  479                 *             HOOKS            *
  480                 *******************************/
 create_pool(+PoolName) is semidet
Hook to create a thread pool lazily. The hook is called if thread_create_in_pool/4 discovers that the thread pool does not exist. If the hook succeeds, thread_create_in_pool/4 retries creating the thread. For example, we can use the following declaration to create threads in the pool media, which holds a maximum of 20 threads.
:- multifile thread_pool:create_pool/1.

thread_pool:create_pool(media) :-
    thread_pool_create(media, 20, []).
  498                 /*******************************
  499                 *            MESSAGES          *
  500                 *******************************/
  501:- multifile
  502    prolog:message/3.  503
  504prolog:message(thread_pool(Message)) -->
  505    message(Message).
  506
  507message(manager_died(Status)) -->
  508    [ 'Thread-pool: manager died on status ~p; restarting'-[Status] ]