%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

%%% gen_server that, driven by a timer, compares the local log size with the
%%% size a secondary node reports as verified and pushes the difference to
%%% that node. One process is started per [Name, Address] pair.

-module(merge_backup).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

%% Server state:
%%   timer        - reference of the currently armed `backup' timer
%%   node_name    - name of the secondary; used in log output and as the
%%                  suffix of the backup file path
%%   node_address - base address of the secondary; endpoint names are
%%                  appended to it to form request URLs
-record(state, {
          timer :: reference(),
          node_name :: string(),
          node_address :: string()
         }).

%% Start an unregistered merge_backup server.
%% Args must be the two-element list [Name, Address] expected by init/1.
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

%% gen_server callback. Logs the start and arms the first `backup' timer,
%% which fires after 1000 ms.
init([Name, Address]) ->
    lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
    Timer = erlang:start_timer(1000, self(), backup),
    {ok, #state{timer = Timer, node_name = Name, node_address = Address}}.

%% gen_server callback. The only supported call is `stop', which replies
%% `stopped' and terminates the server with reason `normal'.
handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.

%% gen_server callback. Casts are ignored.
handle_cast(_Request, State) ->
    {noreply, State}.

%% gen_server callback.
%% A `backup' timer expiry runs one backup round (which re-arms the timer).
%% Any other message is logged and dropped; without this clause a stray
%% message would crash the server with function_clause.
handle_info({timeout, _Timer, backup}, State) ->
    backup(merge_util:nfetched(), State);
handle_info(Info, State) ->
    lager:warning("~p: ignoring unexpected message: ~p", [?MODULE, Info]),
    {noreply, State}.

%% gen_server callback. No state migration is performed.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% gen_server callback. Logs the reason and cancels the armed timer.
terminate(Reason, #state{timer = Timer}) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    erlang:cancel_timer(Timer),
    ok.

%%%%%%%%%%%%%%%%%%%%

%% Run one backup round against the secondary and re-arm the timer.
%%
%% Size is the value returned by merge_util:nfetched/0. The secondary is
%% asked for its verified size:
%%   - equal to Size: the root is checked and the backup file rewritten;
%%   - smaller than Size: the missing range is pushed with do_backup/4;
%%   - anything else fails the `true =' assertion and crashes the server.
%%
%% Only thrown {request_error, _, _, _} terms are caught and logged. A
%% failed `ok =' match (for instance check_root/3 returning root_mismatch)
%% is not caught here.
%%
%% Returns {noreply, NewState}, where NewState carries the new timer. The
%% delay is a tenth of the plop `merge_delay' setting (default 600), rounded
%% and at least one second.
backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) ->
    lager:debug("~p: logorder size ~B", [NodeName, Size]),
    ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
    try
        {ok, VerifiedSize} = verified_size(NodeAddress),
        lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
        case VerifiedSize == Size of
            true ->
                TreeHead = ht:root(Size - 1),
                ok = check_root(NodeAddress, Size, TreeHead),
                ok = write_backupfile(NodeName, Size, TreeHead);
            false ->
                true = VerifiedSize < Size, % Secondary ahead of primary?
                ok = do_backup(NodeName, NodeAddress, VerifiedSize,
                               Size - VerifiedSize)
        end
    catch
        throw:{request_error, SubErrType, DebugTag, Error} ->
            lager:error("~s: ~p: ~p", [DebugTag, SubErrType, Error])
    end,
    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 10)),
    {noreply,
     State#state{timer = erlang:start_timer(Wait * 1000, self(), backup)}}.
%% Push NTotal log entries, starting at index Start, to the secondary in
%% windows of at most `merge_backup_winsize' (default 1000) entries.
%%
%% For each window: the hashes are read from the `logorder' index and sent
%% with merge_util:sendlog/4; the base64-encoded hashes returned by
%% merge_util:missingentries/1 are decoded and handed to
%% merge_util:sendentries/3; then the root for the new size is checked, the
%% secondary's verified size is updated and the backup file is rewritten.
%% Returns ok once nothing is left; any failed `ok =' match crashes.
do_backup(_NodeName, _NodeAddress, _Start, 0) ->
    ok;
do_backup(NodeName, NodeAddress, Start, NTotal) ->
    WindowSize = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
    Hashes = index:getrange(logorder, Start, Start + WindowSize - 1),
    SendlogChunk = plopconfig:get_env(merge_backup_sendlog_chunksize, 1000),
    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunk),
    {ok, EncodedMissing} = merge_util:missingentries(NodeAddress),
    Missing = [base64:decode(Encoded) || Encoded <- EncodedMissing],
    EntriesChunk = plopconfig:get_env(merge_backup_sendentries_chunksize, 100),
    ok = merge_util:sendentries(NodeAddress, Missing, EntriesChunk),
    NewSize = Start + WindowSize,
    Root = ht:root(NewSize - 1),
    ok = check_root(NodeAddress, NewSize, Root),
    ok = setverifiedsize(NodeAddress, NewSize),
    ok = write_backupfile(NodeName, NewSize, Root),
    %% Sanity assertion: a window never exceeds what is left to send.
    true = NTotal >= WindowSize,
    do_backup(NodeName, NodeAddress, NewSize, NTotal - WindowSize).

%% Atomically replace the backup file for NodeName with a JSON object
%% holding "tree_size" and the hex-encoded "sha256_root_hash".
%% The file path is the plop `verified_path' setting, a dot, and NodeName.
%% Returns whatever atomic:replacefile/2 returns.
write_backupfile(NodeName, TreeSize, TreeHead) ->
    {ok, BasePath} = application:get_env(plop, verified_path),
    Path = lists:append([BasePath, ".", NodeName]),
    HexRoot = list_to_binary(hex:bin_to_hexstr(TreeHead)),
    Fields = [{"tree_size", TreeSize}, {"sha256_root_hash", HexRoot}],
    atomic:replacefile(Path, mochijson2:encode({Fields})).

%% Compare the root reported by the secondary for tree size Size with the
%% locally computed TreeHead.
%% Returns ok when they are equal; otherwise logs an error and returns the
%% atom root_mismatch.
check_root(NodeAddress, Size, TreeHead) ->
    {ok, RemoteRoot} = verifyroot(NodeAddress, Size),
    if
        RemoteRoot == TreeHead ->
            ok;
        true ->
            lager:error("~p: ~B: secondary merge root ~p != ~p",
                        [NodeAddress, Size, RemoteRoot, TreeHead]),
            root_mismatch
    end.

%% POST {"tree_size": TreeSize} to the secondary's "verifyroot" endpoint.
%% Returns {ok, Root} with the base64-decoded "root_hash" of an "ok" reply;
%% throws {request_error, result, DebugTag, Reply} for any other reply.
verifyroot(NodeAddress, TreeSize) ->
    Tag = io_lib:format("verifyroot ~B", [TreeSize]),
    Url = NodeAddress ++ "verifyroot",
    Payload = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})),
    Reply = merge_util:request(Tag, Url, [{"Content-Type", "text/json"}],
                               Payload),
    root_from_reply(Reply, Tag).

