diff options
| field | value | date |
|---|---|---|
| author | Magnus Ahltorp <map@kth.se> | 2015-10-01 12:39:28 +0200 |
| committer | Linus Nordberg <linus@nordu.net> | 2015-11-11 13:32:37 +0100 |
| commit | 0a3e6aafee314eaf9e5343c4cad89a9e2ae1d913 (patch) | |
| tree | 2ceb97ebf656a26ac384e0e550dc2070d1b7ec72 /src | |
| parent | 55820add0bda7ac926f11ee49b232dc11d6fe39c (diff) | |
Change index.erl to use gen_server and named databases.
Prefetch indices in frontend:fetchmissingentries/2.
Diffstat (limited to 'src')
-rw-r--r-- | src/db.erl | 17 | ||||
-rw-r--r-- | src/frontend.erl | 31 | ||||
-rw-r--r-- | src/index.erl | 167 | ||||
-rw-r--r-- | src/plop_app.erl | 1 | ||||
-rw-r--r-- | src/plop_sup.erl | 18 | ||||
-rw-r--r-- | src/storagedb.erl | 12 |
6 files changed, 151 insertions, 95 deletions
@@ -52,7 +52,7 @@ sendsth_verified() -> end. indexsize() -> - index:indexsize(index_path()). + index:indexsize(index_db). init(_Args) -> {ok, []}. @@ -182,11 +182,6 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% %% The meat. -% Table for Index -> Leaf hash -index_path() -> - {ok, Value} = application:get_env(plop, index_path), - Value. - % File that stores the number of verified entries verifiedsize_path() -> {ok, Value} = application:get_env(plop, verifiedsize_path), @@ -209,10 +204,10 @@ index_for_leafhash(LeafHash) -> end. leafhash_for_index(Index) -> - index:get(index_path(), Index). + index:get(index_db, Index). leafhash_for_indices(Start, End) -> - index:getrange(index_path(), Start, End). + index:getrange(index_db, Start, End). leafhash_for_entryhash(EntryHash) -> perm:getvalue(entryhash_db, EntryHash). @@ -235,7 +230,7 @@ handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call({add_index_nosync_noreverse, {LeafHash, Index}}, _From, State) -> - ok = index:add_nosync(index_path(), Index, LeafHash), + ok = index:add_nosync(index_db, Index, LeafHash), {reply, ok, State}. indexforhash_nosync(LeafHash, Index) -> @@ -248,6 +243,4 @@ indexforhash_dosync() -> ok. index_sync() -> - Basepath = index_path(), - ok = util:fsync([Basepath, filename:dirname(Basepath)]), - ok. + index:sync(index_db). diff --git a/src/frontend.erl b/src/frontend.erl index 3b7c15b..b7fa4b1 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -302,26 +302,41 @@ check_entry_noreverse(LeafHash, Index) -> end end. +prefetchindices(Index, []) -> + case db:leafhash_for_indices(Index, Index + 1000) of + noentry -> + case db:leafhash_for_index(Index) of + noentry -> + noentry; + Hash -> + [Hash] + end; + Hashes -> + Hashes + end; +prefetchindices(_Index, PrefetchList) -> + PrefetchList. + -spec fetchmissingentries(non_neg_integer(), non_neg_integer()) -> [binary() | noentry]. 
fetchmissingentries(Index, MaxEntries) -> - lists:reverse(fetchmissingentries(Index, [], MaxEntries)). + lists:reverse(fetchmissingentries(Index, [], [], MaxEntries)). --spec fetchmissingentries(non_neg_integer(), [binary() | noentry], non_neg_integer()) -> +-spec fetchmissingentries(non_neg_integer(), [binary() | noentry], [binary()], non_neg_integer()) -> [binary() | noentry]. -fetchmissingentries(_Index, Acc, 0) -> +fetchmissingentries(_Index, Acc, _PrefetchList, 0) -> Acc; -fetchmissingentries(Index, Acc, MaxEntries) -> +fetchmissingentries(Index, Acc, PrefetchList, MaxEntries) -> lager:debug("index ~p", [Index]), - case db:leafhash_for_index(Index) of + case prefetchindices(Index, PrefetchList) of noentry -> Acc; - Hash -> + [Hash|PrefetchRest] -> case db:entry_for_leafhash(Hash) of noentry -> lager:debug("didn't find hash ~p", [Hash]), - fetchmissingentries(Index + 1, [Hash | Acc], MaxEntries - 1); + fetchmissingentries(Index + 1, [Hash | Acc], PrefetchRest, MaxEntries - 1); _ -> - fetchmissingentries(Index + 1, Acc, MaxEntries) + fetchmissingentries(Index + 1, Acc, PrefetchRest, MaxEntries) end end. diff --git a/src/index.erl b/src/index.erl index a91c17c..fe47f59 100644 --- a/src/index.erl +++ b/src/index.erl @@ -12,75 +12,121 @@ %% TODO: Checksums -module(index). +-behaviour(gen_server). + +-export([start_link/2, stop/1, init_module/0]). -export([get/2, getrange/3, add/3, add_nosync/3, addlast_nosync/2, indexsize/1, sync/1]). --define(ENTRYSIZE, 32). --define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). --spec add(string(), integer() | last, binary()) -> ok. -add(Basepath, Index, Entry) -> - add(Basepath, Index, Entry, sync). +-record(state, {name, add_file}). --spec add_nosync(string(), integer() | last, binary()) -> ok. -add_nosync(Basepath, Index, Entry) -> - add(Basepath, Index, Entry, nosync). +-define(DIRECTORY_TABLE, index_directory). 
+ +init_module() -> + case ets:info(?DIRECTORY_TABLE) of + undefined -> ok; + _ -> ets:delete(?DIRECTORY_TABLE) + end, + ets:new(?DIRECTORY_TABLE, [set, public, named_table]). + +start_link(Name, Filename) -> + gen_server:start_link({local, Name}, ?MODULE, + [Name, Filename], []). -add(Basepath, Index, Entry, Syncflag) when is_binary(Entry), size(Entry) == ?ENTRYSIZE -> - case file:open(Basepath, [read, write, binary]) of +stop(Name) -> + gen_server:call(Name, stop). + +init([Name, Filename]) -> + lager:debug("registering ~p with file name ~p", [Name, Filename]), + true = ets:insert(?DIRECTORY_TABLE, {Name, Filename}), + case file:open(Filename, [read, write, binary]) of {ok, File} -> - {ok, Position} = file:position(File, eof), - Mode = case Index of - last when Position rem ?ENTRYSIZEINFILE == 0 -> - write; - Index when is_integer(Index), - Index * ?ENTRYSIZEINFILE == Position -> - write; - Index when is_integer(Index), - Index * ?ENTRYSIZEINFILE < Position -> - read; - _ -> - util:exit_with_error(invalid, writefile, - "Index not valid") - end, - EntryText = hex:bin_to_hexstr(Entry) ++ "\n", - case Mode of - write -> - ok = file:write(File, EntryText); - read -> - {ok, _Position} = - file:position(File, {bof, Index * ?ENTRYSIZEINFILE}), - {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE), - %% check that the written content is the same as - %% the old content - case binary_to_list(OldEntryText) of - EntryText -> - ok; - _ -> - util:exit_with_error(invalid, writefile, - "Written content not the" ++ - " same as old content") - end - end, - ok = file:close(File), - case Syncflag of - sync -> - sync(Basepath); - nosync -> - ok - end; + {ok, #state{name = Name, add_file = File}}; {error, Error} -> - util:exit_with_error(Error, writefile, - "Error opening file for writing") + {stop, Error} + end. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +terminate(_Reason, _State) -> + io:format("~p terminating~n", [?MODULE]), + ok. + + +-spec add(string(), integer() | last, binary()) -> ok. +add(Name, Index, Entry) -> + ok = gen_server:call(Name, {add, Index, Entry}), + sync(Name). + +-spec add_nosync(string(), integer() | last, binary()) -> ok. +add_nosync(Name, Index, Entry) -> + gen_server:call(Name, {add, Index, Entry}). + +handle_call({add, Index, Entry}, _From, State) -> + Result = add_internal(State#state.add_file, Index, Entry), + {reply, Result, State}; +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}. + + +-define(ENTRYSIZE, 32). +-define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). + + +add_internal(File, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE -> + {ok, Position} = file:position(File, eof), + Mode = case Index of + last when Position rem ?ENTRYSIZEINFILE == 0 -> + write; + Index when is_integer(Index), + Index * ?ENTRYSIZEINFILE == Position -> + write; + Index when is_integer(Index), + Index * ?ENTRYSIZEINFILE < Position -> + read; + _ -> + util:exit_with_error(invalid, writefile, + "Index not valid") + end, + EntryText = hex:bin_to_hexstr(Entry) ++ "\n", + case Mode of + write -> + ok = file:write(File, EntryText); + read -> + {ok, _Position} = + file:position(File, {bof, Index * ?ENTRYSIZEINFILE}), + {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE), + %% check that the written content is the same as + %% the old content + case binary_to_list(OldEntryText) of + EntryText -> + ok; + _ -> + util:exit_with_error(invalid, writefile, + "Written content not the" ++ + " same as old content") + end end. -spec sync(string()) -> ok. -sync(Basepath) -> +sync(Name) -> + [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name), util:fsync([Basepath, filename:dirname(Basepath)]). -spec addlast_nosync(string(), binary()) -> ok. -addlast_nosync(Basepath, Entry) -> - add_nosync(Basepath, last, Entry). +addlast_nosync(Name, Entry) -> + add_nosync(Name, last, Entry). 
decodedata(Binary) -> lists:reverse(decodedata(Binary, [])). @@ -94,7 +140,8 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) -> "Index line not ending with linefeed"). -spec indexsize(string()) -> integer(). -indexsize(Basepath) -> +indexsize(Name) -> + [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name), case file:open(Basepath, [read, binary]) of {ok, File} -> {ok, Filesize} = file:position(File, eof), @@ -107,8 +154,8 @@ indexsize(Basepath) -> end. -spec get(string(), integer()) -> binary() | noentry. -get(Basepath, Index) -> - case getrange(Basepath, Index, Index) of +get(Name, Index) -> + case getrange(Name, Index, Index) of noentry -> noentry; [Entry] -> @@ -116,7 +163,9 @@ get(Basepath, Index) -> end. -spec getrange(string(), integer(), integer()) -> [binary()] | noentry. -getrange(Basepath, Start, End) when Start =< End -> +getrange(Name, Start, End) when Start =< End -> + lager:debug("db ~p", [Name]), + [{_, Basepath}] = ets:lookup(?DIRECTORY_TABLE, Name), lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]), case file:open(Basepath, [read, binary]) of {ok, File} -> diff --git a/src/plop_app.erl b/src/plop_app.erl index dc896e2..611012a 100644 --- a/src/plop_app.erl +++ b/src/plop_app.erl @@ -10,6 +10,7 @@ start(normal, Args) -> http_auth:init_key_table(), plop:initsize(), perm:init_module(), + index:init_module(), plop_sup:start_link(Args). stop(_State) -> diff --git a/src/plop_sup.erl b/src/plop_sup.erl index 442eabc..55d6e9a 100644 --- a/src/plop_sup.erl +++ b/src/plop_sup.erl @@ -26,11 +26,11 @@ permanent_worker(Name, StartFunc, Modules) -> 10000, worker, Modules}. 
-perm_database_children(DB) -> - lists:filtermap(fun ({Name, DBName, ConfigName}) -> +database_children(DB) -> + lists:filtermap(fun ({Module, Name, DBName, ConfigName}) -> case application:get_env(plop, ConfigName) of {ok, Path} -> - {true, permanent_worker(Name, {perm, start_link, [DBName, Path]})}; + {true, permanent_worker(Name, {Module, start_link, [DBName, Path]})}; undefined -> false end @@ -39,11 +39,13 @@ perm_database_children(DB) -> %% Supervisor callback init([]) -> Services = application:get_env(plop, services, []), - DBChildren = perm_database_children([ - {the_entryhash_db, entryhash_db, entryhash_root_path}, - {the_indexforhash_db, indexforhash_db, indexforhash_root_path}, - {the_entry_db, entry_db, entry_root_path} - ]), + DBChildren = database_children([ + {perm, the_entryhash_db, entryhash_db, entryhash_root_path}, + {perm, the_indexforhash_db, indexforhash_db, indexforhash_root_path}, + {perm, the_entry_db, entry_db, entry_root_path}, + {index, the_index_db, index_db, index_path}, + {index, the_newentries_db, newentries_db, newentries_path} + ]), Children = [permanent_worker(the_db, {db, start_link, []}, [db]), permanent_worker(the_storagedb, {storagedb, start_link, []}), permanent_worker(fsync, {fsyncport, start_link, []})], diff --git a/src/storagedb.erl b/src/storagedb.erl index d781033..9f7da37 100644 --- a/src/storagedb.erl +++ b/src/storagedb.erl @@ -31,11 +31,11 @@ lastverifiednewentry_path() -> %% Public API. fetchnewhashes(Index) -> - case index:indexsize(newentries_path()) of + case index:indexsize(newentries_db) of 0 -> []; Size -> - index:getrange(newentries_path(), Index, Size - 1) + index:getrange(newentries_db, Index, Size - 1) end. lastverifiednewentry() -> @@ -52,7 +52,7 @@ lastverifiednewentry() -> -spec add(binary()) -> ok. add(LeafHash) -> ok = call(?MODULE, {add_nosync, LeafHash}), - ok = index:sync(newentries_path()), + ok = index:sync(newentries_db), ok. 
%%%%%%%%%%%%%%%%%%%% @@ -73,13 +73,9 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% -newentries_path() -> - {ok, Value} = application:get_env(plop, newentries_path), - Value. - handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call({add_nosync, LeafHash}, _From, State) -> - ok = index:addlast_nosync(newentries_path(), LeafHash), + ok = index:addlast_nosync(newentries_db, LeafHash), {reply, ok, State}. |