From 3e9fae0e1a2b77187d62b5adfd5a10f79e09fa42 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 20 Aug 2015 16:21:42 +0200 Subject: Save sendsth verification position and restart from that point --- src/db.erl | 52 ++++++++++++++++++++++++++++++++++++++++------------ src/frontend.erl | 55 ++++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 86 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/db.erl b/src/db.erl index feb864b..8c096a1 100644 --- a/src/db.erl +++ b/src/db.erl @@ -7,13 +7,14 @@ %% API. -export([start_link/0, stop/0]). -export([create_size_table/0]). --export([add/2, add_entryhash/2, add_index_nosync/2, set_treesize/1, size/0]). +-export([add/2, add_entryhash/2, set_treesize/1, size/0]). -export([add_index_nosync_noreverse/2]). -export([verifiedsize/0, set_verifiedsize/1]). +-export([sendsth_verified/0, set_sendsth_verified/2]). -export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1]). -export([get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]). -export([leafhash_for_indices/2, indexsize/0]). --export([indexforhash_sync/2, index_sync/0]). +-export([indexforhash_sync/2, indexforhash_nosync/2, index_sync/0]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -31,6 +32,25 @@ size() -> verifiedsize() -> binary_to_integer(atomic:readfile(verifiedsize_path())). +sendsth_verified() -> + case atomic:readfile(sendsth_verified_path()) of + noentry -> + -1; + Contents -> + {struct, PropList} = mochijson2:decode(Contents), + Index = proplists:get_value(<<"index">>, PropList), + HashHex = proplists:get_value(<<"hash">>, PropList), + Hash = mochihex:to_bin(binary_to_list(HashHex)), + HashFromDB = leafhash_for_index(Index), + case Hash of + HashFromDB -> + Index; + _ -> + lager:error("~p corrupt: contained ~p:~p, but real hash is ~p", [sendsth_verified_path(), Index, Hash, HashFromDB]), + -1 + end + end. + indexsize() -> index:indexsize(index_path()). @@ -66,10 +86,6 @@ add(LeafHash, Data) -> add_entryhash(LeafHash, EntryHash) -> perm:ensurefile_nosync(entryhash_root_path(), EntryHash, LeafHash). --spec add_index_nosync(binary(), non_neg_integer()) -> ok. -add_index_nosync(LeafHash, Index) -> - call(?MODULE, {add_index_nosync, {LeafHash, Index}}). - -spec add_index_nosync_noreverse(binary(), non_neg_integer()) -> ok. add_index_nosync_noreverse(LeafHash, Index) -> call(?MODULE, {add_index_nosync_noreverse, {LeafHash, Index}}). @@ -83,6 +99,14 @@ set_treesize(Size) -> set_verifiedsize(Size) -> ok = atomic:replacefile(verifiedsize_path(), integer_to_binary(Size)). +-spec set_sendsth_verified(non_neg_integer(), binary()) -> ok. +set_sendsth_verified(Index, Hash) -> + Contents = mochijson2:encode( + {[{index, Index}, + {hash, list_to_binary(mochihex:to_hex(Hash))} + ]}), + ok = atomic:replacefile(sendsth_verified_path(), Contents). + -spec get_by_indices(integer(), integer(), {sorted, true|false}) -> [{non_neg_integer(), binary(), notfetched}]. get_by_indices(Start, End, {sorted, _Sorted}) -> @@ -169,6 +193,11 @@ verifiedsize_path() -> {ok, Value} = application:get_env(plop, verifiedsize_path), Value. +% File that stores how far sendsth has verified entries +sendsth_verified_path() -> + {ok, Value} = application:get_env(plop, sendsth_verified_path), + Value. + entry_for_leafhash(LeafHash) -> perm:readfile(entry_root_path(), LeafHash). @@ -206,16 +235,15 @@ get_by_indices_helper(Start, End) -> handle_call(stop, _From, State) -> {stop, normal, stopped, State}; -handle_call({add_index_nosync, {LeafHash, Index}}, _From, State) -> - ok = perm:ensurefile_nosync(indexforhash_root_path(), - LeafHash, integer_to_binary(Index)), - ok = index:add_nosync(index_path(), Index, LeafHash), - {reply, ok, State}; - handle_call({add_index_nosync_noreverse, {LeafHash, Index}}, _From, State) -> ok = index:add_nosync(index_path(), Index, LeafHash), {reply, ok, State}. +indexforhash_nosync(LeafHash, Index) -> + ok = perm:ensurefile_nosync(indexforhash_root_path(), + LeafHash, integer_to_binary(Index)), + ok. + indexforhash_sync(LeafHash, Index) -> ok = perm:ensurefile(indexforhash_root_path(), LeafHash, integer_to_binary(Index)), diff --git a/src/frontend.erl b/src/frontend.erl index f0c0614..5b9157b 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -29,10 +29,7 @@ request(post, "plop/v1/frontend/sendlog", Input) -> Indices = lists:seq(Start, Start + length(Hashes) - 1), lists:foreach(fun ({Hash, Index}) -> - ok = db:add_index_nosync(Hash, Index) - end, lists:zip(Hashes, Indices)), - lists:foreach(fun ({Hash, Index}) -> - ok = db:indexforhash_sync(Hash, Index) + ok = db:add_index_nosync_noreverse(Hash, Index) end, lists:zip(Hashes, Indices)), ok = db:index_sync(), success({[{result, <<"ok">>}]}) @@ -59,11 +56,10 @@ request(post, "plop/v1/frontend/sendsth", Input) -> Treesize > Indexsize -> html("Has too few entries", Indexsize); true -> - NewEntries = get_new_entries(OldSize, Treesize), - lager:debug("old size: ~p new size: ~p entries: ~p", - [OldSize, Treesize, NewEntries]), + lager:debug("old size: ~p new size: ~p", + [OldSize, Treesize]), - Errors = check_entries(NewEntries, OldSize, Treesize - 1), + Errors = check_entries(Treesize), case Errors of [] -> @@ -199,7 +195,48 @@ get_new_entries(OldSize, Treesize) when OldSize < Treesize -> get_new_entries(OldSize, Treesize) when OldSize == Treesize -> []. -check_entries(Entries, Start, End) -> +check_entries(Treesize) -> + End = Treesize - 1, + Start = db:sendsth_verified() + 1, + lager:debug("Top level checking entries ~p-~p", [Start, End]), + check_entries_chunked(Start, End). + +check_entries_chunked(Start, End) -> + lager:debug("Checking entries ~p-~p", [Start, End]), + Chunksize = 1, + PartialEnd = min(Start + Chunksize - 1, End), + case check_entries_onechunk(Start, PartialEnd) of + [] when PartialEnd == End -> + []; + [] -> + check_entries_chunked(PartialEnd + 1, End); + Errors -> + Errors + end. + +check_entries_onechunk(Start, End) -> + Entries = get_new_entries(Start, End + 1), + lager:debug("Checking chunk ~p-~p: ~p", [Start, End, Entries]), + lists:foreach(fun ({Hash, Index}) -> + ok = db:indexforhash_nosync(Hash, Index) + end, lists:zip(Entries, lists:seq(Start, End))), + case check_entries_int(Entries, Start, End) of + [] -> + lists:foreach(fun ({Hash, Index}) -> + ok = db:indexforhash_sync(Hash, Index) + end, lists:zip(Entries, lists:seq(Start, End))), + case Entries of + [] -> + none; + Entries -> + db:set_sendsth_verified(End, lists:nth(End - Start + 1, Entries)) + end, + []; + Errors -> + Errors + end. + +check_entries_int(Entries, Start, End) -> lists:foldl(fun ({Hash, Index}, Acc) -> case check_entry(Hash, Index) of ok -> -- cgit v1.1