1:- module(rdf_notification,
    2          [
    3           rdf_subscribe/3,         % +Topic, +Callback, -SubscriberId
    4           rdf_subscribe/4,         % +Topic, +Callback, +Options, -SubscriberId
    5           rdf_unsubscribe/1        % +SubscriberId
    6          ]).

RDF Notification

RDF Notification.

author
- Hongxin Liang
license
- Apache License Version 2.0 */
   15:- use_module(library(option)).   16:- use_module(library(gensym)).   17:- use_module(library(semweb/rdf_db)).   18:- use_module(library(thread_pool)).   19
   20:- rdf_meta
   21    rdf_subscribe(r, +, -),
   22    rdf_subscribe(r, +, +, -).
   23
   24:- initialization
   25    thread_create(rdf_notification_main(_{}), _, [alias(rdf_notification_main_thread)]),
   26    current_prolog_flag(cpu_count, CPUs),
   27    Size is CPUs + 1,
   28    thread_pool_create(rdf_notification_caller_thread_pool, Size, []),
   29    rdf_monitor(rdf_notification:rdf_event_received, [assert,assert(load),retract,update]).
 rdf_subscribe(+Topic, +Callback, -SubscriberId) is semidet
 rdf_subscribe(+Topic, +Callback, +Options, -SubscriberId) is semidet
Subscribe to the Topic which could be subject, predicate or object. When assert, retract or update event happens on the topic, Callback will be invoked. Upon successful subscription, SubscriberId will be unified and can be later on used to unsubscribe. Callback should be prefixed with a module name.

It is highly NOT recommended to do any intensive or long time computation in Callback, however if that is required, a separated thread must be used.

Supported options are:

type(+Type)
Can be subject (default), predicate or object, indicating what to subscribe.
db(+DB)
Specify which database should be monitored. If not specified, all databases will be monitored. Database refers to the fourth argument of rdf_assert/4, and sometimes it's called graph.
monitor_descendant(+Boolean)
Indicate whether descendant of the Topic should also be monitored. The default value is false and it's only applicable when type is subject. For example if Topic is a and this option is true, and there is a triple =rdf(a, b, c)= already in database, =rdf_assert(c, d, e)= will trigger a callback to subscribers of a.

N.B. subscribing to the same Topic with the same Callback and Options will result a new subscribers without having any impact on the existing one. If this is not the wanted result, unsubscribe existing one first.

   61rdf_subscribe(Topic, Callback, SubscriberId) :-
   62    rdf_subscribe(Topic, Callback, [type(subject),monitor_descendant(false)], SubscriberId).
   63
   64rdf_subscribe(Topic, Callback, Options, SubscriberId) :-
   65    select_option(monitor_descendant(MonitorDescendant), Options, RestOptions1, false),
   66    select_option(type(Type), RestOptions1, RestOptions2, subject),
   67    gensym(sub_, SubscriberId),
   68    thread_send_message(rdf_notification_main_thread,
   69                        rdf_subscribe(Topic, SubscriberId, Callback,
   70                                      [type(Type),monitor_descendant(MonitorDescendant)|RestOptions2])
   71                       ).
 rdf_unsubscribe(+SubscriberId) is semidet
