diff options
author | Linus Nordberg <linus@nordu.net> | 2017-02-02 02:25:42 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2017-02-02 02:25:42 +0100 |
commit | 0070a4f70dd78f1f8aacb0657c741a2c311a7f32 (patch) | |
tree | b194d02cd11519ca20d161c939ab024094b6e0a6 /merge/src/merge_dist.erl | |
parent | 829ab97fccb991832445862ec8246197a225ecec (diff) |
Parallelised merge, backup phase.
Diffstat (limited to 'merge/src/merge_dist.erl')
-rw-r--r-- | merge/src/merge_dist.erl | 90 |
1 files changed, 12 insertions, 78 deletions
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index 8d3dc2b..4aa94aa 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -75,7 +75,8 @@ dist({struct, PropList} = STH, State#state{timer = erlang:start_timer(Wait * 1000, self(), dist), sth_timestamp = TS}}. -%% @doc Has nonlocal return because of throw further down in do_request/4. +%% @doc Has nonlocal return because of throw further down in +%% merge_util:request/4. do_dist(NodeAddress, Size) -> {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress), true = Size >= VerifiedSize, @@ -83,70 +84,18 @@ do_dist(NodeAddress, Size) -> do_dist(_, _, 0) -> ok; -do_dist(NodeAddress, Size, NTotal) -> +do_dist(NodeAddress, Start, NTotal) -> DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000), N = min(DistMaxWindow, NTotal), - Hashes = index:getrange(logorder, Size, Size + N - 1), - ok = frontend_sendlog(NodeAddress, Size, Hashes), - ok = frontend_send_missing_entries(NodeAddress, Hashes), - {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N), - lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Size]), - true = NTotal >= NewSize - Size, - do_dist(NodeAddress, NewSize, NTotal - (NewSize - Size)). - -frontend_sendlog(NodeAddress, Start, Hashes) -> + Hashes = index:getrange(logorder, Start, Start + N - 1), SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000), - frontend_sendlog_chunk(NodeAddress, Start, lists:split(min(SendlogChunksize, length(Hashes)), Hashes), SendlogChunksize). - -frontend_sendlog_chunk(_, _, {[], _}, _) -> - ok; -frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) -> - ok = frontend_sendlog_request(NodeAddress, Start, Chunk), - frontend_sendlog_chunk(NodeAddress, Start + length(Chunk), - lists:split(min(Chunksize, length(Rest)), Rest), Chunksize). - -frontend_sendlog_request(NodeAddress, Start, Hashes) -> - DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]), - URL = NodeAddress ++ "sendlog", - Headers = [{"Content-Type", "text/json"}], - EncodedHashes = [base64:encode(H) || H <- Hashes], - RequestBody = list_to_binary(mochijson2:encode({[{"start", Start}, - {"hashes", EncodedHashes}]})), - case do_request(DebugTag, URL, Headers, RequestBody) of - {<<"ok">>, _} -> - ok; - Err -> - throw({request_error, result, DebugTag, Err}) - end. - -frontend_send_missing_entries(NodeAddress, Hashes) -> SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100), - {ChunkOfHashes, RestOfHashes} = lists:split(min(SendentriesChunksize, length(Hashes)), Hashes), - frontend_send_entries_chunk(NodeAddress, - {ChunkOfHashes, RestOfHashes}, - SendentriesChunksize). - -frontend_send_entries_chunk(_, {[], _}, _) -> - ok; -frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) -> - HashesAndEntries = lists:zip(Chunk, [db:entry_for_leafhash(H) || H <- Chunk]), - ok = frontend_send_entries_request(NodeAddress, HashesAndEntries), - frontend_send_entries_chunk(NodeAddress, - lists:split(min(Chunksize, length(Rest)), Rest), - Chunksize). - -frontend_send_entries_request(NodeAddress, HashesAndEntries) -> - DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]), - URL = NodeAddress ++ "sendentry", - Headers = [{"Content-Type", "text/json"}], - L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]), - RequestBody = list_to_binary(L), - case do_request(DebugTag, URL, Headers, RequestBody) of - {<<"ok">>, _} -> - ok; - Err -> - throw({request_error, result, DebugTag, Err}) - end. + ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize), + ok = merge_util:sendentries(NodeAddress, Hashes, SendentriesChunksize), + {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N), + lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Start]), + true = NTotal >= NewSize - Start, + do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)). frontend_get_verifiedsize(NodeAddress) -> frontend_verify_entries(NodeAddress, 0). @@ -156,7 +105,7 @@ frontend_verify_entries(NodeAddress, Size) -> URL = NodeAddress ++ "verify-entries", Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})), - case do_request(DebugTag, URL, Headers, RequestBody) of + case merge_util:request(DebugTag, URL, Headers, RequestBody) of {<<"ok">>, PropList} -> {ok, proplists:get_value(<<"verified">>, PropList)}; Err -> @@ -168,24 +117,9 @@ publish_sth(NodeAddress, STH) -> URL = NodeAddress ++ "publish-sth", Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary(mochijson2:encode(STH)), - case do_request(DebugTag, URL, Headers, RequestBody) of + case merge_util:request(DebugTag, URL, Headers, RequestBody) of {<<"ok">>, _} -> ok; Err -> throw({request_error, result, DebugTag, Err}) end. - -do_request(DebugTag, URL, Headers, RequestBody) -> - case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of - {error, Err} -> - throw({request_error, request, DebugTag, Err}); - {failure, {none, StatusCode, none}, _RespHeaders, _Body} -> - throw({request_error, failure, DebugTag, StatusCode}); - {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 -> - case (catch mochijson2:decode(Body)) of - {error, Err} -> - throw({request_error, decode, DebugTag, Err}); - {struct, PropList} -> - {proplists:get_value(<<"result">>, PropList), PropList} - end - end. |