diff options
author | Magnus Ahltorp <map@kth.se> | 2015-09-24 16:38:03 +0200 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2015-11-11 13:32:36 +0100 |
commit | 90760d10d14c11ee4c99826163c206bbf20a77f6 (patch) | |
tree | 5965c6bb108538507464d98595cc631dd9441aa7 | |
parent | 346a3f973b828abc21cffb8a0a976daddcabe492 (diff) |
Change perm interface to be add/commit based
-rw-r--r-- | src/db.erl | 37 | ||||
-rw-r--r-- | src/frontend.erl | 4 | ||||
-rw-r--r-- | src/fsyncport.erl | 8 | ||||
-rw-r--r-- | src/perm.erl | 96 | ||||
-rw-r--r-- | src/plop_app.erl | 1 | ||||
-rw-r--r-- | src/plop_sup.erl | 17 | ||||
-rw-r--r-- | src/util.erl | 9 |
7 files changed, 118 insertions(+), 54 deletions(-)
@@ -14,7 +14,7 @@ -export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1]). -export([get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]). -export([leafhash_for_indices/2, indexsize/0]). --export([indexforhash_sync/2, indexforhash_nosync/2, index_sync/0]). +-export([indexforhash_nosync/2, indexforhash_dosync/0, index_sync/0]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -78,13 +78,14 @@ stop() -> -spec add(binary(), binary()) -> ok. add(LeafHash, Data) -> lager:debug("add leafhash ~s", [mochihex:to_hex(LeafHash)]), - ok = perm:ensurefile(entry_root_path(), LeafHash, Data), + ok = perm:addvalue(entry_db, LeafHash, Data), + perm:commit(entry_db), lager:debug("leafhash ~s added", [mochihex:to_hex(LeafHash)]), ok. -spec add_entryhash(binary(), binary()) -> ok | differ. add_entryhash(LeafHash, EntryHash) -> - perm:ensurefile_nosync(entryhash_root_path(), EntryHash, LeafHash). + perm:addvalue(entryhash_db, EntryHash, LeafHash). -spec add_index_nosync_noreverse(binary(), non_neg_integer()) -> ok. add_index_nosync_noreverse(LeafHash, Index) -> @@ -168,26 +169,11 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% %% The meat. -% Table for Leaf hash -> Entry -entry_root_path() -> - {ok, Value} = application:get_env(plop, entry_root_path), - Value. - -% Table for Leaf hash -> Entry -indexforhash_root_path() -> - {ok, Value} = application:get_env(plop, indexforhash_root_path), - Value. - % Table for Index -> Leaf hash index_path() -> {ok, Value} = application:get_env(plop, index_path), Value. -% Table for Entry hash -> Leaf hash -entryhash_root_path() -> - {ok, Value} = application:get_env(plop, entryhash_root_path), - Value. - % File that stores the number of verified entries verifiedsize_path() -> {ok, Value} = application:get_env(plop, verifiedsize_path), @@ -199,10 +185,10 @@ sendsth_verified_path() -> Value. 
entry_for_leafhash(LeafHash) -> - perm:readfile(entry_root_path(), LeafHash). + perm:getvalue(entry_db, LeafHash). index_for_leafhash(LeafHash) -> - case perm:readfile(indexforhash_root_path(), LeafHash) of + case perm:getvalue(indexforhash_db, LeafHash) of noentry -> noentry; Index -> @@ -216,7 +202,7 @@ leafhash_for_indices(Start, End) -> index:getrange(index_path(), Start, End). leafhash_for_entryhash(EntryHash) -> - perm:readfile(entryhash_root_path(), EntryHash). + perm:getvalue(entryhash_db, EntryHash). get_by_indices_helper(Start, _End) when Start < 0 -> []; @@ -240,13 +226,12 @@ handle_call({add_index_nosync_noreverse, {LeafHash, Index}}, _From, State) -> {reply, ok, State}. indexforhash_nosync(LeafHash, Index) -> - ok = perm:ensurefile_nosync(indexforhash_root_path(), - LeafHash, integer_to_binary(Index)), + ok = perm:addvalue(indexforhash_db, + LeafHash, integer_to_binary(Index)), ok. -indexforhash_sync(LeafHash, Index) -> - ok = perm:ensurefile(indexforhash_root_path(), - LeafHash, integer_to_binary(Index)), +indexforhash_dosync() -> + perm:commit(indexforhash_db, 300000), ok. index_sync() -> diff --git a/src/frontend.erl b/src/frontend.erl index 5b9157b..bce26b7 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -222,9 +222,7 @@ check_entries_onechunk(Start, End) -> end, lists:zip(Entries, lists:seq(Start, End))), case check_entries_int(Entries, Start, End) of [] -> - lists:foreach(fun ({Hash, Index}) -> - ok = db:indexforhash_sync(Hash, Index) - end, lists:zip(Entries, lists:seq(Start, End))), + ok = db:indexforhash_dosync(), case Entries of [] -> none; diff --git a/src/fsyncport.erl b/src/fsyncport.erl index b688f9c..ef8b37f 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -4,7 +4,7 @@ -module(fsyncport). -behaviour(gen_server). -export([start_link/0, stop/0]). --export([fsync/1, fsyncall/1]). +-export([fsync/1, fsyncall/2]). %% gen_server callbacks. 
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -19,8 +19,10 @@ stop() -> fsync(Path) -> gen_server:call(?MODULE, {fsync, [Path]}). -fsyncall(Paths) -> - gen_server:call(?MODULE, {fsync, Paths}). +fsyncall([], Timeout) -> + ok; +fsyncall(Paths, Timeout) -> + gen_server:call(?MODULE, {fsync, Paths}, Timeout). -record(state, {idleports, busyports, waiting, requests}). diff --git a/src/perm.erl b/src/perm.erl index 5de8231..6c62b59 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -2,7 +2,83 @@ %%% See LICENSE for licensing information. -module(perm). --export([ensurefile/3, ensurefile_nosync/3, readfile/2]). +-behaviour(gen_server). + +-export([start_link/2, stop/1, init_directory_table/0]). +-export([getvalue/2, addvalue/3, commit/1, commit/2]). + +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +-record(state, {name, dirtylistname}). + +-define(DIRECTORY_TABLE, perm_directory). + +init_directory_table() -> + case ets:info(?DIRECTORY_TABLE) of + undefined -> ok; + _ -> ets:delete(?DIRECTORY_TABLE) + end, + ets:new(?DIRECTORY_TABLE, [set, public, named_table]). + +start_link(Name, Filename) -> + gen_server:start_link({local, Name}, ?MODULE, + [Name, Filename], []). + +stop(Name) -> + gen_server:call(Name, stop). + +addfsyncfiles(Name, Files) -> + gen_server:call(Name, {addfsyncfiles, Files}). + +init([Name, Filename]) -> + Dirtylistname = list_to_atom(atom_to_list(Name) ++ "_dirty"), + true = ets:insert(?DIRECTORY_TABLE, {Name, Filename}), + ets:new(Dirtylistname, [set, public, named_table]), + {ok, #state{name = Name, dirtylistname = Dirtylistname}}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + io:format("~p terminating~n", [?MODULE]), + ok. 
+ +handle_call({addfsyncfiles, Files}, _From, State) -> + Insert = lists:map(fun (File) -> {File} end, Files), + ets:insert(State#state.dirtylistname, Insert), + {reply, ok, State}; +handle_call({commit, Timeout}, _From, State) -> + lager:debug("doing commit for ~p", [State#state.name]), + Files = lists:map(fun ([File]) -> File end, ets:match(State#state.dirtylistname, {'$1'})), + util:fsync(Files, Timeout), + ets:delete_all_objects(State#state.dirtylistname), + {reply, ok, State}; +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}. + +getvalue(Name, Key) -> + [{_, Filename}] = ets:lookup(?DIRECTORY_TABLE, Name), + readfile(Filename, Key). + +addvalue(Name, Key, Value) -> + [{_, Filename}] = ets:lookup(?DIRECTORY_TABLE, Name), + {Result, FsyncFiles} = ensurefile(Filename, Key, Value), + addfsyncfiles(Name, FsyncFiles), + Result. + +commit(Name) -> + commit(Name, 5000). + +commit(Name, Timeout) -> + gen_server:call(Name, {commit, Timeout}, Timeout). -spec readfile_and_verify(string(), binary()) -> ok | differ | {error, atom()}. readfile_and_verify(Name, Content) -> @@ -47,15 +123,7 @@ path_for_key(Rootdir, Key) -> Fullpath = Thirdlevel ++ "/" ++ Name, {[Firstlevel, Secondlevel, Thirdlevel], Fullpath}. --spec ensurefile(string(), binary(), binary()) -> ok | differ. ensurefile(Rootdir, Key, Content) -> - ensurefile(Rootdir, Key, Content, sync). - --spec ensurefile_nosync(string(), binary(), binary()) -> ok | differ. -ensurefile_nosync(Rootdir, Key, Content) -> - ensurefile(Rootdir, Key, Content, nosync). 
- -ensurefile(Rootdir, Key, Content, Syncflag) -> lager:debug("dir ~p key ~s", [Rootdir, mochihex:to_hex(Key)]), {Dirs, Path} = path_for_key(Rootdir, Key), Result = @@ -81,15 +149,7 @@ ensurefile(Rootdir, Key, Content, Syncflag) -> {error, Error} -> util:exit_with_error(Error, readfile, "Error reading file") end, - case Syncflag of - sync -> - lager:debug("key ~s added, fsync", [mochihex:to_hex(Key)]), - ok = util:fsync([Path, Rootdir | Dirs]), - lager:debug("key ~s fsynced", [mochihex:to_hex(Key)]), - Result; - nosync -> - Result - end. + {Result, [Path, Rootdir | Dirs]}. -spec readfile(string(), binary()) -> binary() | noentry. readfile(Rootdir, Key) -> diff --git a/src/plop_app.erl b/src/plop_app.erl index f8cb03f..e154555 100644 --- a/src/plop_app.erl +++ b/src/plop_app.erl @@ -9,6 +9,7 @@ start(normal, Args) -> hackney:start(), http_auth:init_key_table(), plop:initsize(), + perm:init_directory_table(), plop_sup:start_link(Args). stop(_State) -> diff --git a/src/plop_sup.erl b/src/plop_sup.erl index 87316f4..442eabc 100644 --- a/src/plop_sup.erl +++ b/src/plop_sup.erl @@ -26,9 +26,24 @@ permanent_worker(Name, StartFunc, Modules) -> 10000, worker, Modules}. +perm_database_children(DB) -> + lists:filtermap(fun ({Name, DBName, ConfigName}) -> + case application:get_env(plop, ConfigName) of + {ok, Path} -> + {true, permanent_worker(Name, {perm, start_link, [DBName, Path]})}; + undefined -> + false + end + end, DB). 
+ %% Supervisor callback init([]) -> Services = application:get_env(plop, services, []), + DBChildren = perm_database_children([ + {the_entryhash_db, entryhash_db, entryhash_root_path}, + {the_indexforhash_db, indexforhash_db, indexforhash_root_path}, + {the_entry_db, entry_db, entry_root_path} + ]), Children = [permanent_worker(the_db, {db, start_link, []}, [db]), permanent_worker(the_storagedb, {storagedb, start_link, []}), permanent_worker(fsync, {fsyncport, start_link, []})], @@ -41,5 +56,5 @@ init([]) -> end end, Services), {ok, {{one_for_one, 3, 10}, - Children ++ OptionalChildren + DBChildren ++ Children ++ OptionalChildren }}. diff --git a/src/util.erl b/src/util.erl index 2125d5e..c3b30db 100644 --- a/src/util.erl +++ b/src/util.erl @@ -2,7 +2,7 @@ %%% See LICENSE for licensing information. -module(util). --export([tempfilename/1, fsync/1, exit_with_error/3, +-export([tempfilename/1, fsync/1, fsync/2, exit_with_error/3, check_error/3, write_tempfile_and_rename/3, spawn_and_wait/1]). @@ -14,14 +14,17 @@ tempfilename(Base) -> Filename. -spec fsync([string()]) -> ok. -fsync(Paths) -> - case fsyncport:fsyncall(Paths) of +fsync(Paths, Timeout) -> + case fsyncport:fsyncall(Paths, Timeout) of ok -> ok; {error, Error} -> exit_with_error(fsync, Error, "Error in fsync") end. +fsync(Paths) -> + fsync(Paths, 5000). + -spec exit_with_error(atom(), atom(), string()) -> no_return(). exit_with_error(Operation, Error, ErrorMessage) -> io:format("~s(~w): ~w~n", [ErrorMessage, Operation, Error]), |