diff options
Diffstat (limited to 'merge')
-rw-r--r-- | merge/ebin/merge.app | 4 | ||||
-rw-r--r-- | merge/src/merge_fetch_ctrl.erl | 24 | ||||
-rw-r--r-- | merge/src/merge_fetch_fetch.erl | 72 | ||||
-rw-r--r-- | merge/src/merge_fetch_fetch_sup.erl | 30 | ||||
-rw-r--r-- | merge/src/merge_fetch_newentries.erl | 26 | ||||
-rw-r--r-- | merge/src/merge_sup.erl | 5 |
6 files changed, 146 insertions, 15 deletions
diff --git a/merge/ebin/merge.app b/merge/ebin/merge.app index b34334a..c195185 100644 --- a/merge/ebin/merge.app +++ b/merge/ebin/merge.app @@ -6,8 +6,8 @@ {application, merge, [{description, "Plop merge"}, {vsn, "1.0.1-alpha-dev"}, - {modules, [merge_app, merge_dist, merge_dist_sup, merge_sup]}, - {applications, [kernel, stdlib, lager, plop]}, + {modules, [merge_app, merge_dist, merge_dist_sup, merge_sup, merge_fetch]}, + {applications, [kernel, stdlib, lager, catlfish, plop]}, {registered, [merge_dist, merge_dist_sup, merge_sup]}, {mod, {merge_app, []}} ]}. diff --git a/merge/src/merge_fetch_ctrl.erl b/merge/src/merge_fetch_ctrl.erl index 6c055df..49fe043 100644 --- a/merge/src/merge_fetch_ctrl.erl +++ b/merge/src/merge_fetch_ctrl.erl @@ -69,7 +69,7 @@ handle_call({newentries, List, Node}, _From, #state{tofetch = ToFetch} = State) {true, {Hash, undefined}} end end, List), - {noreply, State#state{tofetch = [ToFetch | NewEntries]}}; + {reply, ok, State#state{tofetch = ToFetch ++ NewEntries}}; handle_call({entriestofetch, Node, NMax}, _From, #state{tofetch = ToFetch} = State) -> @@ -79,12 +79,12 @@ handle_call({entriestofetch, Node, NMax}, _From, handle_call({fetchstatus, Entry, success}, _From, #state{tofetch = ToFetch} = State) -> NewToFetch = fetch_success(ToFetch, Entry), - {noreply, State#state{tofetch = NewToFetch}}; + {reply, ok, State#state{tofetch = NewToFetch}}; handle_call({fetchstatus, Entry, failure}, _From, #state{tofetch = ToFetch} = State) -> NewToFetch = fetch_failure(ToFetch, Entry), - {noreply, State#state{tofetch = NewToFetch}}. + {reply, ok, State#state{tofetch = NewToFetch}}. handle_cast(_Request, State) -> {noreply, State}. @@ -97,8 +97,18 @@ terminate(_Reason, _State) -> ok. 
%%%%%%%%%%%%%%%%%%%% + +write_currentsize(CurrentSize, LastHash) -> + {ok, CurrentSizeFile} = application:get_env(plop, fetched_path), + CurrentSizeData = {struct, [{index, CurrentSize - 1}, {hash, list_to_binary(hex:bin_to_hexstr(LastHash))}]}, + ok = atomic:replacefile(CurrentSizeFile, mochijson2:encode(CurrentSizeData)). + -spec fetch_success(list(), binary()) -> list(). fetch_success(ToFetch, Entry) -> + CurrentPos = db:indexsize(), + db:add_index_nosync_noreverse(Entry, CurrentPos), + db:index_sync(), + write_currentsize(CurrentPos + 1, Entry), true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}), keydelete(Entry, 1, ToFetch). @@ -116,7 +126,7 @@ fetch_failure(ToFetch, Entry) -> etf(ToFetch, Node, NMax) -> etf(ToFetch, Node, NMax, [], []). etf(ToFetchRest, _Node, 0, AccToFetch, AccEntries) -> - {[reverse(AccToFetch) | ToFetchRest], reverse(AccEntries)}; + {reverse(AccToFetch) ++ ToFetchRest, reverse(AccEntries)}; etf([], Node, _NMax, AccToFetch, AccEntries) -> etf([], Node, 0, AccToFetch, AccEntries); etf([H|T], Node, NMax, AccToFetch, AccEntries) -> @@ -142,8 +152,10 @@ etf([H|T], Node, NMax, AccToFetch, AccEntries) -> not_fetched(List) -> filter(fun(H) -> case ets:match(?MISSINGENTRIES_TABLE, {H, '$1'}) of - [[[]]] -> false; % Match: Empty list. - _ -> true + [] -> + true; % Match: Empty list. + E -> + false end end, List). diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl index b2fadd9..2ba7e9a 100644 --- a/merge/src/merge_fetch_fetch.erl +++ b/merge/src/merge_fetch_fetch.erl @@ -7,19 +7,81 @@ -export([start_link/1]). -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). +-export([loop/2]). start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -init({Name, _Address}) -> - lager:info("~p:~p: starting", [?MODULE, Name]), - {ok, []}. +init({Name, Address}) -> + lager:info("~p:~p starting", [?MODULE, Name]), + ChildPid = spawn_link(?MODULE, loop, [Name, Address]), + {ok, ChildPid}. 
+ +decode_entry({struct, EncodedEntry}) -> + Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)), + Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)), + {Hash, Entry}. + +get_entries(_, _, []) -> + {ok, []}; +get_entries(NodeName, NodeAddress, Hashes) -> + DebugTag = "getentry", + URL = NodeAddress ++ "getentry", + EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes), + Params = hackney_url:qs(EncodedHashes), + URLWithParams = [URL, "?", Params], + case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of + {<<"ok">>, PropList} -> + Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)), + {ok, Entries}; + Err -> + throw({request_error, result, DebugTag, Err}) + end. + +loop(Name, Address) -> + receive after 1000 -> ok end, + lager:info("~p:~p: asking for entries to get from ~p", + [?MODULE, Name, Address]), + Hashes = merge_fetch_ctrl:entriestofetch(Name, 10), + {ok, Entries} = get_entries(Name, Address, Hashes), + lists:foreach(fun ({Hash, Entry}) -> + + try + case plop:verify_entry(Entry) of + {ok, Hash} -> + ok; + {ok, DifferentLeafHash} -> + lager:error("leaf hash not correct: requested hash is ~p " ++ + "and contents are ~p", + [hex:bin_to_hexstr(Hash), + hex:bin_to_hexstr(DifferentLeafHash)]), + throw({request_error, result, "", differentleafhash}); + {error, Reason} -> + lager:error("verification failed: ~p", [Reason]), + throw({request_error, result, "", verificationfailed}) + end + catch + Type:What -> + [CrashFunction | Stack] = erlang:get_stacktrace(), + lager:error("Crash: ~p ~p~n~p~n~p~n", + [Type, What, CrashFunction, Stack]), + throw(What) + end, + + db:add_entry_sync(Hash, Entry), + + merge_fetch_ctrl:fetchstatus(Hash, success) + end, Entries), + loop(Name, Address). + %% TODO: if we crash here, we restart all of fetch -- spawn child proc %% for the actual fetching? 
-handle_call(stop, _From, State) -> - {stop, normal, stopped, State}. +handle_call(stop, _From, ChildPid) -> + lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]), + exit(ChildPid, stop), + {stop, normal, stopped, nil}. handle_cast(_Request, State) -> {noreply, State}. handle_info(_Info, State) -> diff --git a/merge/src/merge_fetch_fetch_sup.erl b/merge/src/merge_fetch_fetch_sup.erl new file mode 100644 index 0000000..fb89ab4 --- /dev/null +++ b/merge/src/merge_fetch_fetch_sup.erl @@ -0,0 +1,30 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(merge_fetch_fetch_sup). +-behaviour(supervisor). + +-export([start_link/1, init/1]). + +start_link(Nodes) -> + {ok, Pid} = + supervisor:start_link({local, ?MODULE}, ?MODULE, []), + Children = + lists:map(fun({NodeName, NodeAddress}) -> + lager:info("starting fetch worker: ~p", [NodeName]), + + {ok, Child} = supervisor:start_child( + ?MODULE, + [{NodeName, NodeAddress}]), + Child + end, Nodes), + lager:debug("~p started fetch workers: ~p", [Pid, Children]), + {ok, Pid}. + +init([]) -> + {ok, + {{simple_one_for_one, 3, 10}, + [{ignored, + {merge_fetch_fetch, start_link, []}, + permanent, 10000, worker, + [merge_fetch_fetch]}]}}. diff --git a/merge/src/merge_fetch_newentries.erl b/merge/src/merge_fetch_newentries.erl index b45aaec..befb3f7 100644 --- a/merge/src/merge_fetch_newentries.erl +++ b/merge/src/merge_fetch_newentries.erl @@ -21,10 +21,34 @@ handle_call(stop, _From, ChildPid) -> exit(ChildPid, stop), {stop, normal, stopped, nil}. +get_newentries(NodeName, NodeAddress) -> + DebugTag = "fetchnewentries", + URL = NodeAddress ++ "fetchnewentries", + case merge_util:request(DebugTag, URL, NodeName) of + {<<"ok">>, PropList} -> + Entries = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"entries">>, PropList)), + {ok, Entries}; + Err -> + throw({request_error, result, DebugTag, Err}) + end. 
+ loop(Name, Address, Period) -> + receive after Period -> ok end, lager:info("~p:~p: asking storage node at ~p for missing entries", [?MODULE, Name, Address]), - receive after Period -> ok end, + EntriesResult = try + get_newentries(Name, Address) + catch + throw:{request_error,request,Tag,Error2} -> + {error, Tag, Error2} + end, + case EntriesResult of + {ok, Entries} -> + lager:debug("got entries: ~p", [Entries]), + merge_fetch_ctrl:newentries(Entries, Name); + {error, _Tag, Error} -> + lager:info("failed to get entries from ~p: ~p", [Name, Error]) + end, loop(Name, Address, Period). handle_cast(_Request, State) -> diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl index a158077..7373de4 100644 --- a/merge/src/merge_sup.erl +++ b/merge/src/merge_sup.erl @@ -11,6 +11,7 @@ start_link(_Args) -> init([]) -> {ok, LogorderPath} = application:get_env(plop, index_path), + {ok, StorageNodes} = plopconfig:get_env(storage_nodes), {ok, {{one_for_one, 3, 10}, [ @@ -21,5 +22,7 @@ init([]) -> {merge_dist_sup, {merge_dist_sup, start_link, [[]]}, transient, infinity, supervisor, [merge_dist_sup]}, {merge_sth, {merge_sth, start_link, [[]]}, - transient, 10000, worker, [merge_sth]} + transient, 10000, worker, [merge_sth]}, + {merge_fetch_sup, {merge_fetch_sup, start_link, [StorageNodes]}, + transient, 10000, worker, [merge_fetch]} ]}}. |