% -*- Mode: Prolog -*-
    2
    3:- module(queue,
    4          [
    5              queue_engine/1,
    6              init_queue/2,
    7              release_queue/1,
    8	      write_script_file/4,
    9	      write_script_file/7,
   10	      run_execs_in_queue/4,
   11	      flush_queue_recursive/2
   12          ]).   13
   14:- use_module(library(readutil)).   15
   16:- use_module(library(biomake/biomake)).   17:- use_module(library(biomake/utils)).   18
   19:- discontiguous queue_engine/1.   20:- discontiguous init_queue/2.   21:- discontiguous release_queue/1.   22:- discontiguous run_execs_in_queue/4.   23:- discontiguous default_qsub_exec/2.   24:- discontiguous default_qdel_exec/2.   25:- discontiguous qsub_output_arg/3.   26:- discontiguous qsub_error_arg/3.   27:- discontiguous qsub_dep_arg/3.   28:- discontiguous qsub_dep_arg_prefix/2.   29:- discontiguous qsub_dep_prefix/2.   30:- discontiguous qsub_dep_separator/2.   31:- discontiguous qsub_extra_args/2.   32:- discontiguous qsub_script_headers/4.   33:- discontiguous qsub_job_id/3.   34
   35% ----------------------------------------
% SUPPORTED QUEUE ENGINES: none, test, sge, pbs, slurm, poolq (each is defined in its own section near the end of this file)
   37% ----------------------------------------
   38
   39% ----------------------------------------
   40% GENERIC JOB SUBMISSION
   41% ----------------------------------------
   42
   43run_execs_with_qsub(Engine,Rule,SL,Opts) :-
   44	rule_target(Rule,T,Opts),
   45        rule_dependencies(Rule,DL,Opts),
   46	qsub_kill(Engine,T,SL,Opts),
   47	(get_opt(qsub_exec,QsubExec,Opts); default_qsub_exec(Engine,QsubExec)),
   48	(get_opt(qsub_args,QsubArgs,Opts); QsubArgs = ""),
   49	(get_opt(queue_args,QArgs,Opts); QArgs = ""),
   50	(get_opt(qsub_header,QsubHeader,Opts); QsubHeader = ""),
   51	(get_opt(qsub_header_file,QsubHeaderFile,Opts)
   52	 -> file_contents(QsubHeaderFile,QsubHeaderFileContents)
   53	 ; QsubHeaderFileContents = ""),
   54	qsub_extra_args(Engine,ExtraArgs),
   55	bindvar_rule('QsubArgs',Rule,Opts,RuleQsubArgs),
   56	bindvar_rule('QsubHeader',Rule,Opts,RuleQsubHeader),
   57	bindvar_rule('QsubHeaderFile',Rule,Opts,RuleQsubHeaderFile),
   58	file_contents(RuleQsubHeaderFile,RuleQsubHeaderFileContents),
   59	biomake_private_filename_dir_exists(T,[Engine,"job"],JobFilename),
   60	format(string(RemoveJobFile),"rm ~w",[JobFilename]),
   61	qsub_rule_execs(Rule,Es,Opts),
   62	qsub_job_ids(Engine,DL,DepJobs),
   63	qsub_script_headers(Engine,DepJobs,Opts,Headers),
   64	append(Headers,["","# Main script"],HeadersWithComment),
   65	qsub_dep_arg(Engine,DepJobs,DepArg),
   66	biomake_private_filename_dir_exists(T,[Engine,"out"],OutPath),
   67	qsub_output_arg(Engine,OutPath,OutArg),
   68	biomake_private_filename_dir_exists(T,[Engine,"err"],ErrPath),
   69	qsub_error_arg(Engine,ErrPath,ErrArg),
   70	!,
   71	write_script_file(T,
   72			  ["# Header from QsubHeader variable",
   73			   RuleQsubHeader,
   74			   "# Header from --qsub-header",
   75			   QsubHeader,
   76			   "# Contents of header file specified by QsubHeaderFile variable",
   77			   RuleQsubHeaderFileContents,
   78			   "# Contents of header file specified by --qsub-header-file",
   79			   QsubHeaderFileContents,
   80			   "# Generic headers" | HeadersWithComment],
   81			  Es,
   82			  [RemoveJobFile],
   83			  Opts,
   84			  [Engine],
   85			  ScriptFilename),
   86	format(string(QsubCmd),"~w ~w ~w ~w ~w ~w ~w ~w ~w >~w",[QsubExec,OutArg,ErrArg,QArgs,QsubArgs,RuleQsubArgs,DepArg,ExtraArgs,ScriptFilename,JobFilename]),
   87	report("Submitting job: ~w",[QsubCmd],Opts),
   88	shell(QsubCmd).
   89
   90qsub_rule_execs(Rule,[Chdir,Biomake],Opts) :-
   91	qsub_use_biomake(Opts),
   92	!,
   93	rule_target(Rule,T,Opts),
   94	get_opt(biomake_cwd,Dir,Opts),
   95	get_opt(biomake_prog,Prog,Opts),
   96	get_opt(biomake_args,Args,Opts),
   97	get_opt(qsub_biomake_args,QArgs,Opts),
   98	format(string(Chdir),"cd ~w",[Dir]),  % probably redundant: write_script_file starts with a cd to CWD
   99	format(string(Biomake),"~w ~w ~w ~w",[Prog,Args,QArgs,T]).
  100
  101qsub_rule_execs(Rule,Es,Opts) :-
  102	rule_execs(Rule,Es,Opts).
  103
  104file_contents(Filename,String) :-
  105        Filename \= '',
  106	exists_file(Filename),
  107	read_file_to_codes(Filename,Codes,[]),
  108	string_codes(String,Codes),
  109	!.
  110file_contents(_,"").
  111