Unsubscribe the subscriber specifed by SubscriberId. Unsubscribing a non-existing subscriber does not have any impact.
   78rdf_unsubscribe(SubscriberId) :-
   79    thread_send_message(rdf_notification_main_thread,
   80                        rdf_unsubscribe(SubscriberId)).
   83rdf_event_received(Event) :-
   84    thread_send_message(rdf_notification_main_thread, Event).
   87rdf_notification_main(Subscribers) :-
   88    thread_get_message(Message),
   89    debug(rdf_notification, 'received message ~w', [Message]),
   90    handle_message(Subscribers, Message, NewSubscribers),
   91    rdf_notification_main(NewSubscribers).
   92
   93handle_message(SubscribersDict, rdf_subscribe(Topic, SubscriberId, Callback, Options),
   94               NewSubscribersDict) :- !,
   95    (   Old = SubscribersDict.get(Topic)
   96    ->  true
   97    ;   Old = []
   98    ),
   99    NewSubscribersDict = SubscribersDict.put(Topic,
  100                                             [subscriber(SubscriberId, Callback, Options)|Old]),
  101    debug(rdf_notification, 'SubscribersDict ~w', [NewSubscribersDict]).
  102
  103handle_message(SubscribersDict, rdf_unsubscribe(SubscriberId), NewSubscribersDict) :-
  104    get_dict(Topic, SubscribersDict, Old),
  105    selectchk(subscriber(SubscriberId, _, _), Old, Rest), !,
  106    (   Rest \= []
  107    ->  NewSubscribersDict = SubscribersDict.put(Topic, Rest)
  108    ;   del_dict(Topic, SubscribersDict, _, NewSubscribersDict)
  109    ),
  110    debug(rdf_notification, 'SubscribersDict ~w', [NewSubscribersDict]).
  111handle_message(_, rdf_unsubscribe(_), _) :- !.
  112
  113handle_message(SubscribersDict, Message, SubscribersDict) :-
  114    rdf_notify(SubscribersDict, Message).
  115
  116rdf_notify(SubscribersDict, Event) :-
  117    Event =.. [_,Subject,Predicate,Object,DB|_],
  118    rdf_notify_direct_subscribers(SubscribersDict, event_meta(subject, Subject, DB), Event),
  119    rdf_notify_indirect_subscribers(SubscribersDict, Subject, DB, Event),
  120    rdf_notify_direct_subscribers(SubscribersDict, event_meta(predicate, Predicate, DB), Event),
  121    rdf_notify_direct_subscribers(SubscribersDict, event_meta(object, Object, DB), Event).
  122
  123rdf_notify_direct_subscribers(SubscribersDict, EventMeta, Event) :-
  124    debug(rdf_notification, 'received event ~w with meta ~w', [Event,EventMeta]),
  125    rdf_notify_direct_subscribers0(SubscribersDict, EventMeta, Event, false).
  126
  127rdf_notify_direct_subscribers0(SubscribersDict, event_meta(Type, Topic, DB), Event, FromDescendant) :-
  128    (   Subscribers = SubscribersDict.get(Topic)
  129    ->  foreach(select_subscriber(subscriber(SubscriberId, Callback, _),
  130                                  Subscribers, Type, DB, FromDescendant),
  131                thread_create_in_pool(rdf_notification_caller_thread_pool,
  132                                      ignore(call(Callback, SubscriberId, Topic, Event)), _, [])
  133               )
  134    ;   true
  135    ).
  136
  137select_subscriber(Subscriber, Subscribers, Type, DB, FromDescendant) :-
  138    member(Subscriber, Subscribers),
  139    Subscriber = subscriber(_, _, Options),
  140    option(type(Type), Options),
  141    (   option(db(DB0), Options)
  142    ->  DB0 = DB
  143    ;   true
  144    ),
  145    (   FromDescendant
  146    ->  option(monitor_descendant(true), Options)
  147    ;   true
  148    ).
  149
  150rdf_notify_indirect_subscribers(SubscribersDict, Object, DB, Event) :-
  151    rdf_notify_indirect_subscribers0(SubscribersDict, Object, DB, Event, [Object], _).
  152
  153rdf_notify_indirect_subscribers0(SubscribersDict, Object, DB, Event, Seen0, Seen) :-
  154    findall(Subject,
  155            (
  156             rdf(Subject, _, Object), % there might be cross-db reference
  157             \+ memberchk(Subject, Seen0)
  158            ),
  159            Subjects),
  160    rdf_notify_indirect_subscribers1(SubscribersDict, Subjects, DB, Event, Seen0, Seen).
  161
  162rdf_notify_indirect_subscribers1(_, [], _, _, Seen, Seen) :- !.
  163rdf_notify_indirect_subscribers1(SubscribersDict, [H|T], DB, Event, Seen0, Seen) :-
  164    rdf_notify_direct_subscribers0(SubscribersDict, event_meta(subject, H, DB), Event, true),
  165    rdf_notify_indirect_subscribers0(SubscribersDict, H, DB, Event, [H|Seen0], Seen1),
  166    rdf_notify_indirect_subscribers1(SubscribersDict, T, DB, Event, Seen1, Seen)