%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.
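%%%
%%% merge_backup: gen_server that keeps one secondary merge node in sync
%%% with the locally verified log. Driven by a timer, it reads the
%%% "fetched" file, pushes missing log entries and their hashes to the
%%% secondary, verifies that the secondary reports the same tree head,
%%% and records the verified size and root hash in a per-node backup file.
%%% Started with start_link([NodeName, NodeAddress]) (presumably from a
%%% merge supervisor; not shown here).
%%%
%%% Configuration read at runtime: merge_delay and verified_path from the
%%% plop application environment; merge_backup_winsize,
%%% merge_backup_sendlog_chunksize and merge_backup_sendentries_chunksize
%%% via plopconfig.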
-module(merge_backup).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).

-record(state, {
          timer :: reference(),
          node_name :: string(),
          node_address :: string()
         }).

start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

init([Name, Address]) ->
    lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),
    Timer = erlang:start_timer(1000, self(), backup),
    {ok, #state{timer = Timer, node_name = Name, node_address = Address}}.

handle_call(stop, _From, State) ->
    {stop, normal, stopped, State}.

handle_cast(_Request, State) ->
    {noreply, State}.
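
%% Each timer expiry triggers one backup round; backup/2 and backup/3 arm
%% the next timer when the round is done, so rounds never overlap.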
handle_info({timeout, _Timer, backup}, State) ->
    backup(merge_util:readfile(fetched_path), State).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

terminate(Reason, #state{timer = Timer}) ->
    lager:info("~p terminating: ~p", [?MODULE, Reason]),
    erlang:cancel_timer(Timer),
    ok.
%%%%%%%%%%%%%%%%%%%%
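
%% backup/2 dispatches on the contents of the fetched file: if there is
%% nothing to read yet (noentry), just rearm the timer; otherwise pick out
%% the index and hash of the latest fetched entry and continue in backup/3.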
backup(noentry, State) ->
    {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
backup({struct, Fetched}, State) ->
    Index = proplists:get_value(<<"index">>, Fetched),
    Hash = proplists:get_value(<<"hash">>, Fetched),
    backup(Index, Hash, State).
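
%% backup/3: an index of -1 means nothing has been fetched yet. Otherwise
%% check that the logorder file is consistent with the fetched file, load
%% the hash tree, and either confirm the secondary's tree head (when its
%% verified size already equals ours) or push the missing range with
%% do_backup/4. A {request_error, ...} thrown by any of the HTTP helpers
%% is logged and the round is retried after merge_delay/10 seconds.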
backup(-1, _, State) ->
    {noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
backup(Index, Hash,
       #state{node_name = NodeName, node_address = NodeAddress} = State) ->
    ok = merge_util:verify_logorder_and_fetched_consistency(Index, Hash),
    Size = index:indexsize(logorder),
    lager:debug("~p: logorder size ~B", [NodeName, Size]),
    ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
    try
        {ok, VerifiedSize} = verified_size(NodeAddress),
        lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
        case VerifiedSize == Size of
            true ->
                TreeHead = ht:root(Size - 1),
                ok = check_root(NodeAddress, Size, TreeHead),
                ok = write_backupfile(NodeName, Size, TreeHead);
            false ->
                true = VerifiedSize < Size, % Secondary ahead of primary?
                ok = do_backup(NodeName, NodeAddress, VerifiedSize,
                               Size - VerifiedSize)
        end
    catch
        throw:{request_error, SubErrType, DebugTag, Error} ->
            lager:error("~p: ~p: ~p", [DebugTag, SubErrType, Error])
    end,
    Wait = max(1, round(application:get_env(plop, merge_delay, 600) / 10)),
    {noreply,
     State#state{timer = erlang:start_timer(Wait * 1000, self(), backup)}}.
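
%% do_backup/4 pushes NTotal entries to the secondary starting at index
%% Start, in windows of at most merge_backup_winsize entries. After each
%% window it verifies the secondary's root hash at the new size, advances
%% the secondary's verified size and rewrites the local backup file.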
do_backup(_, _, _, 0) ->
    ok;
do_backup(NodeName, NodeAddress, Start, NTotal) ->
    N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
    Hashes = index:getrange(logorder, Start, Start + N - 1),
    ok = merge_util:sendlog(NodeAddress, Start, Hashes,
                            plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
    ok = merge_util:sendentries(NodeAddress, Hashes,
                                plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
    Size = Start + N,
    TreeHead = ht:root(Size - 1),
    ok = check_root(NodeAddress, Size, TreeHead),
    ok = setverifiedsize(NodeAddress, Size),
    ok = write_backupfile(NodeName, Size, TreeHead),
    true = NTotal >= N,
    do_backup(NodeName, NodeAddress, Size, NTotal - N).
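
%% Atomically write this node's backup file, <verified_path>.<node name>,
%% recording the verified tree size and root hash as JSON.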
write_backupfile(NodeName, TreeSize, TreeHead) ->
    {ok, BasePath} = application:get_env(plop, verified_path),
    Path = BasePath ++ "." ++ NodeName,
    Content = mochijson2:encode(
                {[{"tree_size", TreeSize},
                  {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}),
    atomic:replacefile(Path, Content).
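
%% Ask the secondary for its root hash at Size and compare it with our own
%% tree head. Callers match the result against ok, so a mismatch
%% (root_mismatch) turns into a badmatch crash after the error is logged.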
check_root(NodeAddress, Size, TreeHead) ->
    {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size),
    case TreeHeadToVerify == TreeHead of
        true ->
            ok;
        false ->
            lager:error("~p: ~B: secondary merge root ~p != ~p",
                        [NodeAddress, Size, TreeHeadToVerify, TreeHead]),
            root_mismatch
    end.
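
%% Ask the secondary's verifyroot endpoint for the root hash it computes
%% at TreeSize.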
verifyroot(NodeAddress, TreeSize) ->
    DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),
    URL = NodeAddress ++ "verifyroot",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})),
    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, PropList} ->
            {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))};
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.
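
%% Fetch the secondary's current verified tree size.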
verified_size(NodeAddress) ->
    DebugTag = "verifiedsize",
    URL = NodeAddress ++ "verifiedsize",
    case merge_util:request(DebugTag, URL) of
        {<<"ok">>, PropList} ->
            {ok, proplists:get_value(<<"size">>, PropList)};
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.
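
%% Tell the secondary to advance its verified size to Size.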
setverifiedsize(NodeAddress, Size) ->
    DebugTag = io_lib:format("setverifiedsize ~B", [Size]),
    URL = NodeAddress ++ "setverifiedsize",
    Headers = [{"Content-Type", "text/json"}],
    RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})),
    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.