1:- module(pha_sched, [nil_belief/1, cons_belief/3, explanation/2, prob/2]).

/** <module> PHA scheduler

This module uses the PHA continuation-capturing meta-interpreter in pha_mi.pl to run PHA goals and a priority queue to manage a collection of threads, each exploring a different possible world associated with a list of valuations of random variables. Each time a thread tries to sample a random variable, its continuation is captured and a new set of threads corresponding to the different possible choices is created in a suspended state. next//1 then advances the state of the collection of threads by running all threads until a success is found or all threads fail.

A thread can either be a list of random variable valuations (assumptions) and a continuation, or a continuation to be appended to a lazy list of explanations for a previous goal. [cf. Monadic bind!] */

   18:- use_module(library(dcg_core)).   19:- use_module(library(dcg_macros)).   20:- use_module(library(typedef)).   21:- use_module(library/priorityq, [pq_empty/1, pq_insert/4, pq_remove/4]).   22:- use_module(pha_load,  [glist/3]).   23:- use_module(pha_mi,    [mi/3]).   24
   % Type declarations for the scheduler state and its results.
   %
   % procq: the DCG state threaded through next//1 and the proc_* helpers,
   % a probability paired with a priority queue of threads. proc_insert/4
   % and proc_remove/4 keep the first component equal to the running sum
   % of the priorities currently queued.
   25:- type procq       == pair(prob,pq(thread)).
   % explist: list of explanation entries, built and consumed through the
   % lazy: module (presumably llist means "lazy list" - confirm).
   26:- type explist     == llist(expentry).
   % maybe(A): optional value, either nothing or just(A).
   27:- type maybe(A)    ---> nothing ; just(A).
   % explanation: a goal together with the assumptions it was derived under.
   28:- type explanation ---> ex(goal, assumptions).
   % expentry: a weighted explanation plus look-ahead information. In
   % unfold_search/2 the second field is nothing when the queue is empty
   % after this entry, otherwise just(Total) with the remaining queue total.
   29:- type expentry    ---> refinement(weighted(explanation),maybe(prob)).
   % thread: either the unread tail of an earlier explanation list with the
   % continuation to run for each of its entries, or a runnable
   % continuation with the assumptions made so far.
   30:- type thread      ---> tail(explist,cont)
   31                       ; thread(assumptions,cont).
   32
   33% --------------- lazy list of explanations --------------------
   34
   % nil_belief(Es)
   %
   % Es is the one-element explanation list for the trivial goal: the
   % explanation ex(true,[]) with weight 1 and no further refinements.
   nil_belief(Beliefs) :-
      Trivial = refinement(1:ex(true,[]), nothing),
      lazy:singleton(Trivial, Beliefs).
   % cons_belief(G, Es1, Es2)
   %
   % Es2 is the explanation list obtained by running goal G on top of the
   % explanation list Es1. glist/3 supplies the continuation for G (ending
   % in ans(G)); the search starts from a queue holding one tail thread
   % over Es1 with priority 1 and is unfolded by unfold_search/2.
   cons_belief(Goal, Prior, Posterior) :-
      glist(Goal, Cont, ans(Goal)),
      Seed = tail(Prior, Cont),
      proc_init(1, Seed, Queue),
      unfold_search(Queue, Posterior).
 unfold_search(+P:prob, +PQ:procq, -Es:explist) is det
Here, we lazily unfold a procq using next//1, with look-ahead to remaing queue probability.
   43unfold_search(Q1,E1) :- 
   44   next(X,Q1,Q2), !, Head=refinement(X,More), 
   45   (  proc_empty(Q2) -> More=nothing, lazy:singleton(Head, E1)
   46   ;  proc_total(Q2,QP), More=just(QP), lazy:cons(Head, pha_sched:unfold_search(Q2), E1)
   47   ).
   48unfold_search(_,E1) :- lazy:empty(E1).
 explanation(+Es:explist, -E:weighted(explanation)) is nondet
Unifies E with an explanation, finds other explanations on backtracking.
   52explanation(Es,E) :- lazy:member(refinement(E,_),Es).
 prob(+Es:llist(expentry), -Ps:llist(interval(prob))) is det