% qsub_use_biomake(+Opts) is semidet.
% True when the qsub_use_biomake option is true. In that case
% qsub_rule_execs/3 makes each submitted job re-invoke biomake on its
% target instead of running the rule's recipe commands directly.
qsub_use_biomake(Opts) :-
	get_opt(qsub_use_biomake,true,Opts).
  114
  115qsub_job_ids(Engine,[D|Ds],[N|Ns]) :-
  116	qsub_job_id(Engine,D,N),
  117	!,
  118	qsub_job_ids(Engine,Ds,Ns).
  119qsub_job_ids(Engine,[_|Ds],Ns) :-
  120	!,
  121	qsub_job_ids(Engine,Ds,Ns).
  122qsub_job_ids(_,[],[]).
  123
  124qsub_numeric_job_id(Engine,T,Id) :-
  125	biomake_private_filename(T,[Engine,"job"],JobFilename),
  126	exists_file(JobFilename),
  127	phrase_from_file(first_int(Id),JobFilename).
  128
% first_int(-N)// is semidet.
% DCG over the whole input (used with phrase_from_file/2): skips leading
% codes until one is accepted by parse_num_code//1, collects the run of
% such codes, discards the rest of the input, and converts the run with
% number_codes/2. Fails if no code is accepted by parse_num_code//1.
% NOTE(review): parse_num_code//1 is not defined in this file (presumably
% biomake/utils). If it accepts codes other than digits (e.g. '.' or '-'),
% number_codes/2 could raise a syntax error on output like "123.host" —
% confirm.
first_int(N) --> before_first_int(Cs), !, {number_codes(N,Cs)}.
% Skip codes one at a time until the first number code starts the run.
before_first_int([C|Cs]) --> parse_num_code(C), !, first_int_codes(Cs).
before_first_int(Cs) --> [_], before_first_int(Cs).
% Collect consecutive number codes. Once a non-number code is met, the
% argument is [] and only the consuming clauses apply, so the remainder of
% the input is eaten without being collected (phrase/phrase_from_file
% require the whole input to be consumed).
first_int_codes([C|Cs]) --> parse_num_code(C), !, first_int_codes(Cs).
first_int_codes([]) --> [_], first_int_codes([]).
first_int_codes([]) --> [].
  135
  136qsub_make_dep_arg(_,[],"").
  137qsub_make_dep_arg(Engine,DepJobs,DepArg) :-
  138        qsub_dep_arg_prefix(Engine,DepArgPrefix),
  139        qsub_dep_prefix(Engine,DepPrefix),
  140	qsub_dep_separator(Engine,DepSep),
  141	string_concat(DepSep,DepPrefix,SepPrefix),
  142	atomic_list_concat(DepJobs,SepPrefix,DepJobStr),
  143	format(string(DepArg),"~w~w~w",[DepArgPrefix,DepPrefix,DepJobStr]).
  144
  145% ----------------------------------------
  146% KILLING JOBS
  147% ----------------------------------------
  148
  149qsub_kill(Engine,T,SL,Opts) :-
  150	qsub_job_id(Engine,T,Id),
  151	(get_opt(qdel_exec,QdelExec,Opts); default_qdel_exec(Engine,QdelExec)),
  152	(get_opt(qdel_args,QdelArgs,Opts); QdelArgs = ""),
  153	(get_opt(queue_args,QArgs,Opts); QArgs = ""),
  154	format(string(QdelCmd),"~w ~w ~w ~w",[QdelExec,QArgs,QdelArgs,Id]),
  155	verbose_report("Killing previous job: ~w",[QdelCmd],SL,Opts),
  156	(shell(QdelCmd); true),
  157	biomake_private_filename(T,[Engine,"job"],JobFilename),
  158	(exists_file(JobFilename) -> delete_file(JobFilename); true).
  159qsub_kill(_,_,_,_).
  160
  161flush_queue_recursive(Dir,Opts) :-
  162	get_opt(queue,Engine,Opts),
  163	absolute_file_name(Dir,AbsDir),  % guard against Dir='.'
  164	flush_queue_recursive(Engine,AbsDir,[],Opts).
  165
  166flush_queue_recursive(_,X,_,_) :-
  167	atom_chars(X,['.'|_]),
  168	!.
  169flush_queue_recursive(Engine,Dir,SL,Opts) :-
  170	exists_directory(Dir),
  171	!,
  172	verbose_report("Scanning ~w",[Dir],SL,Opts),
  173	directory_files(Dir,Files),
  174	forall(member(File,Files),
  175	       flush_queue_recursive(Engine,File,[Dir|SL],Opts)).
  176flush_queue_recursive(Engine,File,SL,Opts) :-
  177	qsub_kill(Engine,File,SL,Opts),
  178	!.
  179flush_queue_recursive(_,_,_,_).
  180
  181% ----------------------------------------
  182% WRITING COMMANDS TO SCRIPT FILES
  183% ----------------------------------------
  184
  185write_script_file(T,Es,Opts,ScriptFilename) :-
  186	write_script_file(T,[],Es,[],Opts,[],ScriptFilename).
  187
  188write_script_file(T,Headers,Es,Cleanup,Opts,Subdirs,ScriptFilename) :-
  189        (get_opt(oneshell,true,Opts)
  190	 -> maplist(echo_wrap,Es,ShellExecs)
  191	 ; maplist(shell_echo_wrap,Es,ShellExecs)),
  192	maplist(shell_wrap,Cleanup,ShellCleanup),
  193	append(ShellExecs,ShellCleanup,ExecsWithCleanup),
  194	write_script_file_contents(T,Headers,ExecsWithCleanup,Opts,Subdirs,ScriptFilename).
  195
  196write_script_file_contents(T,Headers,Execs,Opts,Subdirs,ScriptFilename) :-
  197	working_directory(CWD,CWD),
  198	open_script_file(T,Subdirs,ScriptFilename,IO),
  199	shell_path(Sh),
  200	wrap_shell_execs(T,Execs,Opts,Subdirs,ExecStr),
  201	concat_string_list(Headers,HeaderStr,"\n"),
  202	format(IO,"#!~w~n~w~ncd ~w~n~w~n",[Sh,HeaderStr,CWD,ExecStr]),
  203	close(IO),
  204	format(string(Chmod),"chmod +x ~w",[ScriptFilename]),
  205	shell(Chmod).
  206
  207wrap_shell_execs(T,Execs,_Opts,Subdirs,ShellFilename) :-
  208        shell_var_specified(Sh),
  209        !,
  210	open_shell_file(T,Subdirs,ShellFilename,IO),
  211	concat_string_list(Execs,ExecStr,"\n"),
  212	format(IO,"#!~w~n~w~n",[Sh,ExecStr]),
  213	close(IO),
  214	format(string(Chmod),"chmod +x ~w",[ShellFilename]),
  215	shell(Chmod).
  216
  217wrap_shell_execs(_T,Execs,_Opts,_Subdirs,ExecStr) :-
  218        !,
  219	join_with_ands(Execs,ExecStr).
  220
  221join_with_ands(List,Str) :-
  222        !,
  223	concat_string_list(List,Str," &&\n").
  224
  225open_script_file(Target,Subdirs,Filename,Stream) :-
  226        append(Subdirs,["script"],SubdirsScript),
  227	open_biomake_private_file(Target,SubdirsScript,Filename,Stream).
  228
  229open_shell_file(Target,Subdirs,Filename,Stream) :-
  230        append(Subdirs,["shell"],SubdirsShell),
  231	open_biomake_private_file(Target,SubdirsShell,Filename,Stream).
  232
  233% ----------------------------------------
  234% No queue engine (runs execs immediately)
  235% ----------------------------------------
  236
