/* -*- Mode: Prolog -*- */
    2
    3% Implements a simple job queue, with dependencies, using thread pools & message queues.
    4
    5:- module(poolq,
    6          [
    7             poolq_create/3,
    8	     poolq_submit_job/5,
    9	     poolq_wait/2
   10          ]).   11
   12:- dynamic job_waiting/4.      % job_waiting(JobId,DepJobIds,Goal,Options).
   13:- dynamic job_running/2.      % job_running(JobId,Thread).
   14:- dynamic job_complete/2.     % job_complete(JobId,Status).
   15
   16:- use_module(library(thread_pool)).   17
   18poolq_create(Scheduler,Size,Options) :-
   19	thread_create(init_scheduler(Size,Options),Scheduler,[]),
   20	debug(poolq,"Started scheduling thread ~w",[Scheduler]).
   21
   22poolq_submit_job(Scheduler,Goal,JobId,JobDepIds,Options) :-
   23	thread_send_message(Scheduler,submit(Goal,JobId,JobDepIds,Options)),
   24	debug(poolq,"Sent message 'submit(~w,~w <-- ~w,~w)' to scheduling thread ~w",[Goal,JobId,JobDepIds,Options,Scheduler]).
   25
   26poolq_wait(Scheduler,Status) :-
   27	thread_send_message(Scheduler,finish),
   28	debug(poolq,"Sent message 'finish' to scheduling thread ~w",[Scheduler]),
   29	thread_join(Scheduler,Status),
   30	debug(poolq,"Scheduling thread ~w terminated",[Scheduler]).
   31
   32init_scheduler(Size,Options) :-
   33	debug(poolq,"Scheduler: initializing",[]),
   34	thread_self(Pool),
   35	thread_pool_create(Pool,Size,Options),
   36	debug(poolq,"Scheduler: created thread pool ~w with ~w threads, options ~w",[Pool,Size,Options]),
   37	wait_for_message.
   38
   39wait_for_message :-
   40	receive_message(Msg),
   41	process_message(Msg).
   42
   43receive_message(Msg) :-
   44	debug(poolq,"Scheduler: waiting for message",[]),
   45	thread_get_message(Msg),
   46	debug(poolq,"Scheduler: received message '~w'",[Msg]).
   47
   48process_message(Msg) :-
   49	process_submit_message(Msg),
   50	!,
   51	wait_for_message.
   52
   53process_message(Msg) :-
   54	process_complete_message(Msg),
   55	!,
   56	wait_for_message.
   57
   58process_message(finish) :-
   59	!,
   60	finish_queued_jobs.
   61
   62process_message(Msg) :-
   63	process_error(Msg),
   64	!,
   65	wait_for_message.
   66
   67process_submit_message(submit(Goal,JobId,DepJobIds,Options)) :-
   68	none_waiting_or_running(DepJobIds),
   69	!,
   70	debug(poolq,"Scheduler: job ~w has no unmet dependencies, starting immediately",[JobId]),
   71	start_job(JobId,Goal,Options).
   72
   73process_submit_message(submit(Goal,JobId,DepJobIds,Options)) :-
   74	!,
   75	debug(poolq,"Scheduler: job ~w has dependencies ~w; postponing",[JobId,DepJobIds]),
   76	assert(job_waiting(JobId,DepJobIds,Goal,Options)).
   77
   78process_complete_message(complete(JobId,JobStatus)) :-
   79	job_running(JobId,Thread),
   80	!,
   81	debug(poolq,"Scheduler: job ~w on thread ~w finished with status ~w",[JobId,Thread,JobStatus]),
   82	retract(job_running(JobId,Thread)),
   83	assert(job_complete(JobId,JobStatus)),
   84	thread_join(Thread,ThreadStatus),
   85	debug(poolq,"Scheduler: job thread ~w terminated with status ~w",[Thread,ThreadStatus]),
   86	start_queued_jobs.
   87
   88process_error(Msg) :-
   89	format("Error: couldn't process message '~w'~n",[Msg]).
   90
   91finish_queued_jobs :-
   92	start_queued_jobs,
   93	wait_for_queue,
   94	thread_self(Pool),
   95	thread_pool_destroy(Pool),
   96	debug(poolq,"Scheduler: destroyed thread pool",[]).
   97
   98wait_for_queue :-
   99	job_running(_,_),
  100	!,
  101	receive_message(Msg),
  102	(process_complete_message(Msg) ; process_error(Msg)),
  103	wait_for_queue.
  104wait_for_queue :-
  105	job_waiting(_,_,_,_),
  106	!,
  107	bagof(JobId,job_waiting(JobId,_,_,_),AbandonedJobs),
  108	format("Warning: unprocessed jobs ~w~n",[AbandonedJobs]),
  109	fail.
  110wait_for_queue.
  111
  112start_queued_jobs :-
  113	debug(poolq,"Scheduler: looking for postponed jobs",[]),
  114	start_queued_job,
  115	!,
  116	start_queued_jobs.
  117start_queued_jobs :-
  118	\+ job_waiting(_,_,_,_),
  119	!,
  120	debug(poolq,"Scheduler: no jobs waiting",[]).
  121start_queued_jobs :-
  122	debug(poolq,"Scheduler: no jobs ready to run",[]).
  123
  124start_queued_job :-
  125	job_waiting(JobId,DepJobIds,Goal,Options),
  126	none_waiting_or_running(DepJobIds),
  127	retract(job_waiting(JobId,DepJobIds,Goal,Options)),
  128	start_job(JobId,Goal,Options).
  129
  130start_job(JobId,Goal,Options) :-
  131	debug(poolq,"Scheduler: starting job ~w: ~w",[Goal,Options]),
  132	thread_self(Self),
  133	thread_create_in_pool(Self,run_job(Self,JobId,Goal),Thread,Options),
  134	assert(job_running(JobId,Thread)).
  135
  136run_job(Scheduler,JobId,Goal) :-
  137	job_status(Goal,Status),
  138	thread_send_message(Scheduler,complete(JobId,Status)),
  139	debug(poolq,"Job ~w: sent message 'complete(~w,~w)' to scheduling thread ~w",[JobId,JobId,Status,Scheduler]).
  140
  141job_status(Goal,true) :- call(Goal), !.
  142job_status(_,false).
  143
  144none_waiting_or_running(JobIds) :-
  145	forall(member(JobId,JobIds),
  146	       \+ (job_waiting(JobId,_,_,_)
  147                   ; job_running(JobId,_)))