author | Linus Nordberg <linus@nordu.net> | 2017-03-15 17:17:58 +0100
committer | Linus Nordberg <linus@nordu.net> | 2017-03-15 17:17:58 +0100
commit | 598d6ae6d00644c7f6e318cf5a4928ee5a8eb9ca (patch)
tree | 7653518afa13e7ccc1a3925603d36d85570cb88c /merge
parent | 7e41c7c7630c4a96567029e6b4d7688a7df6ccee (diff)
parent | 8bb572816040a8ecda50be9687cd1ddc76436f65 (diff)
Merge branch 'map-statusserver'
Diffstat (limited to 'merge')
-rw-r--r-- | merge/src/merge_backup.erl | 31
-rw-r--r-- | merge/src/merge_dist.erl | 44
-rw-r--r-- | merge/src/merge_sth.erl | 2
-rw-r--r-- | merge/src/merge_util.erl | 50
4 files changed, 70 insertions, 57 deletions
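
The changes below thread the peer's node name through every merge helper so that each step of backup, distribution, and STH publication can be reported to the new status server. The `statusreport` module itself lives outside `merge/` and is therefore not in this diffstat; the following is only a sketch of the API implied by the call sites in this commit, under the assumption of a locally registered gen_server, and may differ from the real module:

```erlang
%% Hypothetical sketch of the statusreport API implied by the call
%% sites in this commit; the real module is not part of this diff and
%% may differ. Assumes a gen_server registered locally as 'statusreport'.
-module(statusreport).
-export([report/4, report_multi/4]).

%% report/4: record the latest value for {Service, NodeName, Key},
%% e.g. statusreport:report("merge_backup", NodeName, "verified", Size).
report(Service, NodeName, Key, Value) ->
    gen_server:cast(statusreport, {report, Service, NodeName, Key, Value}).

%% report_multi/4: presumably like report/4 but letting values for the
%% same key accumulate rather than replace each other (used for
%% "merge_errors", where several distinct errors can be current at once).
report_multi(Service, NodeName, Key, Value) ->
    gen_server:cast(statusreport, {report_multi, Service, NodeName, Key, Value}).
```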
```diff
diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl
index bf20f23..068725c 100644
--- a/merge/src/merge_backup.erl
+++ b/merge/src/merge_backup.erl
@@ -44,12 +44,12 @@ backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) ->
     lager:debug("~p: logorder size ~B", [NodeName, Size]),
     ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
     try
-        {ok, VerifiedSize} = verified_size(NodeAddress),
+        {ok, VerifiedSize} = verified_size(NodeName, NodeAddress),
         lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
         case VerifiedSize == Size of
             true ->
                 TreeHead = ht:root(Size - 1),
-                ok = check_root(NodeAddress, Size, TreeHead),
+                ok = check_root(NodeName, NodeAddress, Size, TreeHead),
                 ok = write_backupfile(NodeName, Size, TreeHead);
             false ->
                 true = VerifiedSize < Size, % Secondary ahead of primary?
@@ -68,27 +68,28 @@ do_backup(_, _, _, 0) ->
 do_backup(NodeName, NodeAddress, Start, NTotal) ->
     N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),
     Hashes = index:getrange(logorder, Start, Start + N - 1),
-    ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
-    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress),
+    ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)),
+    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
     HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),
-    ok = merge_util:sendentries(NodeAddress, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
+    ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),
     Size = Start + N,
     TreeHead = ht:root(Size - 1),
-    ok = check_root(NodeAddress, Size, TreeHead),
-    ok = setverifiedsize(NodeAddress, Size),
+    ok = check_root(NodeName, NodeAddress, Size, TreeHead),
+    ok = setverifiedsize(NodeName, NodeAddress, Size),
     ok = write_backupfile(NodeName, Size, TreeHead),
     true = NTotal >= N,
     do_backup(NodeName, NodeAddress, Size, NTotal - N).
 
 write_backupfile(NodeName, TreeSize, TreeHead) ->
+    statusreport:report("merge_backup", NodeName, "verified", TreeSize),
     {ok, BasePath} = application:get_env(plop, verified_path),
     Path = BasePath ++ "." ++ NodeName,
     Content = mochijson2:encode({[{"tree_size", TreeSize},
                                   {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}),
     atomic:replacefile(Path, Content).
 
-check_root(NodeAddress, Size, TreeHead) ->
-    {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size),
+check_root(NodeName, NodeAddress, Size, TreeHead) ->
+    {ok, TreeHeadToVerify} = verifyroot(NodeName, NodeAddress, Size),
     case TreeHeadToVerify == TreeHead of
         true ->
             ok;
@@ -98,34 +99,34 @@ check_root(NodeAddress, Size, TreeHead) ->
             root_mismatch
     end.
 
-verifyroot(NodeAddress, TreeSize) ->
+verifyroot(NodeName, NodeAddress, TreeSize) ->
     DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),
     URL = NodeAddress ++ "verifyroot",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})),
-    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
         {<<"ok">>, PropList} ->
             {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))};
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
 
-verified_size(NodeAddress) ->
+verified_size(NodeName, NodeAddress) ->
     DebugTag = "verifiedsize",
     URL = NodeAddress ++ "verifiedsize",
-    case merge_util:request(DebugTag, URL) of
+    case merge_util:request(DebugTag, URL, NodeName) of
         {<<"ok">>, PropList} ->
             {ok, proplists:get_value(<<"size">>, PropList)};
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
 
-setverifiedsize(NodeAddress, Size) ->
+setverifiedsize(NodeName, NodeAddress, Size) ->
     DebugTag = io_lib:format("setverifiedsize ~B", [Size]),
     URL = NodeAddress ++ "setverifiedsize",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})),
-    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
         {<<"ok">>, _} ->
             ok;
         Err ->
```
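
For orientation, the `verifyroot` exchange that `check_root/4` drives looks like this on the wire; the JSON field names come from the code above, while the size value and response shape shown are illustrative:

```erlang
%% Build the verifyroot request body the same way verifyroot/3 does;
%% the {[...]} proplist form is what this codebase's mochijson2
%% accepts, and 4711 is an invented example tree size.
RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", 4711}]})),
%% RequestBody == <<"{\"tree_size\":4711}">>
%%
%% A successful response decodes (via mochijson2) to roughly:
%%   {struct, [{<<"result">>, <<"ok">>}, {<<"root_hash">>, RootHashB64}]}
%% check_root/4 then compares base64:decode(RootHashB64) against the
%% locally computed ht:root(Size - 1) and returns root_mismatch on
%% disagreement.
```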
```diff
diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl
index f8f0c7c..3c38401 100644
--- a/merge/src/merge_dist.erl
+++ b/merge/src/merge_dist.erl
@@ -48,7 +48,9 @@ dist(noentry, State) ->
     Timer = erlang:start_timer(1000, self(), dist),
     {noreply, State#state{timer = Timer}};
 dist({struct, PropList} = STH,
-     #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) ->
+     #state{node_address = NodeAddress,
+            node_name = NodeName,
+            sth_timestamp = LastTimestamp} = State) ->
     Treesize = proplists:get_value(<<"tree_size">>, PropList),
     Timestamp = proplists:get_value(<<"timestamp">>, PropList),
     RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)),
@@ -60,8 +62,10 @@ dist({struct, PropList} = STH,
     try
         lager:info("~p: starting dist, sth at ~B, logorder at ~B",
                    [NodeAddress, Treesize, Logordersize]),
-        ok = do_dist(NodeAddress, min(Treesize, Logordersize)),
-        ok = publish_sth(NodeAddress, STH),
+        statusreport:report("merge_dist", NodeName, "targetsth", Treesize),
+        ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)),
+        ok = publish_sth(NodeName, NodeAddress, STH),
+        statusreport:report("merge_dist", NodeName, "sth", Treesize),
         lager:info("~p: Published STH with size ~B and timestamp " ++
                    "~p.", [NodeAddress, Treesize, Timestamp]),
         Timestamp
@@ -82,52 +86,54 @@ dist({struct, PropList} = STH,
 
 %% @doc Has nonlocal return because of throw further down in
 %% merge_util:request/4.
-do_dist(NodeAddress, Size) ->
-    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),
+do_dist(NodeAddress, NodeName, Size) ->
+    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeName, NodeAddress),
     lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]),
     true = VerifiedSize =< Size,
-    do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize).
+    do_dist(NodeAddress, NodeName, VerifiedSize, Size - VerifiedSize).
 
-do_dist(_, _, 0) ->
+do_dist(_, _, _, 0) ->
     ok;
-do_dist(NodeAddress, Start, NTotal) ->
+do_dist(NodeAddress, NodeName, Start, NTotal) ->
     DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),
     N = min(DistMaxWindow, NTotal),
     Hashes = index:getrange(logorder, Start, Start + N - 1),
     SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),
     SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),
-    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize),
-    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress),
+    ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize),
+    statusreport:report("merge_dist", NodeName, "sendlog", Start + N),
+    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
     lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]),
     HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),
-    ok = merge_util:sendentries(NodeAddress, HashesMissing, SendentriesChunksize),
-    {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),
+    ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize),
+    {ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N),
     lager:info("~p: Done distributing ~B out of ~B entries.",
               [NodeAddress, NewSize-Start, NTotal]),
+    statusreport:report("merge_dist", NodeName, "verified", Start + N),
     true = NTotal >= NewSize - Start,
-    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)).
+    do_dist(NodeAddress, NodeName, NewSize, NTotal - (NewSize - Start)).
 
-frontend_get_verifiedsize(NodeAddress) ->
-    frontend_verify_entries(NodeAddress, 0).
+frontend_get_verifiedsize(NodeName, NodeAddress) ->
+    frontend_verify_entries(NodeName, NodeAddress, 0).
 
-frontend_verify_entries(NodeAddress, Size) ->
+frontend_verify_entries(NodeName, NodeAddress, Size) ->
     DebugTag = io_lib:format("verify-entries ~B", [Size]),
     URL = NodeAddress ++ "verify-entries",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})),
-    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
         {<<"ok">>, PropList} ->
             {ok, proplists:get_value(<<"verified">>, PropList)};
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
 
-publish_sth(NodeAddress, STH) ->
+publish_sth(NodeName, NodeAddress, STH) ->
     DebugTag = "publish-sth",
     URL = NodeAddress ++ "publish-sth",
     Headers = [{"Content-Type", "text/json"}],
     RequestBody = list_to_binary(mochijson2:encode(STH)),
-    case merge_util:request(DebugTag, URL, Headers, RequestBody) of
+    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of
         {<<"ok">>, _} ->
             ok;
         Err ->
```
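
To make the windowing in `do_dist/4` concrete, here is a toy model of its arithmetic with no I/O; it assumes an ideal frontend that verifies every entry it is sent, which the real code deliberately does not rely on (it resumes from whatever `verify-entries` reports back):

```erlang
%% Toy model of the do_dist/4 window arithmetic (invented, no I/O).
%% Assumes merge_dist_winsize at its default of 1000 and a frontend
%% whose "verified" reply always equals Start + N.
dist_windows(_Start, 0, Acc) ->
    lists:reverse(Acc);
dist_windows(Start, NTotal, Acc) ->
    N = min(1000, NTotal),
    NewSize = Start + N,    % ideal verify-entries reply
    dist_windows(NewSize, NTotal - (NewSize - Start), [{Start, NewSize} | Acc]).

%% dist_windows(0, 2500, []) =:= [{0,1000}, {1000,2000}, {2000,2500}]
```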
```diff
diff --git a/merge/src/merge_sth.erl b/merge/src/merge_sth.erl
index ab1cd8f..4b77864 100644
--- a/merge/src/merge_sth.erl
+++ b/merge/src/merge_sth.erl
@@ -66,6 +66,7 @@ make_sth(CurSize, State) ->
     Wait =
         case NewSize < CurSize of
             true ->
+                statusreport:report("merge_sth", http_auth:own_name(), "sth", null),
                 lager:debug("waiting for enough backups to reach ~B, now at ~B",
                             [CurSize, NewSize]),
                 1;
@@ -90,6 +91,7 @@ do_make_sth(Size) ->
                        {"sha256_root_hash", base64:encode(NewRoot)},
                        {"tree_head_signature", base64:encode(PackedSignature)}],
             ok = plop:save_sth({struct, NewSTH}),
+            statusreport:report("merge_sth", http_auth:own_name(), "sth", Size),
             ok;
         false ->
             lager:error("The signature we got for new tree of size ~B doesn't " ++
```
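
Note that `merge_sth`, unlike `merge_backup` and `merge_dist`, reports under the merge node's own identity (`http_auth:own_name()`) rather than a peer's name, since producing an STH is a local operation: the `"sth"` value is `null` while waiting for enough backups, and becomes the tree size once the STH is saved.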
```diff
diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl
index 7598e40..c76d05f 100644
--- a/merge/src/merge_util.erl
+++ b/merge/src/merge_util.erl
@@ -2,78 +2,82 @@
 %%% See LICENSE for licensing information.
 
 -module(merge_util).
--export([sendlog/4, sendentries/3, missingentries/1]).
--export([request/2, request/4]).
+-export([sendlog/5, sendentries/4, missingentries/2]).
+-export([request/3, request/5]).
 -export([readfile/1, nfetched/0]).
 
-request(DebugTag, URL) ->
-    request(DebugTag, URL, [], <<>>).
+request(DebugTag, URL, NodeName) ->
+    request(DebugTag, URL, NodeName, [], <<>>).
 
-request(DebugTag, URL, Headers, RequestBody) ->
+request(DebugTag, URL, NodeName, Headers, RequestBody) ->
     case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
         {error, Err} ->
+            statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Err]))),
            throw({request_error, request, DebugTag, Err});
         {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+            statusreport:report_multi("merge_errors", NodeName, "http_error", StatusCode),
            throw({request_error, failure, DebugTag, StatusCode});
         {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
             case (catch mochijson2:decode(Body)) of
                 {error, Err} ->
+                    statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(Err)),
                     throw({request_error, decode, DebugTag, Err});
                 {struct, PropList} ->
+                    statusreport:report_multi("merge_errors", NodeName, "http_error", 200),
                     {proplists:get_value(<<"result">>, PropList), PropList}
             end
     end.
 
-sendlog(NodeAddress, Start, Hashes, Chunksize) ->
+sendlog(NodeAddress, NodeName, Start, Hashes, Chunksize) ->
     lager:debug("sending log: start=~B, N=~B, chunksize=~B", [Start, length(Hashes), Chunksize]),
-    sendlog_chunk(NodeAddress, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
+    sendlog_chunk(NodeAddress, NodeName, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize).
 
-sendlog_chunk(_, _, {[], _}, _) ->
+sendlog_chunk(_, _, _, {[], _}, _) ->
     ok;
-sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) ->
+sendlog_chunk(NodeAddress, NodeName, Start, {Chunk, Rest}, Chunksize) ->
     lager:debug("sending log chunk: start=~B, N=~B", [Start, length(Chunk)]),
-    ok = sendlog_request(NodeAddress, Start, Chunk),
-    sendlog_chunk(NodeAddress, Start + length(Chunk),
+    ok = sendlog_request(NodeAddress, NodeName, Start, Chunk),
+    sendlog_chunk(NodeAddress, NodeName, Start + length(Chunk),
                   lists:split(min(Chunksize, length(Rest)), Rest), Chunksize).
 
-sendlog_request(NodeAddress, Start, Hashes) ->
+sendlog_request(NodeAddress, NodeName, Start, Hashes) ->
     DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),
     URL = NodeAddress ++ "sendlog",
     Headers = [{"Content-Type", "text/json"}],
     EncodedHashes = [base64:encode(H) || H <- Hashes],
     RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},
                                                      {"hashes", EncodedHashes}]})),
-    case request(DebugTag, URL, Headers, RequestBody) of
+    case request(DebugTag, URL, NodeName, Headers, RequestBody) of
         {<<"ok">>, _} ->
             ok;
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
 
-missingentries(NodeAddress) ->
+missingentries(NodeAddress, NodeName) ->
     DebugTag = "missingentries",
     URL = NodeAddress ++ "missingentries",
-    case request(DebugTag, URL) of
+    case request(DebugTag, URL, NodeName) of
         {<<"ok">>, PropList} ->
             {ok, proplists:get_value(<<"entries">>, PropList)};
         Err ->
             throw({request_error, result, DebugTag, Err})
     end.
 
-sendentries(NodeAddress, Hashes, Chunksize) ->
+sendentries(NodeAddress, NodeName, Hashes, Chunksize) ->
     lager:debug("sending entries: N=~B, chunksize=~B", [length(Hashes), Chunksize]),
     {ChunkOfHashes, RestOfHashes} = lists:split(min(Chunksize, length(Hashes)), Hashes),
-    sendentries_chunk(NodeAddress, {ChunkOfHashes, RestOfHashes}, Chunksize).
+    sendentries_chunk(NodeAddress, NodeName, {ChunkOfHashes, RestOfHashes}, Chunksize).
 
-sendentries_chunk(_, {[], _}, _) ->
+sendentries_chunk(_, _, {[], _}, _) ->
     ok;
-sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
+sendentries_chunk(NodeAddress, NodeName, {Chunk, Rest}, Chunksize) ->
     lager:debug("sending entries chunk: N=~B", [length(Chunk)]),
     HashesAndEntries = lists:zip(Chunk, lists:map(fun db:entry_for_leafhash/1, Chunk)),
     case lists:keysearch(noentry, 2, HashesAndEntries) of
         false ->
-            ok = sendentries_request(NodeAddress, HashesAndEntries),
-            sendentries_chunk(NodeAddress,
+            ok = sendentries_request(NodeAddress, NodeName, HashesAndEntries),
+            sendentries_chunk(NodeAddress, NodeName,
                               lists:split(min(Chunksize, length(Rest)), Rest),
                               Chunksize);
         Missing ->
@@ -81,13 +85,13 @@ sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->
             {error, entrynotindb}
     end.
 
-sendentries_request(NodeAddress, HashesAndEntries) ->
+sendentries_request(NodeAddress, NodeName, HashesAndEntries) ->
     DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),
     URL = NodeAddress ++ "sendentry",
     Headers = [{"Content-Type", "text/json"}],
     L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),
     RequestBody = list_to_binary(L),
-    case request(DebugTag, URL, Headers, RequestBody) of
+    case request(DebugTag, URL, NodeName, Headers, RequestBody) of
         {<<"ok">>, _} ->
             ok;
         Err ->
```
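
Taken together, the reworked `merge_util` API now identifies the peer in every call. A hypothetical caller-side sequence, with invented address and name (note that `NodeAddress` must end in a slash, since the helpers append the endpoint name directly):

```erlang
%% Invented example values; the real addresses come from plop config.
NodeAddress = "https://merge-2.example.net/plop/v1/merge/",
NodeName = "merge-2",

%% Push hashes 0..999 of the log order, then fill in whatever
%% entries the peer reports missing.
Hashes = index:getrange(logorder, 0, 999),
ok = merge_util:sendlog(NodeAddress, NodeName, 0, Hashes, 1000),
{ok, MissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),
Missing = lists:map(fun base64:decode/1, MissingEncoded),
ok = merge_util:sendentries(NodeAddress, NodeName, Missing, 100).
```

On success, `request/5` also reports status code 200 under the same `"merge_errors"`/`"http_error"` key, presumably so the status server can treat the latest report as current and clear an earlier error for that node.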