View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2006-2015, University of Amsterdam
    7                              VU University Amsterdam
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34*/
   35
   36:- module(rdf_persistency,
   37          [ rdf_attach_db/2,            % +Directory, +Options
   38            rdf_detach_db/0,            % +Detach current Graph
   39            rdf_current_db/1,           % -Directory
   40            rdf_persistency/2,          % +Graph, +Bool
   41            rdf_flush_journals/1,       % +Options
   42            rdf_persistency_property/1, % ?Property
   43            rdf_journal_file/2,         % ?Graph, ?JournalFile
   44            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
   45            rdf_db_to_file/2            % ?Graph, ?FileBase
   46          ]).   47:- use_module(library(semweb/rdf_db)).   48:- use_module(library(filesex)).   49:- use_module(library(lists)).   50:- use_module(library(uri)).   51:- use_module(library(debug)).   52:- use_module(library(option)).   53:- use_module(library(error)).   54:- use_module(library(thread)).   55:- use_module(library(apply)).   56
   57
   58/** <module> RDF persistency plugin
   59
   60This  module  provides  persistency   for    rdf_db.pl   based   on  the
   61rdf_monitor/2 predicate to  track  changes   to  the  repository.  Where
   62previous  versions  used  autosave  of  the  whole  database  using  the
   63quick-load format of rdf_db, this version is  based on a quick-load file
   64per source (4th argument of rdf/4), and journalling for edit operations.
   65
   66The result is safe, avoids frequent small   changes to large files which
   67makes synchronisation and backup expensive and avoids long disruption of
   68the server doing the autosave. Only loading large files disrupts service
   69for some time.
   70
   71The persistent backup of the database is  realised in a directory, using
   72a lock file to avoid corruption due to concurrent access. Each source is
   73represented by two files, the latest snapshot   and a journal. The state
   74is restored by loading  the  snapshot   and  replaying  the journal. The
   75predicate rdf_flush_journals/1 can be used to create fresh snapshots and
   76delete the journals.
   77
   78@tbd If there is a complete `.new'   snapshot  and no journal, we should
   79     move the .new to the plain snapshot name as a means of recovery.
   80
   81@tbd Backup of each graph using one or two files is very costly if there
   82     are many graphs.  Although the currently used subdirectories avoid
   83     hitting OS limits early, this is still not ideal. Probably we
   84     should collect (small, older?) files and combine them into a single
   85     quick load file.  We could call this (similar to GIT) a `pack'.
   86
   87@see    rdf_edit.pl
   88*/
   89
   90:- volatile
   91    rdf_directory/1,
   92    rdf_lock/2,
   93    rdf_option/1,
   94    source_journal_fd/2,
   95    file_base_db/2.   96:- dynamic
   97    rdf_directory/1,                % Absolute path
   98    rdf_lock/2,                     % Dir, Lock
   99    rdf_option/1,                   % Defined options
  100    source_journal_fd/2,            % DB, JournalFD
  101    file_base_db/2.                 % FileBase, DB
  102
  103:- meta_predicate
  104    no_agc(0).  105
  106:- predicate_options(rdf_attach_db/2, 2,
  107                     [ access(oneof([read_write,read_only])),
  108                       concurrency(positive_integer),
  109                       max_open_journals(positive_integer),
  110                       silent(oneof([true,false,brief])),
  111                       log_nested_transactions(boolean)
  112                     ]).  113
  114%!  rdf_attach_db(+Directory, +Options) is det.
  115%
  116%   Start persistent operations using Directory   as  place to store
  117%   files.   There are several cases:
  118%
  119%           * Empty DB, existing directory
  120%           Load the DB from the existing directory
  121%
  122%           * Full DB, empty directory
  123%           Create snapshots for all sources in directory
  124%
  125%   Options:
  126%
  127%           * access(+AccessMode)
  128%           One of =auto= (default), =read_write= or
  129%           =read_only=. Read-only access implies that the RDF
  130%           store is not locked. It is read at startup and all
  131%           modifications to the data are temporary. The default
  132%           =auto= mode is =read_write= if the directory is
  133%           writeable and the lock can be acquired.  Otherwise
  134%           it reverts to =read_only=.
  135%
  136%           * concurrency(+Jobs)
  137%           Number of threads to use for loading the initial
  138%           database.  If not provided it is the number of CPUs
  139%           as optained from the flag =cpu_count=.
  140%
  141%           * max_open_journals(+Count)
  142%           Maximum number of journals kept open.  If not provided,
  143%           the default is 10.  See limit_fd_pool/0.
  144%
  145%           * directory_levels(+Count)
  146%           Number of levels of intermediate directories for storing
  147%           the graph files.  Default is 2.
  148%
  149%           * silent(+BoolOrBrief)
  150%           If =true= (default =false=), do not print informational
  151%           messages.  Finally, if =brief= it will show minimal
  152%           feedback.
  153%
  154%           * log_nested_transactions(+Boolean)
  155%           If =true=, nested _log_ transactions are added to the
  156%           journal information.  By default (=false=), no log-term
  157%           is added for nested transactions.\\
  158%
  159%   @error existence_error(source_sink, Directory)
  160%   @error permission_error(write, directory, Directory)
  161
  162rdf_attach_db(DirSpec, Options) :-
  163    option(access(read_only), Options),
  164    !,
  165    absolute_file_name(DirSpec,
  166                       Directory,
  167                       [ access(read),
  168                         file_type(directory)
  169                       ]),
  170    rdf_attach_db_ro(Directory, Options).
  171rdf_attach_db(DirSpec, Options) :-
  172    option(access(read_write), Options),
  173    !,
  174    rdf_attach_db_rw(DirSpec, Options).
  175rdf_attach_db(DirSpec, Options) :-
  176    absolute_file_name(DirSpec,
  177                       Directory,
  178                       [ access(exist),
  179                         file_type(directory),
  180                         file_errors(fail)
  181                       ]),
  182    !,
  183    (   access_file(Directory, write)
  184    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
  185        (   var(E)
  186        ->  true
  187        ;   E = error(permission_error(lock, rdf_db, _), _)
  188        ->  print_message(warning, E),
  189            print_message(warning, rdf(read_only)),
  190            rdf_attach_db(DirSpec, [access(read_only)|Options])
  191        ;   throw(E)
  192        )
  193    ;   print_message(warning,
  194                      error(permission_error(write, directory, Directory))),
  195        print_message(warning, rdf(read_only)),
  196        rdf_attach_db_ro(Directory, Options)
  197    ).
  198rdf_attach_db(DirSpec, Options) :-
  199    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
  200    (   var(E)
  201    ->  true
  202    ;   print_message(warning, E),
  203        print_message(warning, rdf(read_only)),
  204        rdf_attach_db(DirSpec, [access(read_only)|Options])
  205    ).
  206
  207
  208rdf_attach_db_rw(DirSpec, Options) :-
  209    absolute_file_name(DirSpec,
  210                       Directory,
  211                       [ access(write),
  212                         file_type(directory),
  213                         file_errors(fail)
  214                       ]),
  215    !,
  216    (   rdf_directory(Directory)
  217    ->  true                        % update settings?
  218    ;   rdf_detach_db,
  219        mkdir(Directory),
  220        lock_db(Directory),
  221        assert(rdf_directory(Directory)),
  222        assert_options(Options),
  223        stop_monitor,               % make sure not to register load
  224        no_agc(load_db),
  225        at_halt(rdf_detach_db),
  226        start_monitor
  227    ).
  228rdf_attach_db_rw(DirSpec, Options) :-
  229    absolute_file_name(DirSpec,
  230                       Directory,
  231                       [ solutions(all)
  232                       ]),
  233    (   exists_directory(Directory)
  234    ->  access_file(Directory, write)
  235    ;   catch(make_directory(Directory), _, fail)
  236    ),
  237    !,
  238    rdf_attach_db(Directory, Options).
  239rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
  240    absolute_file_name(DirSpec,     % permission error
  241                       Directory,
  242                       [ access(exist),
  243                         file_type(directory)
  244                       ]),
  245    permission_error(write, directory, Directory).
  246
  247%!  rdf_attach_db_ro(+Directory, +Options)
  248%
  249%   Open an RDF database in read-only mode.
  250
  251rdf_attach_db_ro(Directory, Options) :-
  252    rdf_detach_db,
  253    assert(rdf_directory(Directory)),
  254    assert_options(Options),
  255    stop_monitor,           % make sure not to register load
  256    no_agc(load_db).
  257
  258
  259assert_options([]).
  260assert_options([H|T]) :-
  261    (   option_type(H, Check)
  262    ->  Check,
  263        assert(rdf_option(H))
  264    ;   true                        % ignore options we do not understand
  265    ),
  266    assert_options(T).
  267
  268option_type(concurrency(X),             must_be(positive_integer, X)).
  269option_type(max_open_journals(X),       must_be(positive_integer, X)).
  270option_type(directory_levels(X),        must_be(positive_integer, X)).
  271option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
  272option_type(log_nested_transactions(X), must_be(boolean, X)).
  273option_type(access(X),                  must_be(oneof([read_write,
  274                                                       read_only]), X)).
  275
  276
  277%!  rdf_persistency_property(?Property) is nondet.
  278%
  279%   True when Property is a property of the current persistent database.
  280%   Exposes  the  properties  that  can   be    passed   as  options  to
  281%   rdf_attach_db/2.                                       Specifically,
  282%   rdf_persistency_property(access(read_only)) is true iff the database
  283%   is mounted in read-only mode. In addition, the following property is
  284%   supported:
  285%
  286%     - directory(Dir)
  287%     The directory in which the database resides.
  288
  289rdf_persistency_property(Property) :-
  290    var(Property),
  291    !,
  292    rdf_persistency_property_(Property).
  293rdf_persistency_property(Property) :-
  294    rdf_persistency_property_(Property),
  295    !.
  296
  297rdf_persistency_property_(Property) :-
  298    rdf_option(Property).
  299rdf_persistency_property_(directory(Dir)) :-
  300    rdf_directory(Dir).
  301
  302%!  no_agc(:Goal)
  303%
  304%   Run Goal with atom garbage collection   disabled. Loading an RDF
  305%   database creates large amounts  of  atoms   we  *know*  are  not
  306%   garbage.
  307
  308no_agc(Goal) :-
  309    current_prolog_flag(agc_margin, Old),
  310    setup_call_cleanup(
  311        set_prolog_flag(agc_margin, 0),
  312        Goal,
  313        set_prolog_flag(agc_margin, Old)).
  314
  315
  316%!  rdf_detach_db is det.
  317%
  318%   Detach from the  current  database.   Succeeds  silently  if  no
  319%   database is attached. Normally called at  the end of the program
  320%   through at_halt/1.
  321
  322rdf_detach_db :-
  323    debug(halt, 'Detaching RDF database', []),
  324    stop_monitor,
  325    close_journals,
  326    (   retract(rdf_directory(Dir))
  327    ->  debug(halt, 'DB Directory: ~w', [Dir]),
  328        save_prefixes(Dir),
  329        retractall(rdf_option(_)),
  330        retractall(source_journal_fd(_,_)),
  331        unlock_db(Dir)
  332    ;   true
  333    ).
  334
  335
  336%!  rdf_current_db(?Dir)
  337%
  338%   True if Dir is the current RDF persistent database.
  339
  340rdf_current_db(Directory) :-
  341    rdf_directory(Dir),
  342    !,
  343    Dir = Directory.
  344
  345
  346%!  rdf_flush_journals(+Options)
  347%
  348%   Flush dirty journals.  Options:
  349%
  350%           * min_size(+KB)
  351%           Only flush if journal is over KB in size.
  352%           * graph(+Graph)
  353%           Only flush the journal of Graph
  354%
  355%   @tbd Provide a default for min_size?
  356
  357rdf_flush_journals(Options) :-
  358    option(graph(Graph), Options, _),
  359    forall(rdf_graph(Graph),
  360           rdf_flush_journal(Graph, Options)).
  361
  362rdf_flush_journal(Graph, Options) :-
  363    db_files(Graph, _SnapshotFile, JournalFile),
  364    db_file(JournalFile, File),
  365    (   \+ exists_file(File)
  366    ->  true
  367    ;   memberchk(min_size(KB), Options),
  368        size_file(JournalFile, Size),
  369        Size / 1024 < KB
  370    ->  true
  371    ;   create_db(Graph)
  372    ).
  373
  374                 /*******************************
  375                 *             LOAD             *
  376                 *******************************/
  377
  378%!  load_db is det.
  379%
  380%   Reload database from the directory specified by rdf_directory/1.
  381%   First we find all names graphs using find_dbs/1 and then we load
  382%   them.
  383
  384load_db :-
  385    rdf_directory(Dir),
  386    concurrency(Jobs),
  387    cpu_stat_key(Jobs, StatKey),
  388    get_time(Wall0),
  389    statistics(StatKey, T0),
  390    load_prefixes(Dir),
  391    verbosity(Silent),
  392    find_dbs(Dir, Graphs, SnapShots, Journals),
  393    length(Graphs, GraphCount),
  394    maplist(rdf_unload_graph, Graphs),
  395    rdf_statistics(triples(Triples0)),
  396    load_sources(snapshots, SnapShots, Silent, Jobs),
  397    load_sources(journals, Journals, Silent, Jobs),
  398    rdf_statistics(triples(Triples1)),
  399    statistics(StatKey, T1),
  400    get_time(Wall1),
  401    T is T1 - T0,
  402    Wall is Wall1 - Wall0,
  403    Triples = Triples1 - Triples0,
  404    message_level(Silent, Level),
  405    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).
  406
  407load_sources(_, [], _, _) :- !.
  408load_sources(Type, Sources, Silent, Jobs) :-
  409    length(Sources, Count),
  410    RunJobs is min(Count, Jobs),
  411    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
  412    make_goals(Sources, Silent, 1, Count, Goals),
  413    concurrent(RunJobs, Goals, []).
  414
  415
  416%!  make_goals(+DBs, +Silent, +Index, +Total, -Goals)
  417
  418make_goals([], _, _, _, []).
  419make_goals([DB|T0], Silent, I,  Total,
  420           [load_source(DB, Silent, I, Total)|T]) :-
  421    I2 is I + 1,
  422    make_goals(T0, Silent, I2, Total, T).
  423
  424verbosity(Silent) :-
  425    rdf_option(silent(Silent)),
  426    !.
  427verbosity(Silent) :-
  428    current_prolog_flag(verbose, silent),
  429    !,
  430    Silent = true.
  431verbosity(brief).
  432
  433
  434%!  concurrency(-Jobs)
  435%
  436%   Number of jobs to run concurrently.
  437
  438concurrency(Jobs) :-
  439    rdf_option(concurrency(Jobs)),
  440    !.
  441concurrency(Jobs) :-
  442    current_prolog_flag(cpu_count, Jobs),
  443    Jobs > 0,
  444    !.
  445concurrency(1).
  446
  447cpu_stat_key(1, cputime) :- !.
  448cpu_stat_key(_, process_cputime).
  449
  450
  451%!  find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det.
  452%
  453%   Scan the persistent database and return a list of snapshots and
  454%   journals, both sorted by file-size.  Each term is of the form
  455%
  456%     ==
  457%     db(Size, Ext, DB, DBFile, Depth)
  458%     ==
  459
  460find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
  461    directory_files(Dir, Files),
  462    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
  463    maplist(db_graph, Scanned, UnsortedGraphs),
  464    sort(UnsortedGraphs, Graphs),
  465    (   consider_reindex_db(Dir, Graphs, Scanned)
  466    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
  467    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
  468        sort(Snapshots, SnapBySize),
  469        sort(Journals, JournalBySize)
  470    ).
  471
  472consider_reindex_db(Dir, Graphs, Scanned) :-
  473    length(Graphs, Count),
  474    Count > 0,
  475    DepthNeeded is floor(log(Count)/log(256)),
  476    (   maplist(depth_db(DepthNow), Scanned)
  477    ->  (   DepthNeeded > DepthNow
  478        ->  true
  479        ;   retractall(rdf_option(directory_levels(_))),
  480            assertz(rdf_option(directory_levels(DepthNow))),
  481            fail
  482        )
  483    ;   true
  484    ),
  485    reindex_db(Dir, DepthNeeded).
  486
  487db_is_snapshot(Term) :-
  488    arg(2, Term, trp).
  489
  490db_graph(Term, DB) :-
  491    arg(3, Term, DB).
  492
  493db_file_name(Term, File) :-
  494    arg(4, Term, File).
  495
  496depth_db(Depth, DB) :-
  497    arg(5, DB, Depth).
  498
  499%!  scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det.
  500%
  501%   Produces a list of db(DB,  Size,   File)  for all recognised RDF
  502%   database files.  File is relative to the database directory Dir.
  503
  504scan_db_files([], _, _, _) -->
  505    [].
  506scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
  507    { nofollow(Nofollow) },
  508    !,
  509    scan_db_files(T, Dir, Prefix, Depth).
  510scan_db_files([File|T], Dir, Prefix, Depth) -->
  511    { file_name_extension(Base, Ext, File),
  512      db_extension(Ext),
  513      !,
  514      rdf_db_to_file(DB, Base),
  515      directory_file_path(Prefix, File, DBFile),
  516      directory_file_path(Dir, DBFile, AbsFile),
  517      size_file(AbsFile, Size)
  518    },
  519    [ db(Size, Ext, DB, AbsFile, Depth) ],
  520    scan_db_files(T, Dir, Prefix, Depth).
  521scan_db_files([D|T], Dir, Prefix, Depth) -->
  522    { directory_file_path(Prefix, D, SubD),
  523      directory_file_path(Dir, SubD, AbsD),
  524      exists_directory(AbsD),
  525      \+ read_link(AbsD, _, _),    % Do not follow links
  526      !,
  527      directory_files(AbsD, SubFiles),
  528      SubDepth is Depth + 1
  529    },
  530    scan_db_files(SubFiles, Dir, SubD, SubDepth),
  531    scan_db_files(T, Dir, Prefix, Depth).
  532scan_db_files([_|T], Dir, Prefix, Depth) -->
  533    scan_db_files(T, Dir, Prefix, Depth).
  534
  535nofollow(.).
  536nofollow(..).
  537
  538db_extension(trp).
  539db_extension(jrn).
  540
  541:- public load_source/4.                % called through make_goals/5
  542
  543load_source(DB, Silent, Nth, Total) :-
  544    db_file_name(DB, File),
  545    db_graph(DB, Graph),
  546    message_level(Silent, Level),
  547    graph_triple_count(Graph, Count0),
  548    statistics(cputime, T0),
  549    (   db_is_snapshot(DB)
  550    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
  551        rdf_load_db(File)
  552    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
  553        load_journal(File, Graph)
  554    ),
  555    statistics(cputime, T1),
  556    T is T1 - T0,
  557    graph_triple_count(Graph, Count1),
  558    Count is Count1 - Count0,
  559    print_message(Level, rdf(restore(Silent,
  560                                     done(Graph, T, Count, Nth, Total)))).
  561
  562
  563graph_triple_count(Graph, Count) :-
  564    rdf_statistics(triples_by_graph(Graph, Count)),
  565    !.
  566graph_triple_count(_, 0).
  567
  568
  569%!  attach_graph(+Graph, +Options) is det.
  570%
  571%   Load triples and reload  journal   from  the  indicated snapshot
  572%   file.
  573
  574attach_graph(Graph, Options) :-
  575    (   option(silent(true), Options)
  576    ->  Level = silent
  577    ;   Level = informational
  578    ),
  579    db_files(Graph, SnapshotFile, JournalFile),
  580    rdf_retractall(_,_,_,Graph),
  581    statistics(cputime, T0),
  582    print_message(Level, rdf(restore(Silent, Graph))),
  583    db_file(SnapshotFile, AbsSnapShot),
  584    (   exists_file(AbsSnapShot)
  585    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
  586        rdf_load_db(AbsSnapShot)
  587    ;   true
  588    ),
  589    (   exists_db(JournalFile)
  590    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
  591        load_journal(JournalFile, Graph)
  592    ;   true
  593    ),
  594    statistics(cputime, T1),
  595    T is T1 - T0,
  596    (   rdf_statistics(triples_by_graph(Graph, Count))
  597    ->  true
  598    ;   Count = 0
  599    ),
  600    print_message(Level, rdf(restore(Silent,
  601                                     done(Graph, T, Count)))).
  602
  603message_level(true, silent) :- !.
  604message_level(_, informational).
  605
  606
  607                 /*******************************
  608                 *         LOAD JOURNAL         *
  609                 *******************************/
  610
  611%!  load_journal(+File:atom, +DB:atom) is det.
  612%
  613%   Process transactions from the RDF journal File, adding the given
  614%   named graph.
  615
  616load_journal(File, DB) :-
  617    rdf_create_graph(DB),
  618    setup_call_cleanup(
  619        open(File, read, In, [encoding(utf8)]),
  620        ( read(In, T0),
  621          process_journal(T0, In, DB)
  622        ),
  623        close(In)).
  624
  625process_journal(end_of_file, _, _) :- !.
  626process_journal(Term, In, DB) :-
  627    (   process_journal_term(Term, DB)
  628    ->  true
  629    ;   throw(error(type_error(journal_term, Term), _))
  630    ),
  631    read(In, T2),
  632    process_journal(T2, In, DB).
  633
  634process_journal_term(assert(S,P,O), DB) :-
  635    rdf_assert(S,P,O,DB).
  636process_journal_term(assert(S,P,O,Line), DB) :-
  637    rdf_assert(S,P,O,DB:Line).
  638process_journal_term(retract(S,P,O), DB) :-
  639    rdf_retractall(S,P,O,DB).
  640process_journal_term(retract(S,P,O,Line), DB) :-
  641    rdf_retractall(S,P,O,DB:Line).
  642process_journal_term(update(S,P,O,Action), DB) :-
  643    (   rdf_update(S,P,O,DB, Action)
  644    ->  true
  645    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
  646    ).
  647process_journal_term(start(_), _).      % journal open/close
  648process_journal_term(end(_), _).
  649process_journal_term(begin(_), _).      % logged transaction (compatibility)
  650process_journal_term(end, _).
  651process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
  652process_journal_term(end(_,_,_), _).
  653
  654
  655                 /*******************************
  656                 *         CREATE JOURNAL       *
  657                 *******************************/
  658
  659:- dynamic
  660    blocked_db/2,                   % DB, Reason
  661    transaction_message/3,          % Nesting, Time, Message
  662    transaction_db/3.               % Nesting, DB, Id
  663
  664%!  rdf_persistency(+DB, Bool)
  665%
  666%   Specify whether a database is persistent.  Switching to =false=
  667%   kills the persistent state.  Switching to =true= creates it.
  668
  669rdf_persistency(DB, Bool) :-
  670    must_be(atom, DB),
  671    must_be(boolean, Bool),
  672    fail.
  673rdf_persistency(DB, false) :-
  674    !,
  675    (   blocked_db(DB, persistency)
  676    ->  true
  677    ;   assert(blocked_db(DB, persistency)),
  678        delete_db(DB)
  679    ).
  680rdf_persistency(DB, true) :-
  681    (   retract(blocked_db(DB, persistency))
  682    ->  create_db(DB)
  683    ;   true
  684    ).
  685
  686%!  rdf_db:property_of_graph(?Property, +Graph) is nondet.
  687%
  688%   Extend rdf_graph_property/2 with new properties.
  689
  690:- multifile
  691    rdf_db:property_of_graph/2.  692
  693rdf_db:property_of_graph(persistent(State), Graph) :-
  694    (   blocked_db(Graph, persistency)
  695    ->  State = false
  696    ;   State = true
  697    ).
  698
  699
  700%!  start_monitor is det.
  701%!  stop_monitor is det.
  702%
  703%   Start/stop monitoring the RDF database   for  changes and update
  704%   the journal.
  705
  706start_monitor :-
  707    rdf_monitor(monitor,
  708                [ -assert(load)
  709                ]).
  710stop_monitor :-
  711    rdf_monitor(monitor,
  712                [ -all
  713                ]).
  714
  715%!  monitor(+Term) is semidet.
  716%
  717%   Handle an rdf_monitor/2 callback to  deal with persistency. Note
  718%   that the monitor calls that come   from rdf_db.pl that deal with
  719%   database changes are serialized.  They   do  come from different
  720%   threads though.
  721
  722monitor(Msg) :-
  723    debug(monitor, 'Monitor: ~p~n', [Msg]),
  724    fail.
  725monitor(assert(S,P,O,DB:Line)) :-
  726    !,
  727    \+ blocked_db(DB, _),
  728    journal_fd(DB, Fd),
  729    open_transaction(DB, Fd),
  730    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
  731    sync_journal(DB, Fd).
  732monitor(assert(S,P,O,DB)) :-
  733    \+ blocked_db(DB, _),
  734    journal_fd(DB, Fd),
  735    open_transaction(DB, Fd),
  736    format(Fd, '~q.~n', [assert(S,P,O)]),
  737    sync_journal(DB, Fd).
  738monitor(retract(S,P,O,DB:Line)) :-
  739    !,
  740    \+ blocked_db(DB, _),
  741    journal_fd(DB, Fd),
  742    open_transaction(DB, Fd),
  743    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
  744    sync_journal(DB, Fd).
  745monitor(retract(S,P,O,DB)) :-
  746    \+ blocked_db(DB, _),
  747    journal_fd(DB, Fd),
  748    open_transaction(DB, Fd),
  749    format(Fd, '~q.~n', [retract(S,P,O)]),
  750    sync_journal(DB, Fd).
  751monitor(update(S,P,O,DB:Line,Action)) :-
  752    !,
  753    \+ blocked_db(DB, _),
  754    (   Action = graph(NewDB)
  755    ->  monitor(assert(S,P,O,NewDB)),
  756        monitor(retract(S,P,O,DB:Line))
  757    ;   journal_fd(DB, Fd),
  758        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  759        sync_journal(DB, Fd)
  760    ).
  761monitor(update(S,P,O,DB,Action)) :-
  762    \+ blocked_db(DB, _),
  763    (   Action = graph(NewDB)
  764    ->  monitor(assert(S,P,O,NewDB)),
  765        monitor(retract(S,P,O,DB))
  766    ;   journal_fd(DB, Fd),
  767        open_transaction(DB, Fd),
  768        format(Fd, '~q.~n', [update(S,P,O,Action)]),
  769        sync_journal(DB, Fd)
  770    ).
  771monitor(load(BE, _DumpFileURI)) :-
  772    (   BE = end(Graphs)
  773    ->  sync_loaded_graphs(Graphs)
  774    ;   true
  775    ).
  776monitor(create_graph(Graph)) :-
  777    \+ blocked_db(Graph, _),
  778    journal_fd(Graph, Fd),
  779    open_transaction(Graph, Fd),
  780    sync_journal(Graph, Fd).
  781monitor(reset) :-
  782    forall(rdf_graph(Graph), delete_db(Graph)).
  783                                        % TBD: Remove empty directories?
  784
  785monitor(transaction(BE, Id)) :-
  786    monitor_transaction(Id, BE).
  787
  788monitor_transaction(load_journal(DB), begin(_)) :-
  789    !,
  790    assert(blocked_db(DB, journal)).
  791monitor_transaction(load_journal(DB), end(_)) :-
  792    !,
  793    retractall(blocked_db(DB, journal)).
  794
  795monitor_transaction(parse(URI), begin(_)) :-
  796    !,
  797    (   blocked_db(URI, persistency)
  798    ->  true
  799    ;   assert(blocked_db(URI, parse))
  800    ).
  801monitor_transaction(parse(URI), end(_)) :-
  802    !,
  803    (   retract(blocked_db(URI, parse))
  804    ->  create_db(URI)
  805    ;   true
  806    ).
  807monitor_transaction(unload(DB), begin(_)) :-
  808    !,
  809    (   blocked_db(DB, persistency)
  810    ->  true
  811    ;   assert(blocked_db(DB, unload))
  812    ).
  813monitor_transaction(unload(DB), end(_)) :-
  814    !,
  815    (   retract(blocked_db(DB, unload))
  816    ->  delete_db(DB)
  817    ;   true
  818    ).
  819monitor_transaction(log(Msg), begin(N)) :-
  820    !,
  821    check_nested(N),
  822    get_time(Time),
  823    asserta(transaction_message(N, Time, Msg)).
  824monitor_transaction(log(_), end(N)) :-
  825    check_nested(N),
  826    retract(transaction_message(N, _, _)),
  827    !,
  828    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
  829    end_transactions(DBs, N).
  830monitor_transaction(log(Msg, DB), begin(N)) :-
  831    !,
  832    check_nested(N),
  833    get_time(Time),
  834    asserta(transaction_message(N, Time, Msg)),
  835    journal_fd(DB, Fd),
  836    open_transaction(DB, Fd).
  837monitor_transaction(log(Msg, _DB), end(N)) :-
  838    monitor_transaction(log(Msg), end(N)).
  839
  840
  841%!  check_nested(+Level) is semidet.
  842%
  843%   True if we must log this transaction.   This  is always the case
  844%   for toplevel transactions. Nested transactions   are only logged
  845%   if log_nested_transactions(true) is defined.
  846
  847check_nested(0) :- !.
  848check_nested(_) :-
  849    rdf_option(log_nested_transactions(true)).
  850
  851
  852%!  open_transaction(+DB, +Fd) is det.
  853%
  854%   Add a begin(Id, Level, Time,  Message)   term  if  a transaction
  855%   involves DB. Id is an incremental   integer, where each database
  856%   has its own counter. Level is the nesting level, Time a floating
  857%   point timestamp and Message te message   provided as argument to
  858%   the log message.
  859
  860open_transaction(DB, Fd) :-
  861    transaction_message(N, Time, Msg),
  862    !,
  863    (   transaction_db(N, DB, _)
  864    ->  true
  865    ;   next_transaction_id(DB, Id),
  866        assert(transaction_db(N, DB, Id)),
  867        RoundedTime is round(Time*100)/100,
  868        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
  869    ).
  870open_transaction(_,_).
  871
  872
  873%!  next_transaction_id(+DB, -Id) is det.
  874%
  875%   Id is the number to user for  the next logged transaction on DB.
  876%   Transactions in each  named  graph   are  numbered  in sequence.
  877%   Searching the Id of the last transaction is performed by the 2nd
  878%   clause starting 1Kb from the end   and doubling this offset each
  879%   failure.
  880
  881:- dynamic
  882    current_transaction_id/2.  883
  884next_transaction_id(DB, Id) :-
  885    retract(current_transaction_id(DB, Last)),
  886    !,
  887    Id is Last + 1,
  888    assert(current_transaction_id(DB, Id)).
  889next_transaction_id(DB, Id) :-
  890    db_files(DB, _, Journal),
  891    exists_file(Journal),
  892    !,
  893    size_file(Journal, Size),
  894    open_db(Journal, read, In, []),
  895    call_cleanup(iterative_expand(In, Size, Last), close(In)),
  896    Id is Last + 1,
  897    assert(current_transaction_id(DB, Id)).
  898next_transaction_id(DB, 1) :-
  899    assert(current_transaction_id(DB, 1)).
  900
  901iterative_expand(_, 0, 0) :- !.
  902iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
  903    Max is floor(log(Size)/log(2)),
  904    between(10, Max, Step),
  905    Offset is -(1<<Step),
  906    seek(In, Offset, eof, _),
  907    skip(In, 10),                   % records are line-based
  908    read(In, T0),
  909    last_transaction_id(T0, In, 0, Last),
  910    Last > 0,
  911    !.
  912iterative_expand(In, _, Last) :-        % Scan the whole file
  913    seek(In, 0, bof, _),
  914    read(In, T0),
  915    last_transaction_id(T0, In, 0, Last).
  916
  917last_transaction_id(end_of_file, _, Last, Last) :- !.
  918last_transaction_id(end(Id, _, _), In, _, Last) :-
  919    read(In, T1),
  920    last_transaction_id(T1, In, Id, Last).
  921last_transaction_id(_, In, Id, Last) :-
  922    read(In, T1),
  923    last_transaction_id(T1, In, Id, Last).
  924
  925
  926%!  end_transactions(+DBs:list(atom:id)) is det.
  927%
  928%   End a transaction that affected the  given list of databases. We
  929%   write the list of other affected databases as an argument to the
  930%   end-term to facilitate fast finding of the related transactions.
  931%
  932%   In each database, the transaction is   ended with a term end(Id,
  933%   Nesting, Others), where  Id  and   Nesting  are  the transaction
  934%   identifier and nesting (see open_transaction/2)  and Others is a
  935%   list of DB:Id,  indicating  other   databases  affected  by  the
  936%   transaction.
  937
  938end_transactions(DBs, N) :-
  939    end_transactions(DBs, DBs, N).
  940
  941end_transactions([], _, _).
  942end_transactions([DB:Id|T], DBs, N) :-
  943    journal_fd(DB, Fd),
  944    once(select(DB:Id, DBs, Others)),
  945    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
  946    sync_journal(DB, Fd),
  947    end_transactions(T, DBs, N).
  948
  949
  950%!  sync_loaded_graphs(+Graphs)
  951%
  952%   Called after a binary triple has been loaded that added triples
  953%   to the given graphs.
  954
  955sync_loaded_graphs(Graphs) :-
  956    maplist(create_db, Graphs).
  957
  958
  959                 /*******************************
  960                 *         JOURNAL FILES        *
  961                 *******************************/
  962
  963%!  journal_fd(+DB, -Stream) is det.
  964%
  965%   Get an open stream to a journal. If the journal is not open, old
  966%   journals are closed to satisfy   the =max_open_journals= option.
  967%   Then the journal is opened in   =append= mode. Journal files are
  968%   always encoded as UTF-8 for  portability   as  well as to ensure
  969%   full coverage of Unicode.
  970
  971journal_fd(DB, Fd) :-
  972    source_journal_fd(DB, Fd),
  973    !.
  974journal_fd(DB, Fd) :-
  975    with_mutex(rdf_journal_file,
  976               journal_fd_(DB, Out)),
  977    Fd = Out.
  978
  979journal_fd_(DB, Fd) :-
  980    source_journal_fd(DB, Fd),
  981    !.
  982journal_fd_(DB, Fd) :-
  983    limit_fd_pool,
  984    db_files(DB, _Snapshot, Journal),
  985    open_db(Journal, append, Fd,
  986            [ close_on_abort(false)
  987            ]),
  988    time_stamp(Now),
  989    format(Fd, '~q.~n', [start([time(Now)])]),
  990    assert(source_journal_fd(DB, Fd)).              % new one at the end
  991
  992%!  limit_fd_pool is det.
  993%
  994%   Limit the number of  open   journals  to max_open_journals (10).
  995%   Note that calls  from  rdf_monitor/2   are  issued  in different
  996%   threads, but as they are part of write operations they are fully
  997%   synchronised.
  998
  999limit_fd_pool :-
 1000    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
 1001    !,
 1002    (   rdf_option(max_open_journals(Max))
 1003    ->  true
 1004    ;   Max = 10
 1005    ),
 1006    Close is N - Max,
 1007    forall(between(1, Close, _),
 1008           close_oldest_journal).
 1009limit_fd_pool.
 1010
 1011close_oldest_journal :-
 1012    source_journal_fd(DB, _Fd),
 1013    !,
 1014    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
 1015    close_journal(DB).
 1016close_oldest_journal.
 1017
 1018
 1019%!  sync_journal(+DB, +Fd)
 1020%
 1021%   Sync journal represented by database and   stream.  If the DB is
 1022%   involved in a transaction there is   no point flushing until the
 1023%   end of the transaction.
 1024
 1025sync_journal(DB, _) :-
 1026    transaction_db(_, DB, _),
 1027    !.
 1028sync_journal(_, Fd) :-
 1029    flush_output(Fd).
 1030
 1031%!  close_journal(+DB) is det.
 1032%
 1033%   Close the journal associated with DB if it is open.
 1034
 1035close_journal(DB) :-
 1036    with_mutex(rdf_journal_file,
 1037               close_journal_(DB)).
 1038
 1039close_journal_(DB) :-
 1040    (   retract(source_journal_fd(DB, Fd))
 1041    ->  time_stamp(Now),
 1042        format(Fd, '~q.~n', [end([time(Now)])]),
 1043        close(Fd, [force(true)])
 1044    ;   true
 1045    ).
 1046
 1047%!  close_journals
 1048%
 1049%   Close all open journals.
 1050
 1051close_journals :-
 1052    forall(source_journal_fd(DB, _),
 1053           catch(close_journal(DB), E,
 1054                 print_message(error, E))).
 1055
 1056%!  create_db(+Graph)
 1057%
 1058%   Create a saved version of Graph in corresponding file, close and
 1059%   delete journals.
 1060
 1061create_db(Graph) :-
 1062    \+ rdf(_,_,_,Graph),
 1063    !,
 1064    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
 1065    delete_db(Graph).
 1066create_db(Graph) :-
 1067    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
 1068    close_journal(Graph),
 1069    db_abs_files(Graph, Snapshot, Journal),
 1070    atom_concat(Snapshot, '.new', NewSnapshot),
 1071    (   catch(( create_directory_levels(Snapshot),
 1072                rdf_save_db(NewSnapshot, Graph)
 1073              ), Error,
 1074              ( print_message(warning, Error),
 1075                fail
 1076              ))
 1077    ->  (   exists_file(Journal)
 1078        ->  delete_file(Journal)
 1079        ;   true
 1080        ),
 1081        rename_file(NewSnapshot, Snapshot),
 1082        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
 1083    ;   catch(delete_file(NewSnapshot), _, true)
 1084    ).
 1085
 1086
 1087%!  delete_db(+DB)
 1088%
 1089%   Remove snapshot and journal file for DB.
 1090
 1091delete_db(DB) :-
 1092    with_mutex(rdf_journal_file,
 1093               delete_db_(DB)).
 1094
 1095delete_db_(DB) :-
 1096    close_journal_(DB),
 1097    db_abs_files(DB, Snapshot, Journal),
 1098    !,
 1099    (   exists_file(Journal)
 1100    ->  delete_file(Journal)
 1101    ;   true
 1102    ),
 1103    (   exists_file(Snapshot)
 1104    ->  delete_file(Snapshot)
 1105    ;   true
 1106    ).
 1107delete_db_(_).
 1108
 1109                 /*******************************
 1110                 *             LOCKING          *
 1111                 *******************************/
 1112
 1113%!  lock_db(+Dir)
 1114%
 1115%   Lock the database directory Dir.
 1116
 1117lock_db(Dir) :-
 1118    lockfile(Dir, File),
 1119    catch(open(File, update, Out, [lock(write), wait(false)]),
 1120          error(permission_error(Access, _, _), _),
 1121          locked_error(Access, Dir)),
 1122    (   current_prolog_flag(pid, PID)
 1123    ->  true
 1124    ;   PID = 0                     % TBD: Fix in Prolog
 1125    ),
 1126    time_stamp(Now),
 1127    gethostname(Host),
 1128    format(Out, '/* RDF Database is in use */~n~n', []),
 1129    format(Out, '~q.~n', [ locked([ time(Now),
 1130                                    pid(PID),
 1131                                    host(Host)
 1132                                  ])
 1133                         ]),
 1134    flush_output(Out),
 1135    set_end_of_stream(Out),
 1136    assert(rdf_lock(Dir, lock(Out, File))),
 1137    at_halt(unlock_db(Dir)).
 1138
 1139locked_error(lock, Dir) :-
 1140    lockfile(Dir, File),
 1141    (   catch(read_file_to_terms(File, Terms, []), _, fail),
 1142        Terms = [locked(Args)]
 1143    ->  Context = rdf_locked(Args)
 1144    ;   Context = context(_, 'Database is in use')
 1145    ),
 1146    throw(error(permission_error(lock, rdf_db, Dir), Context)).
 1147locked_error(open, Dir) :-
 1148    throw(error(permission_error(lock, rdf_db, Dir),
 1149                context(_, 'Lock file cannot be opened'))).
 1150
 1151%!  unlock_db(+Dir) is det.
 1152%!  unlock_db(+Stream, +File) is det.
 1153
 1154unlock_db(Dir) :-
 1155    retract(rdf_lock(Dir, lock(Out, File))),
 1156    !,
 1157    unlock_db(Out, File).
 1158unlock_db(_).
 1159
 1160unlock_db(Out, File) :-
 1161    close(Out),
 1162    delete_file(File).
 1163
 1164                 /*******************************
 1165                 *           FILENAMES          *
 1166                 *******************************/
 1167
 1168lockfile(Dir, LockFile) :-
 1169    atomic_list_concat([Dir, /, lock], LockFile).
 1170
 1171directory_levels(Levels) :-
 1172    rdf_option(directory_levels(Levels)),
 1173    !.
 1174directory_levels(2).
 1175
 1176db_file(Base, File) :-
 1177    rdf_directory(Dir),
 1178    directory_levels(Levels),
 1179    db_file(Dir, Base, Levels, File).
 1180
 1181db_file(Dir, Base, Levels, File) :-
 1182    dir_levels(Base, Levels, Segments, [Base]),
 1183    atomic_list_concat([Dir|Segments], /, File).
 1184
 1185open_db(Base, Mode, Stream, Options) :-
 1186    db_file(Base, File),
 1187    create_directory_levels(File),
 1188    open(File, Mode, Stream, [encoding(utf8)|Options]).
 1189
 1190create_directory_levels(_File) :-
 1191    rdf_option(directory_levels(0)),
 1192    !.
 1193create_directory_levels(File) :-
 1194    file_directory_name(File, Dir),
 1195    make_directory_path(Dir).
 1196
 1197exists_db(Base) :-
 1198    db_file(Base, File),
 1199    exists_file(File).
 1200
 1201%!  dir_levels(+File, +Levels, ?Segments, ?Tail) is det.
 1202%
 1203%   Create a list of intermediate directory names for File.  Each
 1204%   directory consists of two hexadecimal digits.
 1205
 1206dir_levels(_, 0, Segments, Segments) :- !.
 1207dir_levels(File, Levels, Segments, Tail) :-
 1208    rdf_atom_md5(File, 1, Hash),
 1209    create_dir_levels(Levels, 0, Hash, Segments, Tail).
 1210
 1211create_dir_levels(0, _, _, Segments, Segments) :- !.
 1212create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
 1213    sub_atom(Hash, S, 2, _, S1),
 1214    S2 is S+2,
 1215    N2 is N-1,
 1216    create_dir_levels(N2, S2, Hash, Segments0, Tail).
 1217
 1218
 1219%!  db_files(+DB, -Snapshot, -Journal).
 1220%!  db_files(-DB, +Snapshot, -Journal).
 1221%!  db_files(-DB, -Snapshot, +Journal).
 1222%
 1223%   True if named graph DB is represented  by the files Snapshot and
 1224%   Journal. The filenames are local   to the directory representing
 1225%   the store.
 1226
 1227db_files(DB, Snapshot, Journal) :-
 1228    nonvar(DB),
 1229    !,
 1230    rdf_db_to_file(DB, Base),
 1231    atom_concat(Base, '.trp', Snapshot),
 1232    atom_concat(Base, '.jrn', Journal).
 1233db_files(DB, Snapshot, Journal) :-
 1234    nonvar(Snapshot),
 1235    !,
 1236    atom_concat(Base, '.trp', Snapshot),
 1237    atom_concat(Base, '.jrn', Journal),
 1238    rdf_db_to_file(DB, Base).
 1239db_files(DB, Snapshot, Journal) :-
 1240    nonvar(Journal),
 1241    !,
 1242    atom_concat(Base, '.jrn', Journal),
 1243    atom_concat(Base, '.trp', Snapshot),
 1244    rdf_db_to_file(DB, Base).
 1245
 1246db_abs_files(DB, Snapshot, Journal) :-
 1247    db_files(DB, Snapshot0, Journal0),
 1248    db_file(Snapshot0, Snapshot),
 1249    db_file(Journal0, Journal).
 1250
 1251
 1252%!  rdf_journal_file(+Graph, -File) is semidet.
 1253%!  rdf_journal_file(-Graph, -File) is nondet.
 1254%
 1255%   True if File the name of the existing journal file for Graph.
 1256
 1257rdf_journal_file(Graph, Journal) :-
 1258    (   var(Graph)
 1259    ->  rdf_graph(Graph)
 1260    ;   true
 1261    ),
 1262    db_abs_files(Graph, _Snapshot, Journal),
 1263    exists_file(Journal).
 1264
 1265
 1266%!  rdf_snapshot_file(+Graph, -File) is semidet.
 1267%!  rdf_snapshot_file(-Graph, -File) is nondet.
 1268%
 1269%   True if File the name of the existing snapshot file for Graph.
 1270
 1271rdf_snapshot_file(Graph, Snapshot) :-
 1272    (   var(Graph)
 1273    ->  rdf_graph(Graph)    % also pick the empty graphs
 1274    ;   true
 1275    ),
 1276    db_abs_files(Graph, Snapshot, _Journal),
 1277    exists_file(Snapshot).
 1278
 1279
 1280%!  rdf_db_to_file(+DB, -File) is det.
 1281%!  rdf_db_to_file(-DB, +File) is det.
 1282%
 1283%   Translate between database encoding (often an   file or URL) and
 1284%   the name we store in the  directory.   We  keep  a cache for two
 1285%   reasons. Speed, but much more important   is that the mapping of
 1286%   raw --> encoded provided by  www_form_encode/2 is not guaranteed
 1287%   to be unique by the W3C standards.
 1288
 1289rdf_db_to_file(DB, File) :-
 1290    file_base_db(File, DB),
 1291    !.
 1292rdf_db_to_file(DB, File) :-
 1293    url_to_filename(DB, File),
 1294    assert(file_base_db(File, DB)).
 1295
 1296%!  url_to_filename(+URL, -FileName) is det.
 1297%!  url_to_filename(-URL, +FileName) is det.
 1298%
 1299%   Turn  a  valid  URL  into  a  filename.  Earlier  versions  used
 1300%   www_form_encode/2, but this can produce  characters that are not
 1301%   valid  in  filenames.  We  will  use    the   same  encoding  as
 1302%   www_form_encode/2,  but  using  our  own    rules   for  allowed
 1303%   characters. The only requirement is that   we avoid any filename
 1304%   special character in use.  The   current  encoding  use US-ASCII
 1305%   alnum characters, _ and %
 1306
 1307url_to_filename(URL, FileName) :-
 1308    atomic(URL),
 1309    !,
 1310    atom_codes(URL, Codes),
 1311    phrase(url_encode(EncCodes), Codes),
 1312    atom_codes(FileName, EncCodes).
 1313url_to_filename(URL, FileName) :-
 1314    uri_encoded(path, URL, FileName).
 1315
 1316url_encode([0'+|T]) -->
 1317    " ",
 1318    !,
 1319    url_encode(T).
 1320url_encode([C|T]) -->
 1321    alphanum(C),
 1322    !,
 1323    url_encode(T).
 1324url_encode([C|T]) -->
 1325    no_enc_extra(C),
 1326    !,
 1327    url_encode(T).
 1328url_encode(Enc) -->
 1329    (   "\r\n"
 1330    ;   "\n"
 1331    ),
 1332    !,
 1333    { string_codes("%0D%0A", Codes),
 1334      append(Codes, T, Enc)
 1335    },
 1336    url_encode(T).
 1337url_encode([]) -->
 1338    eos,
 1339    !.
 1340url_encode([0'%,D1,D2|T]) -->
 1341    [C],
 1342    { Dv1 is (C>>4 /\ 0xf),
 1343      Dv2 is (C /\ 0xf),
 1344      code_type(D1, xdigit(Dv1)),
 1345      code_type(D2, xdigit(Dv2))
 1346    },
 1347    url_encode(T).
 1348
 1349eos([], []).
 1350
 1351alphanum(C) -->
 1352    [C],
 1353    { C < 128,                      % US-ASCII
 1354      code_type(C, alnum)
 1355    }.
 1356
 1357no_enc_extra(0'_) --> "_".
 1358
 1359
 1360                 /*******************************
 1361                 *             REINDEX          *
 1362                 *******************************/
 1363
 1364%!  reindex_db(+Dir, +Levels)
 1365%
 1366%   Reindex the database by creating intermediate directories.
 1367
 1368reindex_db(Dir, Levels) :-
 1369    directory_files(Dir, Files),
 1370    reindex_files(Files, Dir, '.', 0, Levels),
 1371    remove_empty_directories(Files, Dir).
 1372
 1373reindex_files([], _, _, _, _).
 1374reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
 1375    nofollow(Nofollow),
 1376    !,
 1377    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1378reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
 1379    CLevel \== Levels,
 1380    file_name_extension(_Base, Ext, File),
 1381    db_extension(Ext),
 1382    !,
 1383    directory_file_path(Prefix, File, DBFile),
 1384    directory_file_path(Dir, DBFile, OldPath),
 1385    db_file(Dir, File, Levels, NewPath),
 1386    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
 1387    file_directory_name(NewPath, NewDir),
 1388    make_directory_path(NewDir),
 1389    rename_file(OldPath, NewPath),
 1390    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1391reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
 1392    directory_file_path(Prefix, D, SubD),
 1393    directory_file_path(Dir, SubD, AbsD),
 1394    exists_directory(AbsD),
 1395    \+ read_link(AbsD, _, _),      % Do not follow links
 1396    !,
 1397    directory_files(AbsD, SubFiles),
 1398    CLevel2 is CLevel + 1,
 1399    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
 1400    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1401reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
 1402    reindex_files(Files, Dir, Prefix, CLevel, Levels).
 1403
 1404
 1405remove_empty_directories([], _).
 1406remove_empty_directories([File|Files], Dir) :-
 1407    \+ nofollow(File),
 1408    directory_file_path(Dir, File, Path),
 1409    exists_directory(Path),
 1410    \+ read_link(Path, _, _),
 1411    !,
 1412    directory_files(Path, Content),
 1413    exclude(nofollow, Content, RealContent),
 1414    (   RealContent == []
 1415    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
 1416        delete_directory(Path)
 1417    ;   remove_empty_directories(RealContent, Path)
 1418    ),
 1419    remove_empty_directories(Files, Dir).
 1420remove_empty_directories([_|Files], Dir) :-
 1421    remove_empty_directories(Files, Dir).
 1422
 1423
 1424                 /*******************************
 1425                 *            PREFIXES          *
 1426                 *******************************/
 1427
 1428save_prefixes(Dir) :-
 1429    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1430    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
 1431                       write_prefixes(Out),
 1432                       close(Out)).
 1433
 1434write_prefixes(Out) :-
 1435    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
 1436    forall(rdf_current_ns(Alias, URI),
 1437           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
 1438
 1439%!  load_prefixes(+RDFDBDir) is det.
 1440%
 1441%   If the file RDFDBDir/prefixes.db exists,  load the prefixes. The
 1442%   prefixes are registered using rdf_register_ns/3. Possible errors
 1443%   because the prefix  definitions  have   changed  are  printed as
 1444%   warnings, retaining the  old  definition.   Note  that  changing
 1445%   prefixes generally requires reloading all RDF from the source.
 1446
 1447load_prefixes(Dir) :-
 1448    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
 1449    (   exists_file(PrefixFile)
 1450    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
 1451                           read_prefixes(In),
 1452                           close(In))
 1453    ;   true
 1454    ).
 1455
 1456read_prefixes(Stream) :-
 1457    read_term(Stream, T0, []),
 1458    read_prefixes(T0, Stream).
 1459
 1460read_prefixes(end_of_file, _) :- !.
 1461read_prefixes(prefix(Alias, URI), Stream) :-
 1462    !,
 1463    must_be(atom, Alias),
 1464    must_be(atom, URI),
 1465    catch(rdf_register_ns(Alias, URI, []), E,
 1466          print_message(warning, E)),
 1467    read_term(Stream, T, []),
 1468    read_prefixes(T, Stream).
 1469read_prefixes(Term, _) :-
 1470    domain_error(prefix_term, Term).
 1471
 1472
 1473                 /*******************************
 1474                 *              UTIL            *
 1475                 *******************************/
 1476
 1477%!  mkdir(+Directory)
 1478%
 1479%   Create a directory if it does not already exist.
 1480
 1481mkdir(Directory) :-
 1482    exists_directory(Directory),
 1483    !.
 1484mkdir(Directory) :-
 1485    make_directory(Directory).
 1486
 1487%!  time_stamp(-Integer)
 1488%
 1489%   Return time-stamp rounded to integer.
 1490
 1491time_stamp(Int) :-
 1492    get_time(Now),
 1493    Int is round(Now).
 1494
 1495
 1496                 /*******************************
 1497                 *            MESSAGES          *
 1498                 *******************************/
 1499
 1500:- multifile
 1501    prolog:message/3,
 1502    prolog:message_context/3. 1503
 1504prolog:message(rdf(Term)) -->
 1505    message(Term).
 1506
 1507message(restoring(Type, Count, Jobs)) -->
 1508    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
 1509message(restore(attached(Graphs, Triples, Time/Wall))) -->
 1510    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
 1511    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
 1512      [Graphs, Triples, Wall, Percent, Time] ].
 1513% attach_graph/2
 1514message(restore(true, Action)) -->
 1515    !,
 1516    silent_message(Action).
 1517message(restore(brief, Action)) -->
 1518    !,
 1519    brief_message(Action).
 1520message(restore(_, Graph)) -->
 1521    [ 'Restoring ~p ... '-[Graph], flush ].
 1522message(restore(_, snapshot(_))) -->
 1523    [ at_same_line, '(snapshot) '-[], flush ].
 1524message(restore(_, journal(_))) -->
 1525    [ at_same_line, '(journal) '-[], flush ].
 1526message(restore(_, done(_, Time, Count))) -->
 1527    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1528% load_source/4
 1529message(restore(_, snapshot(G, _))) -->
 1530    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
 1531message(restore(_, journal(G, _))) -->
 1532    [ 'Restoring ~p\t(journal)'-[G], flush ].
 1533message(restore(_, done(_, Time, Count))) -->
 1534    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
 1535% journal handling
 1536message(update_failed(S,P,O,Action)) -->
 1537    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
 1538% directory reindexing
 1539message(reindex(Count, Depth)) -->
 1540    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
 1541message(reindex(Depth)) -->
 1542    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
 1543message(read_only) -->
 1544    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
 1545      'All changes to the RDF store will be lost if this process terminates.'
 1546    ].
 1547
 1548silent_message(_Action) --> [].
 1549
 1550brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
 1551    { file_base_name(Graph, Base) },
 1552    [ at_same_line,
 1553      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
 1554      flush
 1555    ].
 1556brief_message(_) --> [].
 1557
 1558
 1559prolog:message_context(rdf_locked(Args)) -->
 1560    { memberchk(time(Time), Args),
 1561      memberchk(pid(Pid), Args),
 1562      format_time(string(S), '%+', Time)
 1563    },
 1564    [ nl,
 1565      'locked at ~s by process id ~w'-[S,Pid]
 1566    ]