From 489df8ecaf16ca7429eb15b31ffbe6f686f5b0d1 Mon Sep 17 00:00:00 2001
From: Linus Nordberg
Date: Thu, 6 Jul 2017 18:15:08 +0200
Subject: wip

---
 merge/src/merge_fetch_ctrl.erl | 308 +++++++++++++++++++++++++++
 merge/src/merge_fetch_fetch.erl | 31 ++++
 merge/src/merge_fetch_newentries.erl | 38 ++++
 merge/src/merge_fetch_newentries_sup.erl | 30 +++
 merge/src/merge_fetch_sup.erl | 29 +++
 5 files changed, 436 insertions(+)
 create mode 100644 merge/src/merge_fetch_ctrl.erl
 create mode 100644 merge/src/merge_fetch_fetch.erl
 create mode 100644 merge/src/merge_fetch_newentries.erl
 create mode 100644 merge/src/merge_fetch_newentries_sup.erl
 create mode 100644 merge/src/merge_fetch_sup.erl

diff --git a/merge/src/merge_fetch_ctrl.erl b/merge/src/merge_fetch_ctrl.erl
new file mode 100644
index 0000000..6c055df
--- /dev/null
+++ b/merge/src/merge_fetch_ctrl.erl
@@ -0,0 +1,308 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Controller for the merge fetch machinery: tracks which log
+%%% entries are missing, which storage nodes hold each of them, and
+%%% which node (if any) is currently fetching each entry.
+-module(merge_fetch_ctrl).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+         code_change/3]).
+-export([newentries/2]).
+-export([entriestofetch/2, fetchstatus/2]).
+
+%% NOTE(review): -import is generally discouraged (it obscures call
+%% sites); consider fully qualified lists:* calls throughout.
+-import(lists, [keydelete/3, keymember/3, keyreplace/4, member/2, reverse/1,
+                filter/2, split/2, filtermap/2]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%% ETS table mapping entry hash -> list of storage nodes having the
+%% entry; an empty list means the entry has already been fetched.
+-define(MISSINGENTRIES_TABLE, missingentries).
+
+%% tofetch contains entries that are not already fetched. Each list
+%% member is on the form {Hash, BeingFetchedAt} with BeingFetchedAt
+%% being a node or undefined.
+-record(state, {tofetch :: [{binary(), atom()}]}).
+
+%%%%%%%%%%%%%%%%%%%%
+%% @doc Update the missingentries table and the tofetch list with new
+%% entries in List, available at Node.
+newentries(List, Node) ->
+    %% not_fetched() is called here, in caller context, before calling
+    %% the server.
+    gen_server:call(?MODULE, {newentries, not_fetched(List), Node}).
+
+%% @doc Return list of entries for Node to fetch, but no more than
+%% NMax of them.
+-spec entriestofetch(atom(), non_neg_integer()) -> list().
+entriestofetch(Node, NMax) ->
+    gen_server:call(?MODULE, {entriestofetch, Node, NMax}).
+
+%% @doc Update the missingentries table and the tofetch list to reflect
+%% that the fetching of Entry has been either successful or has
+%% failed.
+-spec fetchstatus(binary(), success|failure) -> ok.
+fetchstatus(Entry, Status) -> % FIXME: Rename function.
+    gen_server:call(?MODULE, {fetchstatus, Entry, Status}).
+
+%%%%%%%%%%%%%%%%%%%%
+start_link(Args) ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []).
+
+init(StorageNodes) ->
+    lager:info("~p starting with storagenodes ~p", [?MODULE, StorageNodes]),
+    ok = init_missingentries(protected),
+    ok = init_fetchers(StorageNodes),
+    {ok, #state{tofetch = []}}.
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State};
+
+%% @doc Add entries in List to table (missingentries) and list (tofetch).
+%% NOTE: Traversing List twice.
+handle_call({newentries, List, Node}, _From, #state{tofetch = ToFetch} = State) ->
+    [missingentries_update(H, Node) || H <- List],
+    %% Queue only hashes not already in tofetch, marked as not
+    %% currently being fetched by any node.
+    NewEntries =
+        filtermap(fun(Hash) ->
+                          case keymember(Hash, 1, ToFetch) of
+                              true ->
+                                  false;
+                              false ->
+                                  {true, {Hash, undefined}}
+                          end
+                  end, List),
+    %% BUGFIX: was {noreply, ...} with tofetch = [ToFetch | NewEntries].
+    %% A noreply without a later gen_server:reply/2 makes the
+    %% gen_server:call/2 in newentries/2 time out, and consing the old
+    %% list as a single element nests it instead of appending.
+    {reply, ok, State#state{tofetch = ToFetch ++ NewEntries}};
+
+handle_call({entriestofetch, Node, NMax}, _From,
+            #state{tofetch = ToFetch} = State) ->
+    {NewToFetch, Entries} = etf(ToFetch, Node, NMax),
+    {reply, Entries, State#state{tofetch = NewToFetch}};
+
+handle_call({fetchstatus, Entry, success}, _From,
+            #state{tofetch = ToFetch} = State) ->
+    NewToFetch = fetch_success(ToFetch, Entry),
+    %% BUGFIX: reply ok (was noreply, which would hang the caller);
+    %% matches the -spec of fetchstatus/2.
+    {reply, ok, State#state{tofetch = NewToFetch}};
+
+handle_call({fetchstatus, Entry, failure}, _From,
+            #state{tofetch = ToFetch} = State) ->
+    NewToFetch = fetch_failure(ToFetch, Entry),
+    {reply, ok, State#state{tofetch = NewToFetch}}.
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+terminate(_Reason, _State) ->
+    lager:info("~p terminating", [?MODULE]),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%
+%% @doc Mark Entry as fetched: store [] for it in the missingentries
+%% table and drop it from the tofetch list.
+-spec fetch_success(list(), binary()) -> list().
+fetch_success(ToFetch, Entry) ->
+    true = ets:insert(?MISSINGENTRIES_TABLE, {Entry, []}),
+    keydelete(Entry, 1, ToFetch).
+
+%% @doc Mark the fetch of Entry as failed: rotate the failing (first)
+%% node to the back of the entry's node list and mark the entry as no
+%% longer being fetched. Assumes Entry is present and unfetched,
+%% i.e. its node list is non-empty.
+-spec fetch_failure(list(), binary()) -> list().
+fetch_failure(ToFetch, Entry) ->
+    [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {Entry, '$1'}),
+    %% tl(Nodes) ++ [hd(Nodes)] is the same rotation as the original
+    %% reverse([hd(Nodes) | reverse(tl(Nodes))]) with fewer traversals.
+    true = ets:insert(?MISSINGENTRIES_TABLE,
+                      {Entry, tl(Nodes) ++ [hd(Nodes)]}),
+    keyreplace(Entry, 1, ToFetch, {Entry, undefined}).
+
+%% @doc Return list of entries to fetch for Node and an updated
+%% tofetch list. NMax bounds the number of tofetch elements examined
+%% and thereby also the number of entries returned.
+-spec etf([{binary(), atom()}], atom(), non_neg_integer()) ->
+                 {[{binary(), atom()}], [binary()]}.
+etf(ToFetch, Node, NMax) ->
+    etf(ToFetch, Node, NMax, [], []).
+etf(ToFetchRest, _Node, 0, AccToFetch, AccEntries) ->
+    %% BUGFIX: was [reverse(AccToFetch) | ToFetchRest], which nested
+    %% the examined prefix as a single list element instead of
+    %% appending the unexamined tail, corrupting the keylist.
+    {reverse(AccToFetch) ++ ToFetchRest, reverse(AccEntries)};
+etf([], Node, _NMax, AccToFetch, AccEntries) ->
+    etf([], Node, 0, AccToFetch, AccEntries);
+etf([H|T], Node, NMax, AccToFetch, AccEntries) ->
+    {Entry, BeingFetchedBy} = H,
+    {Acc1, Acc2} =
+        case BeingFetchedBy of
+            undefined ->
+                [[PresentAtNodes]] = ets:match(?MISSINGENTRIES_TABLE,
+                                               {Entry, '$1'}),
+                case member(Node, PresentAtNodes) of
+                    true -> % Present at Node -- update and add.
+                        lager:debug("Good entry for node ~p: ~p", [Node, Entry]),
+                        {[{Entry, Node} | AccToFetch], [Entry | AccEntries]};
+                    false -> % Not present at Node -- move along.
+                        lager:debug("Ignoring entry not @ ~p: ~p", [Node, Entry]),
+                        {[H | AccToFetch], AccEntries}
+                end;
+            _ -> % Already being fetched -- move along.
+                lager:debug("Ignoring entry already being fetched: ~p", [Entry]),
+                {[H | AccToFetch], AccEntries}
+        end,
+    etf(T, Node, NMax - 1, Acc1, Acc2).
+
+%% @doc Return the hashes in List that are not marked as fetched,
+%% i.e. whose node list in the missingentries table is non-empty, or
+%% that are not in the table at all.
+not_fetched(List) ->
+    filter(fun(H) -> case ets:match(?MISSINGENTRIES_TABLE, {H, '$1'}) of
+                         [[[]]] -> false; % Match: Empty list.
+                         _ -> true
+                     end
+           end,
+           List).
+
+%% @doc Insert Elem into List at a position chosen uniformly at
+%% random from [First, length(List) + 1]. Positions before First are
+%% never chosen, so the first First - 1 elements keep their places.
+insert_at_random_pos(First, Elem, List) ->
+    Last = length(List) + 1,
+    case First >= Last of
+        true ->
+            List ++ [Elem];
+        false ->
+            %% Pick N uniformly from [First, Last]. BUGFIX/modernise:
+            %% rand:uniform/1 replaces the deprecated (and since
+            %% removed) crypto:rand_uniform/2; crypto-strength
+            %% randomness is not needed for shuffling a node list.
+            case First - 1 + rand:uniform(Last - First + 1) of
+                Last ->
+                    List ++ [Elem];
+                N ->
+                    {L1, L2} = split(N - 1, List),
+                    L1 ++ [Elem] ++ L2
+            end
+    end.
+
+%% @doc Update the missingentries table with Node having Hash, return
+%% the list of nodes having the entry. The new node is inserted at a
+%% random position >= 2, so the first node in the list is preserved.
+-spec missingentries_update(binary(), atom()) -> [atom()].
+missingentries_update(Hash, Node) ->
+    case ets:match(?MISSINGENTRIES_TABLE, {Hash, '$1'}) of
+        [] ->
+            missingentries_add(Hash, [Node]);
+        [[Nodes]] ->
+            NewNodes = insert_at_random_pos(2, Node, Nodes),
+            true = ets:update_element(?MISSINGENTRIES_TABLE, Hash, {2, NewNodes}),
+            NewNodes
+    end.
+
+%% @doc Add hash (Hash) to the missingentries table, with Nodes being
+%% the value.
+%% We're wrapping adding and testing existence of entries to the table
+%% since we're (soon to be) storing differently sized tuples, in order
+%% to save one word (8 octets on a 64-bit system) per entry (saves
+%% 800MB of RAM in a log with 100M entries).
+-spec missingentries_add(binary(), [atom()]) -> [atom()].
+missingentries_add(Hash, Nodes) when is_binary(Hash) ->
+    ets:insert(?MISSINGENTRIES_TABLE, {Hash, Nodes}),
+    Nodes.
+
+%% @doc (Re)create the missingentries table with the given ETS access
+%% mode and seed it with all entries currently in the logorder index,
+%% marked as already fetched ([]).
+-spec init_missingentries(ets:access()) -> ok | {error, term()}.
+init_missingentries(ETSAccess) ->
+    case ets:info(?MISSINGENTRIES_TABLE) of
+        undefined -> ok;
+        _ -> ets:delete(?MISSINGENTRIES_TABLE)
+    end,
+    ets:new(?MISSINGENTRIES_TABLE, [set, named_table, ETSAccess]),
+    add_entries(0, index:indexsize(logorder)).
+
+%% Seed the missingentries table with Count entries from the logorder
+%% index starting at Start, all marked as fetched ([]), in chunks of
+%% at most 100000 entries to bound the size of each getrange call.
+add_entries(_Start, 0) ->
+    ok;
+add_entries(Start, Count) ->
+    Chunksize = min(Count, 100000),
+    Entries = index:getrange(logorder, Start, Start + Chunksize - 1),
+    lists:foreach(fun (Entry) ->
+                          %% [] means entry has been fetched.
+                          missingentries_add(Entry, [])
+                  end, Entries),
+    add_entries(Start + Chunksize, Count - Chunksize).
+
+%% Start one merge_fetch_fetch worker per storage node.
+%% NOTE(review): spawn_link(M, start_link, A) links each new
+%% gen_server to a short-lived spawned process rather than to this
+%% controller; these workers look like they belong under
+%% merge_fetch_fetch_sup (started by merge_fetch_sup) instead --
+%% confirm intended design, the patch is marked wip.
+init_fetchers(StorageNodes) ->
+    lists:foreach(fun(StorageNode) ->
+                          spawn_link(merge_fetch_fetch, start_link,
+                                     [StorageNode])
+                  end, StorageNodes),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%
+%% eunit tests.
+
+%% First few entries in test/testdata/merge/logorder, in binary form.
+-define(LOGORDER_1, hex:hexstr_to_bin("5806F2A019465B1BCA6694DDFF05F99307D3A25CB842123110221802B1CD1813")).
+-define(LOGORDER_2, hex:hexstr_to_bin("E87519AE80C0E1385D114FAF7E5BC9C4811FD0E0E155F28C026E25F0F14C7715")).
+-define(LOGORDER_3, hex:hexstr_to_bin("224B10E05E3A1E38D34A0122E4B844E838A9D671288345FC77D245246A8D822F")).
+-define(LOGORDER_4, hex:hexstr_to_bin("06529FA6ADA25188C527D610BE2FD6C16ED57796ACAA4CCA9B90D1FF2BE8ACD7")).
+
+%% Fixture: populate the table from the test logorder index and
+%% return the ToFetch list handed to each test fun.
+test_setup() ->
+    index:init_module(),
+    index:start_link(logorder, "test/testdata/merge/logorder"),
+
+    %% Table needs to be public in tests since this is run in a
+    %% different process than the tests themselves.
+    ok = init_missingentries(public),
+
+    %% #1 is already fetched.
+    missingentries_update(?LOGORDER_2, node1), % #2 at node1.
+    missingentries_update(?LOGORDER_3, node1), % #3 at node1,
+    missingentries_update(?LOGORDER_3, node2), % ... node2,
+    missingentries_update(?LOGORDER_3, node3), % ... and node3.
+    missingentries_update(?LOGORDER_4, node1),
+    missingentries_update(?LOGORDER_4, node3),
+
+    %% lager:debug("Nodes for LOGORDER_3: ~p",
+    %% [element(2, hd(ets:match_object(Tab, {?LOGORDER_3, '$1'})))]),
+    %% lager:debug("ETS size, bytes: ~p",
+    %% [ets:info(Tab, memory) *
+    %% erlang:system_info(wordsize)]),
+
+    %% Return ToFetch list.
+    [
+     {?LOGORDER_4, undefined}, % Not being fetched.
+     {?LOGORDER_2, undefined}, % Not being fetched.
+     {?LOGORDER_3, node1} % Currently being fetched by node1.
+    ].
+
+%% Fixture teardown: drop the table and stop the index process.
+test_cleanup(_) ->
+    true = ets:delete(?MISSINGENTRIES_TABLE),
+    index:stop(logorder).
+
+init_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(_ToFetch) ->
+             [
+              ?_assertEqual(1000, ets:info(?MISSINGENTRIES_TABLE, size)),
+              ?_assertEqual(1, length(not_fetched([crypto:hash(sha256, <<>>)]))),
+              ?_assertEqual(0, length(not_fetched([?LOGORDER_1]))),
+              ?_assertEqual(1, length(not_fetched([?LOGORDER_2]))),
+              ?_assertEqual(2, length(not_fetched([crypto:hash(sha256, <<>>), ?LOGORDER_1, ?LOGORDER_2]))),
+              ?_assert(member(node1, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))),
+              ?_assert(member(node2, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))))),
+              ?_assert(member(node3, element(2, hd(ets:match_object(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'})))))
+             ]
+     end}.
+
+etf_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(ToFetch) ->
+             [
+              ?_assertEqual([?LOGORDER_4, ?LOGORDER_2], element(2, etf(ToFetch, node1, 10))), % node1 can fetch #2 and #4 (in reversed order).
+              ?_assertEqual([], element(2, etf(ToFetch, node1, 0))), % Limit == 0.
+              ?_assertEqual([], element(2, etf(ToFetch, node2, 10))), % node2 has nothing to fetch.
+              ?_assertEqual([?LOGORDER_4], element(2, etf(ToFetch, node3, 10))) % node3 can fetch #4 (but only because we don't remember that #2 got it above).
+             ]
+     end}.
+
+fetchstatus_success_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(ToFetch) ->
+             [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}),
+             [
+              ?_assert(ets:member(?MISSINGENTRIES_TABLE, ?LOGORDER_3)),
+              ?_assertEqual(3, length(Nodes)),
+              ?_assertEqual([{?LOGORDER_4, undefined}, {?LOGORDER_2, undefined}],
+                            fetch_success(ToFetch, ?LOGORDER_3)),
+              %% [] in the table means #3 is now marked as fetched.
+              ?_assertEqual([[[]]], ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))
+             ]
+     end}.
+
+fetchstatus_failure_test_() ->
+    {setup, fun test_setup/0, fun test_cleanup/1,
+     fun(ToFetch) ->
+             [[Nodes]] = ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}),
+             [
+              %% A failed fetch puts #3 back as not-being-fetched ...
+              ?_assertEqual([{?LOGORDER_4, undefined}, {?LOGORDER_2, undefined}, {?LOGORDER_3, undefined}],
+                            fetch_failure(ToFetch, ?LOGORDER_3)),
+              %% ... and rotates the failing (first) node to the back.
+              ?_assertEqual([[tl(Nodes) ++ [hd(Nodes)]]],
+                            ets:match(?MISSINGENTRIES_TABLE, {?LOGORDER_3, '$1'}))
+             ]
+     end}.
diff --git a/merge/src/merge_fetch_fetch.erl b/merge/src/merge_fetch_fetch.erl
new file mode 100644
index 0000000..b2fadd9
--- /dev/null
+++ b/merge/src/merge_fetch_fetch.erl
@@ -0,0 +1,31 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Skeleton worker meant to fetch entries from one storage node.
+%%% Currently it only starts and stops; no fetching is implemented
+%%% yet (the patch is marked wip).
+-module(merge_fetch_fetch).
+-behaviour(gen_server).
+
+-export([start_link/1]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+         code_change/3]).
+
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+%% Args is a {Name, Address} tuple describing the storage node; only
+%% Name is used so far.
+init({Name, _Address}) ->
+    lager:info("~p:~p: starting", [?MODULE, Name]),
+    {ok, []}.
+
+%% TODO: if we crash here, we restart all of fetch -- spawn child proc
+%% for the actual fetching?
+
+handle_call(stop, _From, State) ->
+    {stop, normal, stopped, State}.
+handle_cast(_Request, State) ->
+    {noreply, State}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+terminate(_Reason, _State) ->
+    lager:info("~p terminating", [?MODULE]),
+    ok.
diff --git a/merge/src/merge_fetch_newentries.erl b/merge/src/merge_fetch_newentries.erl
new file mode 100644
index 0000000..b45aaec
--- /dev/null
+++ b/merge/src/merge_fetch_newentries.erl
@@ -0,0 +1,38 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Periodically polls one storage node for new entries. The polling
+%%% runs in a linked helper process; the gen_server only owns and
+%%% stops it.
+-module(merge_fetch_newentries).
+-behaviour(gen_server).
+
+%% loop/3 is exported so it can be spawned by explicit MFA.
+-export([start_link/1, loop/3]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+         code_change/3]).
+
+start_link(Args) ->
+    gen_server:start_link(?MODULE, Args, []).
+
+%% Args is {Name, Address, Period}: the storage node's name, its
+%% address, and the polling period in milliseconds. The gen_server
+%% state is the pid of the polling helper process.
+init({Name, Address, Period}) ->
+    lager:info("~p:~p starting", [?MODULE, Name]),
+    ChildPid = spawn_link(?MODULE, loop, [Name, Address, Period]),
+    {ok, ChildPid}.
+
+handle_call(stop, _From, ChildPid) ->
+    lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]),
+    exit(ChildPid, stop),
+    {stop, normal, stopped, nil}.
+
+%% Polling loop, run in its own process. Currently only logs; the
+%% actual request to the storage node is not implemented yet (the
+%% patch is marked wip).
+loop(Name, Address, Period) ->
+    lager:info("~p:~p: asking storage node at ~p for missing entries",
+               [?MODULE, Name, Address]),
+    %% Sleep for Period milliseconds between polls.
+    receive after Period -> ok end,
+    loop(Name, Address, Period).
+
+handle_cast(_Request, State) ->
+    {noreply, State}.
+handle_info(_Info, State) ->
+    {noreply, State}.
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+terminate(_Reason, _State) ->
+    lager:info("~p terminating", [?MODULE]),
+    ok.
diff --git a/merge/src/merge_fetch_newentries_sup.erl b/merge/src/merge_fetch_newentries_sup.erl
new file mode 100644
index 0000000..bd33e60
--- /dev/null
+++ b/merge/src/merge_fetch_newentries_sup.erl
@@ -0,0 +1,30 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% simple_one_for_one supervisor starting one merge_fetch_newentries
+%%% worker per storage node, each polling every 3000 ms.
+-module(merge_fetch_newentries_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+%% Nodes is a list of {NodeName, NodeAddress} tuples.
+start_link(Nodes) ->
+    {ok, Pid} =
+        supervisor:start_link({local, ?MODULE}, ?MODULE, []),
+    Children =
+        lists:map(fun({NodeName, NodeAddress}) ->
+                          lager:info("starting newentry worker: ~p", [NodeName]),
+
+                          {ok, Child} = supervisor:start_child(
+                                          ?MODULE,
+                                          [{NodeName, NodeAddress, 3000}]),
+                          Child
+                  end, Nodes),
+    lager:debug("~p started newentry workers: ~p", [Pid, Children]),
+    {ok, Pid}.
+
+init([]) ->
+    {ok,
+     {{simple_one_for_one, 3, 10},
+      %% Child id is ignored for simple_one_for_one children.
+      [{ignored,
+        {merge_fetch_newentries, start_link, []},
+        permanent, 10000, worker,
+        [merge_fetch_newentries]}]}}.
diff --git a/merge/src/merge_fetch_sup.erl b/merge/src/merge_fetch_sup.erl
new file mode 100644
index 0000000..6dd1735
--- /dev/null
+++ b/merge/src/merge_fetch_sup.erl
@@ -0,0 +1,29 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% Top supervisor for the merge fetch subsystem: the newentries
+%%% pollers, the fetch controller and the fetch workers.
+-module(merge_fetch_sup).
+-behaviour(supervisor).
+
+-export([start_link/1, init/1]).
+
+start_link(Args) ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, Args).
+
+%% StorageNodes is passed unchanged to all three children.
+init(StorageNodes) ->
+    lager:debug("starting with storage nodes: ~p", [StorageNodes]),
+    {ok,
+     {{one_for_all, 3, 10},
+      [
+       {merge_fetch_newentries_sup,
+        {merge_fetch_newentries_sup, start_link, [StorageNodes]},
+        transient, infinity, supervisor,
+        [merge_fetch_newentries_sup]},
+       {merge_fetch_ctrl,
+        {merge_fetch_ctrl, start_link, [StorageNodes]},
+        permanent, 10000, worker,
+        [merge_fetch_ctrl]},
+       %% NOTE(review): merge_fetch_fetch_sup is referenced here but
+       %% not created by this patch -- starting it will fail until
+       %% that module exists (patch is marked wip).
+       {merge_fetch_fetch_sup,
+        {merge_fetch_fetch_sup, start_link, [StorageNodes]},
+        transient, infinity, supervisor,
+        [merge_fetch_fetch_sup]}
+      ]}}.
-- 
cgit v1.1