1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.
-module(merge_fetch_fetch).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
-export([loop/2]).
%% @doc Start this module as a linked, unregistered gen_server.
%% `Args' is passed unchanged to init/1, which matches on a
%% `{Name, Address}' tuple. Returns whatever gen_server:start_link/3
%% returns.
start_link(Args) ->
    GenServerOpts = [],
    gen_server:start_link(?MODULE, Args, GenServerOpts).
%% gen_server callback. Logs the start-up and spawns a linked worker
%% process running loop/2 with the given `Name' and `Address'. The
%% worker's pid is the whole server state.
init({Name, Address}) ->
    lager:info("~p:~p starting", [?MODULE, Name]),
    {ok, spawn_link(?MODULE, loop, [Name, Address])}.
%% Decode one `{struct, Props}' element of a getentry response.
%% Both the `<<"hash">>' and `<<"entry">>' values are base64-decoded;
%% the result is `{Hash, Entry}' as binaries. The hash is decoded
%% before the entry.
decode_entry({struct, Props}) ->
    DecodeField = fun (Key) ->
                          base64:decode(proplists:get_value(Key, Props))
                  end,
    {DecodeField(<<"hash">>), DecodeField(<<"entry">>)}.
%% Fetch the entries identified by `Hashes' from the node at
%% `NodeAddress'.
%%
%% An empty hash list short-circuits to `{ok, []}' without any request.
%% Otherwise the request URL is `NodeAddress ++ "getentry"' followed by
%% a query string holding one base64-encoded `hash' parameter per
%% requested hash, and the request is issued via merge_util:request/3.
%%
%% Returns `{ok, [{Hash, Entry}]}' when the reply is tagged `<<"ok">>'.
%% Any other reply is thrown as
%% `{request_error, result, "getentry", Reply}'.
get_entries(_, _, []) ->
    {ok, []};
get_entries(NodeName, NodeAddress, Hashes) ->
    Tag = "getentry",
    BaseURL = NodeAddress ++ "getentry",
    QueryPairs = lists:map(fun (Hash) -> {"hash", base64:encode(Hash)} end,
                           Hashes),
    QueryString = hackney_url:qs(QueryPairs),
    %% merge_util:request/3 is handed a flat string, so flatten the iolist.
    FullURL = binary_to_list(iolist_to_binary([BaseURL, "?", QueryString])),
    case merge_util:request(Tag, FullURL, NodeName) of
        {<<"ok">>, Reply} ->
            EncodedEntries = proplists:get_value(<<"entries">>, Reply),
            {ok, lists:map(fun decode_entry/1, EncodedEntries)};
        Failure ->
            throw({request_error, result, Tag, Failure})
    end.
%% Worker loop, run in the process spawned by init/1. Never returns.
%%
%% Each iteration:
%%   1. sleeps for 1000 ms,
%%   2. asks merge_fetch_ctrl:entriestofetch/2 which hashes to fetch for
%%      `Name' (the second argument, 10, is presumably the maximum batch
%%      size -- confirm against merge_fetch_ctrl),
%%   3. fetches those entries from `Address' via get_entries/3,
%%   4. verifies every fetched entry with plop:verify_entry/1 and checks
%%      that the resulting leaf hash equals the requested hash,
%%   5. stores each verified entry with db:add_entry_sync/2 and reports
%%      `success' for its hash to merge_fetch_ctrl:fetchstatus/2.
%%
%% Any verification failure is logged and re-thrown. Nothing in this
%% function catches the re-thrown value, so the worker process
%% terminates.
loop(Name, Address) ->
    receive after 1000 -> ok end,
    lager:info("~p:~p: asking for entries to get from ~p",
               [?MODULE, Name, Address]),
    Hashes = merge_fetch_ctrl:entriestofetch(Name, 10),
    {ok, Entries} = get_entries(Name, Address, Hashes),
    lists:foreach(
      fun ({Hash, Entry}) ->
              try
                  case plop:verify_entry(Entry) of
                      %% Hash is already bound, so this clause only
                      %% matches when the computed leaf hash equals the
                      %% hash that was requested.
                      {ok, Hash} ->
                          ok;
                      {ok, DifferentLeafHash} ->
                          lager:error("leaf hash not correct: requested hash is ~p " ++
                                          "and contents are ~p",
                                      [hex:bin_to_hexstr(Hash),
                                       hex:bin_to_hexstr(DifferentLeafHash)]),
                          throw({request_error, result, "", differentleafhash});
                      {error, Reason} ->
                          lager:error("verification failed: ~p", [Reason]),
                          throw({request_error, result, "", verificationfailed})
                  end
              catch
                  Type:What ->
                      %% NOTE(review): erlang:get_stacktrace/0 is deprecated
                      %% since OTP 21 in favour of `catch Type:What:Stack';
                      %% kept here because the rest of the file uses no
                      %% OTP 21+ syntax.
                      [CrashFunction | Stack] = erlang:get_stacktrace(),
                      lager:error("Crash: ~p ~p~n~p~n~p~n",
                                  [Type, What, CrashFunction, Stack]),
                      %% Every exception class is re-raised as a throw.
                      throw(What)
              end,
              %% Only reached when verification succeeded.
              db:add_entry_sync(Hash, Entry),
              %% Last expression of the fun body: no trailing comma
              %% before `end' (a trailing comma is a syntax error).
              merge_fetch_ctrl:fetchstatus(Hash, success)
      end, Entries),
    loop(Name, Address).
%% TODO: if we crash here, we restart all of fetch -- spawn child proc
%% for the actual fetching?
%% gen_server callback. Only the `stop' request is handled: it logs,
%% sends the worker process an exit signal with reason `stop', replies
%% `stopped' and stops the server normally with `nil' as final state.
%% Any other request crashes the server with function_clause.
%%
%% NOTE(review): the worker is linked to this server (spawn_link in
%% init/1) and no trap_exit is visible in this file, so the worker's
%% exit with reason `stop' may propagate back here before the reply is
%% sent -- confirm whether an unlink/1 is wanted before the exit/2.
handle_call(stop, _Caller, WorkerPid) ->
    lager:info("~p: stopping child process ~p", [?MODULE, WorkerPid]),
    exit(WorkerPid, stop),
    Reply = stopped,
    FinalState = nil,
    {stop, normal, Reply, FinalState}.
%% gen_server callback. Casts are not used by this server: every cast
%% is ignored and the state is returned unchanged.
handle_cast(_Msg, ServerState) ->
    {noreply, ServerState}.
%% gen_server callback. Any out-of-band message is dropped and the
%% state is returned unchanged.
handle_info(_Msg, ServerState) ->
    {noreply, ServerState}.
%% gen_server callback. No state conversion is performed on code
%% upgrade: the state is returned as-is.
code_change(_FromVsn, ServerState, _UpgradeExtra) ->
    {ok, ServerState}.
%% gen_server callback. Logs that the module is terminating and
%% returns `ok'. Neither the reason nor the final state is inspected.
terminate(_Why, _LastState) ->
    lager:info("~p terminating", [?MODULE]),
    ok.
|