%% Decode a "verifyroot" reply; see verifyroot/2.
root_from_reply({<<"ok">>, Props}, _Tag) ->
    {ok, base64:decode(proplists:get_value(<<"root_hash">>, Props))};
root_from_reply(Failure, Tag) ->
    throw({request_error, result, Tag, Failure}).
%% Ask the secondary's "verifiedsize" endpoint for its verified size.
%% Returns {ok, Size}, where Size is the "size" value of an "ok" reply
%% (`undefined' if the reply carries no such key); throws
%% {request_error, result, DebugTag, Reply} for any other reply.
verified_size(NodeAddress) ->
    Tag = "verifiedsize",
    Reply = merge_util:request(Tag, NodeAddress ++ "verifiedsize"),
    size_from_reply(Reply, Tag).

%% Decode a "verifiedsize" reply; see verified_size/1.
size_from_reply({<<"ok">>, Props}, _Tag) ->
    {ok, proplists:get_value(<<"size">>, Props)};
size_from_reply(Failure, Tag) ->
    throw({request_error, result, Tag, Failure}).

%% POST {"size": Size} to the secondary's "setverifiedsize" endpoint.
%% Returns ok on an "ok" reply; throws
%% {request_error, result, DebugTag, Reply} for any other reply.
setverifiedsize(NodeAddress, Size) ->
    Tag = io_lib:format("setverifiedsize ~B", [Size]),
    Url = NodeAddress ++ "setverifiedsize",
    Payload = list_to_binary(mochijson2:encode({[{"size", Size}]})),
    Reply = merge_util:request(Tag, Url, [{"Content-Type", "text/json"}],
                               Payload),
    ack_from_reply(Reply, Tag).

%% Decode a "setverifiedsize" reply; see setverifiedsize/2.
ack_from_reply({<<"ok">>, _}, _Tag) ->
    ok;
ack_from_reply(Failure, Tag) ->
    throw({request_error, result, Tag, Failure}).