% The "none" engine needs no initialisation or release, and does not queue
% anything: each rule's commands are run immediately by run_execs_now/3.
queue_engine(none).
init_queue(none,_).
release_queue(none).

run_execs_in_queue(none,Rule,SL,Opts) :-
	run_execs_now(Rule,SL,Opts).
  243
  244% ----------------------------------------
  245% Test queue engine (just runs script)
  246% ----------------------------------------
  247
% The "test" engine needs no initialisation or release. It writes the job
% script via write_script_file/4 (no headers, no cleanup commands), passes
% it to report_run_exec/4 (presumably runs it locally — confirm), then
% updates the target's hash from its dependencies.
queue_engine(test).
init_queue(test,_).
release_queue(test).

run_execs_in_queue(test,Rule,SL,Opts) :-
	rule_target(Rule,T,Opts),
	rule_dependencies(Rule,DL,Opts),
	qsub_rule_execs(Rule,Es,Opts),
	write_script_file(T,Es,Opts,Script),
	report_run_exec(Script,T,SL,Opts),
	update_hash(T,DL,Opts).
  259
  260% ----------------------------------------
  261% Sun Grid Engine
  262% ----------------------------------------
  263
% SGE needs no per-run initialisation or release.
queue_engine(sge).
init_queue(sge,_).
release_queue(sge).

