summaryrefslogtreecommitdiff
path: root/merge/src/merge_fetch_fetch.erl
blob: 2ba7e9ac8bfcd5731897e0ccba577b0367ee83aa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
%%% Copyright (c) 2017, NORDUnet A/S.
%%% See LICENSE for licensing information.

-module(merge_fetch_fetch).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
         code_change/3]).
-export([loop/2]).

start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

init({Name, Address}) ->
    lager:info("~p:~p starting", [?MODULE, Name]),
    ChildPid = spawn_link(?MODULE, loop, [Name, Address]),
    {ok, ChildPid}.

decode_entry({struct, EncodedEntry}) ->
    Hash = base64:decode(proplists:get_value(<<"hash">>, EncodedEntry)),
    Entry = base64:decode(proplists:get_value(<<"entry">>, EncodedEntry)),
    {Hash, Entry}.

get_entries(_, _, []) ->
    {ok, []};
get_entries(NodeName, NodeAddress, Hashes) ->
    DebugTag = "getentry",
    URL = NodeAddress ++ "getentry",
    EncodedHashes = lists:map(fun (H) -> {"hash", base64:encode(H)} end, Hashes),
    Params = hackney_url:qs(EncodedHashes),
    URLWithParams = [URL, "?", Params],
    case merge_util:request(DebugTag, binary_to_list(iolist_to_binary(URLWithParams)), NodeName) of
        {<<"ok">>, PropList} ->
            Entries = lists:map(fun (S) -> decode_entry(S) end, proplists:get_value(<<"entries">>, PropList)),
            {ok, Entries};
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

loop(Name, Address) ->
    receive after 1000 -> ok end,
    lager:info("~p:~p: asking for entries to get from ~p",
              [?MODULE, Name, Address]),
    Hashes = merge_fetch_ctrl:entriestofetch(Name, 10),
    {ok, Entries} = get_entries(Name, Address, Hashes),
    lists:foreach(fun ({Hash, Entry}) ->

                          try
                              case plop:verify_entry(Entry) of
                                  {ok, Hash} ->
                                      ok;
                                  {ok, DifferentLeafHash} ->
                                      lager:error("leaf hash not correct: requested hash is ~p " ++
                                                      "and contents are ~p",
                                                  [hex:bin_to_hexstr(Hash),
                                                   hex:bin_to_hexstr(DifferentLeafHash)]),
                                      throw({request_error, result, "", differentleafhash});
                                  {error, Reason} ->
                                      lager:error("verification failed: ~p", [Reason]),
                                      throw({request_error, result, "", verificationfailed})
                              end
                          catch
                              Type:What ->
                                  [CrashFunction | Stack] = erlang:get_stacktrace(),
                                  lager:error("Crash: ~p ~p~n~p~n~p~n",
                                              [Type, What, CrashFunction, Stack]),
                                  throw(What)
                          end,
                          
                          db:add_entry_sync(Hash, Entry),

                          merge_fetch_ctrl:fetchstatus(Hash, success),
                  end, Entries),
    loop(Name, Address).


%% TODO: if we crash here, we restart all of fetch -- spawn child proc
%% for the actual fetching?

handle_call(stop, _From, ChildPid) ->
    lager:info("~p: stopping child process ~p", [?MODULE, ChildPid]),
    exit(ChildPid, stop),
    {stop, normal, stopped, nil}.
handle_cast(_Request, State) ->
    {noreply, State}.
handle_info(_Info, State) ->
    {noreply, State}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
terminate(_Reason, _State) ->
    lager:info("~p terminating", [?MODULE]),
    ok.