Compute a lazy stream of probability intervals from a lazy list of explanations.
   56prob(Es,Ps) :- prob1(Es,0,Ps). % freeze on Ps?
   57
   % prob1(Es, P0, Ps)
   %
   % Worker for prob/2. P0 is the probability accumulated before Es. For
   % each entry with weight W it emits range(Acc, Upper), where Acc is
   % P0+W and Upper adds the look-ahead total if the entry carries one
   % (otherwise Upper = Acc). Once Es is empty the final interval
   % range(P0,P0) is repeated via lazy:repeat/2.
   prob1(Es, Acc0, Ps) :-
      lazy:decons(Es, refinement(W:_, LookAhead), Rest), !,
      % Acc and Upper are still unbound here; they are bound just below
      % and shared with the suspended recursive call.
      lazy:cons(range(Acc, Upper), pha_sched:prob1(Rest, Acc), Ps),
      Acc is Acc0 + W,
      (  LookAhead = just(Mass)
      -> Upper is Acc + Mass
      ;  Upper = Acc
      ).
   prob1(Es, Acc0, Ps) :-
      lazy:empty(Es),
      Final = range(Acc0, Acc0),
      lazy:repeat(Final, Ps).
   65
   66% ------------------ process queue DCG ------------------------------
 next(-E:weighted(explanation))// is semidet
Gets the next explanation if there is one. Works in procq DCG.
   71next(X) --> proc_remove(P,Thread), run(Thread,P,X).
 run(+T:thread, +P:prob, -E:weighted(explanation))// is semidet
Runs a thread until it produces a request, or unfolds an explanation tail and inserts any new explanations as new threads. Works in procq DCG.
   77run(thread(AS,Cont),P,Y) --> 
   78   {mi(Cont,AS,Req)}, 
   79   respond(Req,AS,P,Y).
   80run(tail(E1,Cont),_,Y) --> 
   81   (  {lazy:head(E1, refinement(P:ex(_,AS),Rem))}
   82   -> proc_insert(P, thread(AS,Cont)),
   83      ({Rem=just(QP)} -> {lazy:tail(E1, ET)}, proc_insert(QP, tail(ET,Cont)); [])
   84   ),
   85   next(Y).
 respond(+A:request, +AS:assumptions, +P:prob, -R:weighted(explanation))// is det
Respond to request from mi/4, where AS is the list of assumptions made so far and P is the probability associated with these choices.
   90respond(sus(TS), AS, P, Y) --> !, seqmap(suspend(AS,P),TS), next(Y).
   91respond(ret(G),  AS, P, P:ex(G,AS)) --> [].
 suspend(+AS:assumptions, +P:prob, +T:susreq)// is det
Adds a suspension request for probabilistic choice to priority queue of threads.
   95suspend(AS,P1,susreq(A,C,P)) --> { P2 is P1*P }, proc_insert(P2, thread([A|AS],C)).
   96
   % proc_init(P, T, PQ)
   %
   % PQ is a process queue holding only thread T with priority P; its
   % total is therefore P.
   proc_init(Prio, Thread, Prio-Queue) :-
      pq_empty(Empty),
      pq_insert(Prio, Thread, Empty, Queue).
   % proc_insert(P, Th)//
   %
   % Inserts thread Th with priority P into the queue and adds P to the
   % running total.
   proc_insert(Prio, Thread, Total0-Queue0, Total-Queue) :-
      pq_insert(Prio, Thread, Queue0, Queue),
      Total is Total0+Prio.
   % proc_remove(P, Th)//
   %
   % Takes a thread Th and its priority P off the queue with pq_remove/4
   % and subtracts P from the running total.
   proc_remove(Prio, Thread, Total0-Queue0, Total-Queue) :-
      pq_remove(Prio, Thread, Queue0, Queue),
      Total is Total0-Prio.
  % proc_empty(PQ)
  %
  % Succeeds if the priority-queue part of the process queue satisfies
  % pq_empty/1; the running total is not inspected.
  proc_empty(_Total-Queue) :-
     pq_empty(Queue).
  101proc_total(T-_,T)