author    Linus Nordberg <linus@nordu.net>    2017-02-02 02:25:42 +0100
committer Linus Nordberg <linus@nordu.net>    2017-02-02 02:25:42 +0100
commit    0070a4f70dd78f1f8aacb0657c741a2c311a7f32 (patch)
tree      b194d02cd11519ca20d161c939ab024094b6e0a6 /merge/src/merge_dist.erl
parent    829ab97fccb991832445862ec8246197a225ecec (diff)
Parallelised merge, backup phase.
Diffstat (limited to 'merge/src/merge_dist.erl')
-rw-r--r--  merge/src/merge_dist.erl   90
1 file changed, 12 insertions(+), 78 deletions(-)
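
This commit replaces merge_dist's private sendlog/sendentries/do_request plumbing with calls into a shared merge_util module, presumably so the backup phase of the parallelised merge can reuse the same transport code. The distribution loop is tuned by three plop application environment keys, each read with application:get_env/3 and a default in the diff below. A sys.config stanza setting them explicitly might look like this (a sketch; the values shown are just the code's defaults):

%% Illustrative sys.config for the plop application; the keys and
%% defaults are taken from the application:get_env/3 calls below.
[{plop,
  [{merge_dist_winsize, 1000},             %% max entries per distribution window
   {merge_dist_sendlog_chunksize, 1000},   %% hashes per sendlog request
   {merge_dist_sendentries_chunksize, 100} %% entries per sendentry request
  ]}].
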
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
index 8d3dc2b..4aa94aa 100644
--- a/merge/src/merge_dist.erl
+++ b/merge/src/merge_dist.erl
@@ -75,7 +75,8 @@ dist({struct, PropList} = STH,
      State#state{timer = erlang:start_timer(Wait * 1000, self(), dist),
                  sth_timestamp = TS}}.
 
-%% @doc Has nonlocal return because of throw further down in do_request/4.
+%% @doc Has nonlocal return because of throw further down in
+%% merge_util:request/4.
 do_dist(NodeAddress, Size) ->
     {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
     true = Size >= VerifiedSize,
@@ -83,70 +84,18 @@ do_dist(NodeAddress, Size) ->
 do_dist(_, _, 0) ->
     ok;
-do_dist(NodeAddress, Size, NTotal) ->
+do_dist(NodeAddress, Start, NTotal) ->
     DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
     N = min(DistMaxWindow, NTotal),
-    Hashes = index:getrange(logorder, Size, Size + N - 1),
-    ok = frontend_sendlog(NodeAddress, Size, Hashes),
-    ok = frontend_send_missing_entries(NodeAddress, Hashes),
-    {ok, NewSize} = frontend_verify_entries(NodeAddress, Size + N),
-    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Size]),
-    true = NTotal >= NewSize - Size,
-    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Size)).
-
-frontend_sendlog(NodeAddress, Start, Hashes) ->
+    Hashes = index:getrange(logorder, Start, Start + N - 1),
     SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
-    frontend_sendlog_chunk(NodeAddress, Start, lists:split(min(SendlogChunksize, length(Hashes)), Hashes), SendlogChunksize).
-
-frontend_sendlog_chunk(_, _, {[], _}, _) ->
-    ok;
-frontend_sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
-    ok = frontend_sendlog_request(NodeAddress, Start, Chunk),
-    frontend_sendlog_chunk(NodeAddress, Start + length(Chunk),
-                           lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
-
-frontend_sendlog_request(NodeAddress, Start, Hashes) ->
-    DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
-    URL = NodeAddress ++ "sendlog",
-    Headers = [{"Content-Type", "text/json"}],
-    EncodedHashes = [base64:encode(H) || H <- Hashes],
-    RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
-                                                     {"hashes", EncodedHashes}]})),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
-        {<<"ok">>, _} ->
-            ok;
-        Err ->
-            throw({request_error, result, DebugTag, Err})
-    end.
-
-frontend_send_missing_entries(NodeAddress, Hashes) ->
     SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
-    {ChunkOfHashes, RestOfHashes} = lists:split(min(SendentriesChunksize, length(Hashes)), Hashes),
-    frontend_send_entries_chunk(NodeAddress,
-                                {ChunkOfHashes, RestOfHashes},
-                                SendentriesChunksize).
-
-frontend_send_entries_chunk(_, {[], _}, _) ->
-    ok;
-frontend_send_entries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
-    HashesAndEntries = lists:zip(Chunk, [db:entry_for_leafhash(H) || H <- Chunk]),
-    ok = frontend_send_entries_request(NodeAddress, HashesAndEntries),
-    frontend_send_entries_chunk(NodeAddress,
-                                lists:split(min(Chunksize, length(Rest)), Rest),
-                                Chunksize).
-
-frontend_send_entries_request(NodeAddress, HashesAndEntries) ->
-    DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
-    URL = NodeAddress ++ "sendentry",
-    Headers = [{"Content-Type", "text/json"}],
-    L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),
-    RequestBody = list_to_binary(L),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
-        {<<"ok">>, _} ->
-            ok;
-        Err ->
-            throw({request_error, result, DebugTag, Err})
-    end.
+    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize),
+    ok = merge_util:sendentries(NodeAddress, Hashes, SendentriesChunksize),
+    {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),
+    lager:info("~p: Done distributing ~B entries.", [NodeAddress, NewSize-Start]),
+    true = NTotal >= NewSize - Start,
+    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)).
 
 frontend_get_verifiedsize(NodeAddress) ->
     frontend_verify_entries(NodeAddress, 0).
@@ -156,7 +105,7 @@ frontend_verify_entries(NodeAddress, Size) ->
     URL = NodeAddress ++ "verify-entries",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
         {<<"ok">>, PropList} ->
             {ok, proplists:get_value(<<"verified">>, PropList)};
         Err ->
@@ -168,24 +117,9 @@ publish_sth(NodeAddress, STH) ->
     URL = NodeAddress ++ "publish-sth",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode(STH)),
-    case do_request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
         {<<"ok">>, _} ->
             ok;
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
-
-do_request(DebugTag, URL, Headers, RequestBody) ->
-    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
-        {error, Err} ->
-            throw({request_error, request, DebugTag, Err});
-        {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
-            throw({request_error, failure, DebugTag, StatusCode});
-        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
-            case (catch mochijson2:decode(Body)) of
-                {error, Err} ->
-                    throw({request_error, decode, DebugTag, Err});
-                {struct, PropList} ->
-                    {proplists:get_value(<<"result">>, PropList), PropList}
-            end
-    end.
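
The three merge_util functions the new code calls are not part of this diff. Reconstructing them from the helpers deleted above gives a plausible sketch of the shared module; the actual merge_util.erl may differ in names and details:

%% Sketch of merge_util, reconstructed from the code removed in this
%% commit. merge_util.erl itself is not shown in this diff, so treat
%% the internal names and structure here as assumptions.
-module(merge_util).
-export([request/4, sendlog/4, sendentries/3]).

%% HTTP POST helper; throws {request_error, ...} on any failure, which
%% is the nonlocal return the @doc comment in the diff refers to.
request(DebugTag, URL, Headers, RequestBody) ->
    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
        {error, Err} ->
            throw({request_error, request, DebugTag, Err});
        {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
            throw({request_error, failure, DebugTag, StatusCode});
        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
            case (catch mochijson2:decode(Body)) of
                {error, Err} ->
                    throw({request_error, decode, DebugTag, Err});
                {struct, PropList} ->
                    {proplists:get_value(<<"result">>, PropList), PropList}
            end
    end.

%% Chunked sendlog: POST Hashes in Chunksize slices, advancing Start.
sendlog(NodeAddress, Start, Hashes, Chunksize) ->
    sendlog_chunk(NodeAddress, Start,
                  lists:split(min(Chunksize, length(Hashes)), Hashes),
                  Chunksize).

sendlog_chunk(_, _, {[], _}, _) ->
    ok;
sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
    ok = sendlog_request(NodeAddress, Start, Chunk),
    sendlog_chunk(NodeAddress, Start + length(Chunk),
                  lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).

sendlog_request(NodeAddress, Start, Hashes) ->
    DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
    URL = NodeAddress ++ "sendlog",
    Headers = [{"Content-Type", "text/json"}],
    EncodedHashes = [base64:encode(H) || H <- Hashes],
    RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
                                                     {"hashes", EncodedHashes}]})),
    case request(DebugTag, URL, Headers, RequestBody) of
        {<<"ok">>, _} ->
            ok;
        Err ->
            throw({request_error, result, DebugTag, Err})
    end.

%% sendentries/3 would follow the same chunking pattern against the
%% "sendentry" endpoint, pairing each leaf hash with its entry via
%% db:entry_for_leafhash/1 as the deleted code did.

Centralising the throw-based error handling in merge_util:request/4 keeps the failure behaviour uniform across sendlog, sendentries, verify-entries and publish-sth callers.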