% Jobs go through the generic submission path.
run_execs_in_queue(sge,Rule,SL,Opts) :-
	run_execs_with_qsub(sge,Rule,SL,Opts).

% SGE command-line conventions used by run_execs_with_qsub/4 and qsub_kill/4.
default_qsub_exec(sge,"qsub").
default_qdel_exec(sge,"qdel").
qsub_output_arg(sge,F,S) :- format(string(S),"-o ~w",[F]).
qsub_error_arg(sge,F,S) :- format(string(S),"-e ~w",[F]).
% With qsub_make_dep_arg/3 these yield e.g. "-hold_jid 12,34".
qsub_dep_arg_prefix(sge,"-hold_jid ").
qsub_dep_prefix(sge,"").
qsub_dep_separator(sge,",").
qsub_dep_arg(sge,DepJobs,Arg) :- qsub_make_dep_arg(sge,DepJobs,Arg).
qsub_extra_args(sge,"").
% No engine-specific header lines in the job script.
qsub_script_headers(sge,_,_,[]).
% The job id is the first integer in qsub's captured stdout.
qsub_job_id(sge,T,N) :- qsub_numeric_job_id(sge,T,N).
  282
  283% ----------------------------------------
  284% PBS
  285% ----------------------------------------
  286
% PBS needs no per-run initialisation or release.
queue_engine(pbs).
init_queue(pbs,_).
release_queue(pbs).

