diff options
Diffstat (limited to 'merge/src/merge_dist.erl')
-rw-r--r-- | merge/src/merge_dist.erl | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl new file mode 100644 index 0000000..25e13ec --- /dev/null +++ b/merge/src/merge_dist.erl @@ -0,0 +1,129 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(merge_dist). +-behaviour(gen_server). + +-export([start_link/1]). +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +-record(state, { + timer :: reference(), + node_address :: string(), + sth_timestamp :: non_neg_integer() + }). + +start_link(Args) -> + gen_server:start_link(?MODULE, Args, []). + +init(Node) -> + lager:info("~p:~p: starting", [?MODULE, Node]), + Timer = erlang:start_timer(1000, self(), dist), + {ok, #state{timer = Timer, + node_address = Node, + sth_timestamp = 0}}. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}. +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({timeout, _Timer, dist}, State) -> + dist(plop:sth(), State). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(Reason, #state{timer = Timer}) -> + lager:info("~p terminating: ~p", [?MODULE, Reason]), + erlang:cancel_timer(Timer), + ok. + +%%%%%%%%%%%%%%%%%%%% + +dist(noentry, State) -> + Timer = erlang:start_timer(1000, self(), dist), + {noreply, State#state{timer = Timer}}; +dist({struct, PropList} = STH, + #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) -> + Treesize = proplists:get_value(<<"tree_size">>, PropList), + Timestamp = proplists:get_value(<<"timestamp">>, PropList), + RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)), + Signature = base64:decode(proplists:get_value(<<"tree_head_signature">>, PropList)), + Logordersize = index:indexsize(logorder), + TS = case Timestamp > LastTimestamp of + true -> + true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature), + try + lager:info("~p: starting dist, sth at ~B, logorder at ~B", + [NodeAddress, Treesize, Logordersize]), + ok = do_dist(NodeAddress, min(Treesize, Logordersize)), + ok = publish_sth(NodeAddress, STH), + lager:info("~p: Published STH with size ~B and timestamp " ++ + "~p.", [NodeAddress, Treesize, Timestamp]), + Timestamp + catch + throw:{request_error, SubErrType, DebugTag, Error} -> + lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error]), + LastTimestamp + end; + false -> + lager:debug("~p: STH timestamp ~p <= ~p, waiting.", + [NodeAddress, Timestamp, LastTimestamp]), + LastTimestamp + end, + Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 60)), + {noreply, + State#state{timer = erlang:start_timer(Wait * 1000, self(), dist), + sth_timestamp = TS}}. + +%% @doc Has nonlocal return because of throw further down in +%% merge_util:request/4. +do_dist(NodeAddress, Size) -> + {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress), + lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]), + true = VerifiedSize =< Size, + do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize). + +do_dist(_, _, 0) -> + ok; +do_dist(NodeAddress, Start, NTotal) -> + DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000), + N = min(DistMaxWindow, NTotal), + Hashes = index:getrange(logorder, Start, Start + N - 1), + SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000), + SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100), + ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize), + ok = merge_util:sendentries(NodeAddress, Hashes, SendentriesChunksize), + {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N), + lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Start]), + true = NTotal >= NewSize - Start, + do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)). + +frontend_get_verifiedsize(NodeAddress) -> + frontend_verify_entries(NodeAddress, 0). + +frontend_verify_entries(NodeAddress, Size) -> + DebugTag = io_lib:format("verify-entries ~B", [Size]), + URL = NodeAddress ++ "verify-entries", + Headers = [{"Content-Type", "text/json"}], + RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})), + case merge_util:request(DebugTag, URL, Headers, RequestBody) of + {<<"ok">>, PropList} -> + {ok, proplists:get_value(<<"verified">>, PropList)}; + Err -> + throw({request_error, result, DebugTag, Err}) + end. + +publish_sth(NodeAddress, STH) -> + DebugTag = "publish-sth", + URL = NodeAddress ++ "publish-sth", + Headers = [{"Content-Type", "text/json"}], + RequestBody = list_to_binary(mochijson2:encode(STH)), + case merge_util:request(DebugTag, URL, Headers, RequestBody) of + {<<"ok">>, _} -> + ok; + Err -> + throw({request_error, result, DebugTag, Err}) + end. |