diff options
Diffstat (limited to 'merge/src/merge_fetch_ctrl.erl')
-rw-r--r-- | merge/src/merge_fetch_ctrl.erl | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/merge/src/merge_fetch_ctrl.erl b/merge/src/merge_fetch_ctrl.erl new file mode 100644 index 0000000..6c055df --- /dev/null +++ b/merge/src/merge_fetch_ctrl.erl @@ -0,0 +1,308 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(merge_fetch_ctrl). +-behaviour(gen_server). + +-export([start_link/1]). +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). +-export([newentries/2]). +-export([entriestofetch/2, fetchstatus/2]). + +-import(lists, [keydelete/3, keymember/3, keyreplace/4, member/2, reverse/1, + filter/2, split/2, filtermap/2]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(MISSINGENTRIES_TABLE, missingentries). + +%% tofetch contains entries that are not already fetched. Each list +%% member is on the form {Hash, BeingFetchedAt} with BeingFetchedAt +%% being a node or undefined. +-record(state, {tofetch :: [{binary(), atom()}]}). + +%%%%%%%%%%%%%%%%%%%% +%% @doc Update the missingentries table and the tofetch list with new +%% entries in List, available at Node. +newentries(List, Node) -> + %% not_fetched() is called here, in caller context, before calling + %% the server. + gen_server:call(?MODULE, {newentries, not_fetched(List), Node}). + +%% @doc Return list of entries for Node to fetch, but no more than +%% NMAx of them. +-spec entriestofetch(atom(), non_neg_integer()) -> list(). +entriestofetch(Node, NMax) -> + gen_server:call(?MODULE, {entriestofetch, Node, NMax}). + +%% Update the missingentries table and the tofetch list to reflect +%% that the fetching of Entry has been either succesfull or has +%% failed. +-spec fetchstatus(binary(), success|failure) -> ok. +fetchstatus(Entry, Status) -> % FIXME: Rename function. + gen_server:call(?MODULE, {fetchstatus, Entry, Status}). + +%%%%%%%%%%%%%%%%%%%% +start_link(Args) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []). + +init(StorageNodes) -> + lager:info("~p starting with storagenodes ~p", [?MODULE, StorageNodes]), + ok = init_missingentries(protected), + ok = init_fetchers(StorageNodes), + {ok, #state{tofetch = []}}. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; + +%% @doc Add entries in List to table (missingentries) and list (tofetch). +%% NOTE: Traversing List twice. +handle_call({newentries, List, Node}, _From, #state{tofetch = ToFetch} = State) -> + [missingentries_update(H, Node) || H <- List], + NewEntries = + filtermap(fun(Hash) -> + case keymember(Hash, 1, ToFetch) of + true -> + false; + false -> + {true, {Hash, undefined}} + end + end, List), + {noreply, State#state{tofetch = [ToFetch | NewEntries]}}; + +handle_call({entriestofetch, Node, NMax}, _From, + #state{tofetch = ToFetch} = State) -> + {NewToFetch, Entries} = etf(ToFetch, Node, NMax), + {reply, Entries, State#state{tofetch = NewToFetch}}; + +handle_call({fetchstatus, Entry, success}, _From, + #state{tofetch = ToFetch} = State) -> + NewToFetch = fetch_success(ToFetch, Entry), + {noreply, State#state{tofetch = NewToFetch}}; + +handle_call({fetchstatus, Entry, failure}, _From, + #state{tofetch = ToFetch} = State) -> + NewToFetch = fetch_failure(ToFetch, Entry), + {noreply, State#state{tofetch = NewToFetch}}. + +handle_cast(_Request, State) -> + {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. +terminate(_Reason, _State) -> + lager:info("~p terminating", [?MODULE]), + ok. + +%%%%%%%%%%%%%%%%%%%% +-spec fetch_success(list(), binary()) -> list(). +fetch_success(ToFetch, Entry) -> + true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}), + keydelete(Entry, 1, ToFetch). + +-spec fetch_failure(list(), binary()) -> list(). +fetch_failure(ToFetch, Entry) -> + [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {Entry, '$1'}), + true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, + reverse([hd(Nodes) | reverse(tl(Nodes))])}), + keyreplace(Entry, 1, ToFetch, {Entry, undefined}). + +%% @doc Return list of entries to fetch for Node and an updated +%% tofetch list. +-spec etf([{binary(), atom()}], atom(), non_neg_integer()) -> + {[{binary(), atom()}], [binary()]}. +etf(ToFetch, Node, NMax) -> + etf(ToFetch, Node, NMax, [], []). +etf(ToFetchRest, _Node, 0, AccToFetch, AccEntries) -> + {[reverse(AccToFetch) | ToFetchRest], reverse(AccEntries)}; +etf([], Node, _NMax, AccToFetch, AccEntries) -> + etf([], Node, 0, AccToFetch, AccEntries); +etf([H|T], Node, NMax, AccToFetch, AccEntries) -> + {Entry, BeingFetchedBy} = H, + {Acc1, Acc2} = + case BeingFetchedBy of + undefined -> + [[PresentAtNodes]] = ets:match(?MISSINGENTRIES_TABLE, + {Entry, '$1'}), + case member(Node, PresentAtNodes) of + true -> % Present at Node -- update and add. + lager:debug("Good entry for node ~p: ~p", [Node, Entry]), + {[{Entry, Node} | AccToFetch], [Entry | AccEntries]}; + false -> % Not present at Node -- move along. + lager:debug("Ignoring entry not @ ~p: ~p", [Node, Entry]), + {[H | AccToFetch], AccEntries} + end; + _ -> % Already being fetched -- move along. + lager:debug("Ignoring entry already being fetched: ~p", [Entry]), + {[H | AccToFetch], AccEntries} + end, + etf(T, Node, NMax - 1, Acc1, Acc2). + +not_fetched(List) -> + filter(fun(H) -> case ets:match(?MISSINGENTRIES_TABLE, {H, '$1'}) of + [[[]]] -> false; % Match: Empty list. + _ -> true + end + end, + List). + +insert_at_random_pos(First, Elem, List) -> + Last = length(List) + 1, + case First >= Last of + true -> + List ++ [Elem]; + false -> + case crypto:rand_uniform(First, Last + 1) of + Last -> + List ++ [Elem]; + N -> + {L1, L2} = split(N - 1, List), + L1 ++ [Elem] ++ L2 + end + end. + +%% @doc Update missingentries table Tab, return the list of nodes +%% having the entry. +-spec missingentries_update(binary(), atom()) -> [atom()]. +missingentries_update(Hash, Node) -> + case ets:match(?MISSINGENTRIES_TABLE, {Hash, '$1'}) of + [] -> + missingentries_add(Hash, [Node]); + [[Nodes]] -> + NewNodes = insert_at_random_pos(2, Node, Nodes), + true = ets:update_element(?MISSINGENTRIES_TABLE, Hash, {2, NewNodes}), + NewNodes + end. + +%% @doc Add hash (Hash) [fixme remove: or list of hashes (List)] to missingentries +%% table, with Nodes being the value. +%% We're wrapping adding and testing existence of entries to the table +%% since we're (soon to be) storing differently sized tuples, in order +%% to save one word (8 octets on a 64-bit system) per entry (saves +%% 800MB of RAM in a log with 100M entries). +-spec missingentries_add(binary(), [atom()]) -> [atom()]. +missingentries_add(Hash, Nodes) when is_binary(Hash) -> + ets:insert(?MISSINGENTRIES_TABLE, {Hash, Nodes}), + Nodes. + +-spec init_missingentries(ets:access()) -> ok | {error, term()}. +init_missingentries(ETSAccess) -> + case ets:info(?MISSINGENTRIES_TABLE) of + undefined -> ok; + _ -> ets:delete(?MISSINGENTRIES_TABLE) + end, + ets:new(?MISSINGENTRIES_TABLE, [set, named_table, ETSAccess]), + add_entries(0, index:indexsize(logorder)). + +add_entries(_Start, 0) -> + ok; +add_entries(Start, Count) -> + Chunksize = min(Count, 100000), + Entries = index:getrange(logorder, Start, Start + Chunksize - 1), + lists:foreach(fun (Entry) -> + %% [] means entry has been fetched. + missingentries_add(Entry, []) + end, Entries), + add_entries(Start + Chunksize, Count - Chunksize). + +init_fetchers(StorageNodes) -> + lists:foreach(fun(StorageNode) -> + spawn_link(merge_fetch_fetch, start_link, + [StorageNode]) + end, StorageNodes), + ok. + +%%%%%%%%%%%%%%%%%%%% +%% eunit tests. + +%% First few entries in test/testdata/merge/logorder, in binary form. +-define(LOGORDER_1, hex:hexstr_to_bin("5806F2A019465B1BCA6694DDFF05F99307D3A25CB842123110221802B1CD1813")). +-define(LOGORDER_2, hex:hexstr_to_bin("E87519AE80C0E1385D114FAF7E5BC9C4811FD0E0E155F28C026E25F0F14C7715")). +-define(LOGORDER_3, hex:hexstr_to_bin("224B10E05E3A1E38D34A0122E4B844E838A9D671288345FC77D245246A8D822F")). +-define(LOGORDER_4, hex:hexstr_to_bin("06529FA6ADA25188C527D610BE2FD6C16ED57796ACAA4CCA9B90D1FF2BE8ACD7")). + +test_setup() -> + index:init_module(), + index:start_link(logorder, "test/testdata/merge/logorder"), + + %% Table needs to be public in tests since this is run in a + %% different process than the tests themselves. + ok = init_missingentries(public), + + %% #1 is already fetched. + missingentries_update(?LOGORDER_2, node1), % #2 at node1. + missingentries_update(?LOGORDER_3, node1), % #3 at node1, + missingentries_update(?LOGORDER_3, node2), % ... node2, + missingentries_update(?LOGORDER_3, node3), % ... and node3. + missingentries_update(?LOGORDER_4, node1), + missingentries_update(?LOGORDER_4, node3), + + %% lager:debug("Nodes for LOGORDER_3: ~p", + %% [element(2, hd(ets:match_object(Tab, {?LOGORDER_3, '$1'})))]), + %% lager:debug("ETS size, bytes: ~p", + %% [ets:info(Tab, memory) * + %% erlang:system_info(wordsize)]), + + %% Return ToFetch list. + [ + {?LOGORDER_4, undefined}, % Not being fetched. + {?LOGORDER_2, undefined}, % Not being fetched. + {?LOGORDER_3, node1} % Currently being fetched by node1. + ]. + +test_cleanup(_) -> + true = ets:delete(?MISSINGENTRIES_TABLE), + index:stop(logorder). + +init_test_() -> + {setup, fun test_setup/0, fun test_cleanup/1, + fun(_ToFetch) -> + [ + ?_assertEqual(1000, ets:info(?MISSINGENTRIES_TABLE, size)), + ?_assertEqual(1, length(not_fetched([crypto:hash(sha256, <<>>)]))), + ?_assertEqual(0, length(not_fetched([?LOGORDER_1]))), + ?_assertEqual(1, length(not_fetched([?LOGORDER_2]))), + ?_assertEqual(2, length(not_fetched([crypto:hash(sha256, <<>>), ?LOGORDER_1, ?LOGORDER_2]))), + ?_assert(member(node1, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))), + ?_assert(member(node2, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))), + ?_assert(member(node3, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))) + ] + end}. + +etf_test_() -> + {setup, fun test_setup/0, fun test_cleanup/1, + fun(ToFetch) -> + [ + ?_assertEqual([?LOGORDER_4, ?LOGORDER_2], element(2, etf(ToFetch, node1, 10))), % node1 can fetch #2 and #4 (in reversed order). + ?_assertEqual([], element(2, etf(ToFetch, node1, 0))), % Limit == 0. + ?_assertEqual([], element(2, etf(ToFetch, node2, 10))), % node2 has nothing to fetch. + ?_assertEqual([?LOGORDER_4], element(2, etf(ToFetch, node3, 10))) % node3 can fetch #4 (but only because we don't remember that #2 got it above). + ] + end}. + +fetchstatus_success_test_() -> + {setup, fun test_setup/0, fun test_cleanup/1, + fun(ToFetch) -> + [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}), + [ + ?_assert(ets:member(?MISSINGENTRIES_TABLE, ?LOGORDER_3)), + ?_assertEqual(3, length(Nodes)), + ?_assertEqual([{?LOGORDER_4, undefined}, {?LOGORDER_2, undefined}], + fetch_success(ToFetch, ?LOGORDER_3)), + ?_assertEqual([[[]]], ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'})) + ] + end}. + +fetchstatus_failure_test_() -> + {setup, fun test_setup/0, fun test_cleanup/1, + fun(ToFetch) -> + [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}), + [ + ?_assertEqual([{?LOGORDER_4, undefined}, {?LOGORDER_2, undefined}, {?LOGORDER_3, undefined}], + fetch_failure(ToFetch, ?LOGORDER_3)), + ?_assertEqual([[tl(Nodes) ++ [hd(Nodes)]]], + ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'})) + ] + end}. |