% Jobs go through the generic submission path.
run_execs_in_queue(pbs,Rule,SL,Opts) :-
	run_execs_with_qsub(pbs,Rule,SL,Opts).

% PBS command-line conventions used by run_execs_with_qsub/4 and qsub_kill/4.
default_qsub_exec(pbs,"qsub").
default_qdel_exec(pbs,"qdel").
qsub_output_arg(pbs,F,S) :- format(string(S),"-o ~w",[F]).
qsub_error_arg(pbs,F,S) :- format(string(S),"-e ~w",[F]).
% With qsub_make_dep_arg/3 these yield e.g. "-W depend=afterok:12,afterok:34".
qsub_dep_arg_prefix(pbs,"-W depend=").
qsub_dep_prefix(pbs,"afterok:").
qsub_dep_separator(pbs,",").
qsub_dep_arg(pbs,DepJobs,Arg) :- qsub_make_dep_arg(pbs,DepJobs,Arg).
qsub_extra_args(pbs,"").
% No engine-specific header lines in the job script.
qsub_script_headers(pbs,_,_,[]).
% The job id is the first integer in qsub's captured stdout.
qsub_job_id(pbs,T,N) :- qsub_numeric_job_id(pbs,T,N).
  305
  306% ----------------------------------------
  307% SLURM
  308% ----------------------------------------
  309
% SLURM needs no per-run initialisation or release.
queue_engine(slurm).
init_queue(slurm,_).
release_queue(slurm).

% Jobs go through the generic submission path.
run_execs_in_queue(slurm,Rule,SL,Opts) :-
	run_execs_with_qsub(slurm,Rule,SL,Opts).

% SLURM command-line conventions used by run_execs_with_qsub/4 and qsub_kill/4.
default_qsub_exec(slurm,"sbatch").
default_qdel_exec(slurm,"scancel").
qsub_output_arg(slurm,F,S) :- format(string(S),"-o ~w",[F]).
qsub_error_arg(slurm,F,S) :- format(string(S),"-e ~w",[F]).
% With qsub_make_dep_arg/3 these yield e.g. "--dependency=afterok:12,afterok:34".
qsub_dep_arg_prefix(slurm,"--dependency=").
qsub_dep_prefix(slurm,"afterok:").
qsub_dep_separator(slurm,",").
qsub_dep_arg(slurm,DepJobs,Arg) :- qsub_make_dep_arg(slurm,DepJobs,Arg).
% Presumably so sbatch prints just the job id, which qsub_numeric_job_id/3
% then reads from the captured stdout — confirm against sbatch docs.
qsub_extra_args(slurm,"--parsable").
% No engine-specific header lines in the job script.
qsub_script_headers(slurm,_,_,[]).
qsub_job_id(slurm,T,N) :- qsub_numeric_job_id(slurm,T,N).
  328
  329% ----------------------------------------
  330% POOLQ
  331% ----------------------------------------
  332
  333:- dynamic poolq_scheduler/1.  334
  335default_poolq_threads(4).
  336
  337queue_engine(poolq).
  338init_queue(poolq,Opts) :-
  339	ensure_loaded(library(poolq/poolq)),
  340	(get_opt(poolq_threads,Size,Opts) ; default_poolq_threads(Size)),
  341	poolq_create(Scheduler,Size,[]),
  342	assert(poolq_scheduler(Scheduler)).
  343release_queue(poolq) :-
  344	poolq_scheduler(Scheduler),
  345	poolq_wait(Scheduler,_Status).
  346
  347run_execs_in_queue(poolq,Rule,SL,Opts) :-
  348	poolq_scheduler(Scheduler),
  349	rule_target(Rule,T,Opts),
  350        rule_dependencies(Rule,DL,Opts),
  351	include(not_always_make_or_queue,Opts,Opts2),
  352	poolq_submit_job(Scheduler,build(T,SL,[no_deps(true)|Opts2]),T,DL,[]).
  353
  354not_always_make_or_queue(always_make(true)) :- !, fail.
  355not_always_make_or_queue(queue(_)) :- !, fail.
  356not_always_make_or_queue(_)