diff options
| -rw-r--r-- | merge/src/merge_backup.erl | 30 | ||||
| -rw-r--r-- | merge/src/merge_dist.erl | 24 | ||||
| -rw-r--r-- | merge/src/merge_sth.erl | 4 | ||||
| -rw-r--r-- | merge/src/merge_util.erl | 54 | ||||
| -rw-r--r-- | src/http_auth.erl | 3 | ||||
| -rw-r--r-- | src/statusreport.erl | 41 | ||||
| -rw-r--r-- | statusserver/src/statusserver.erl | 12 | 
7 files changed, 99 insertions, 69 deletions
| diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl index f1e9253..068725c 100644 --- a/merge/src/merge_backup.erl +++ b/merge/src/merge_backup.erl @@ -44,12 +44,12 @@ backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) -      lager:debug("~p: logorder size ~B", [NodeName, Size]),      ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".      try -        {ok, VerifiedSize} = verified_size(NodeAddress), +        {ok, VerifiedSize} = verified_size(NodeName, NodeAddress),          lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),          case VerifiedSize == Size of              true ->                  TreeHead = ht:root(Size - 1), -                ok = check_root(NodeAddress, Size, TreeHead), +                ok = check_root(NodeName, NodeAddress, Size, TreeHead),                  ok = write_backupfile(NodeName, Size, TreeHead);              false ->                  true = VerifiedSize < Size, % Secondary ahead of primary? @@ -68,14 +68,14 @@ do_backup(_, _, _, 0) ->  do_backup(NodeName, NodeAddress, Start, NTotal) ->      N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),      Hashes = index:getrange(logorder, Start, Start + N - 1), -    ok = merge_util:sendlog(NodeAddress, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), -    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress), +    ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), +    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),      HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), -    ok = merge_util:sendentries(NodeAddress, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)), +    ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)),      Size = Start + N,      TreeHead = ht:root(Size - 1), -    ok = check_root(NodeAddress, Size, TreeHead), -    ok = setverifiedsize(NodeAddress, Size), +    ok = check_root(NodeName, NodeAddress, Size, TreeHead), +    ok = setverifiedsize(NodeName, NodeAddress, Size),      ok = write_backupfile(NodeName, Size, TreeHead),      true = NTotal >= N,      do_backup(NodeName, NodeAddress, Size, NTotal - N). @@ -88,8 +88,8 @@ write_backupfile(NodeName, TreeSize, TreeHead) ->                                    {"sha256_root_hash", list_to_binary(hex:bin_to_hexstr(TreeHead))}]}),      atomic:replacefile(Path, Content). -check_root(NodeAddress, Size, TreeHead) -> -    {ok, TreeHeadToVerify} = verifyroot(NodeAddress, Size), +check_root(NodeName, NodeAddress, Size, TreeHead) -> +    {ok, TreeHeadToVerify} = verifyroot(NodeName, NodeAddress, Size),      case TreeHeadToVerify == TreeHead of          true ->              ok; @@ -99,34 +99,34 @@ check_root(NodeAddress, Size, TreeHead) ->              root_mismatch      end. -verifyroot(NodeAddress, TreeSize) -> +verifyroot(NodeName, NodeAddress, TreeSize) ->      DebugTag = io_lib:format("verifyroot ~B", [TreeSize]),      URL = NodeAddress ++ "verifyroot",      Headers = [{"Content-Type", "text/json"}],      RequestBody = list_to_binary(mochijson2:encode({[{"tree_size", TreeSize}]})), -    case merge_util:request(DebugTag, URL, Headers, RequestBody) of +    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of          {<<"ok">>, PropList} ->              {ok, base64:decode(proplists:get_value(<<"root_hash">>, PropList))};          Err ->              throw({request_error, result, DebugTag, Err})      end. -verified_size(NodeAddress) -> +verified_size(NodeName, NodeAddress) ->      DebugTag = "verifiedsize",      URL = NodeAddress ++ "verifiedsize", -    case merge_util:request(DebugTag, URL) of +    case merge_util:request(DebugTag, URL, NodeName) of          {<<"ok">>, PropList} ->              {ok, proplists:get_value(<<"size">>, PropList)};          Err ->              throw({request_error, result, DebugTag, Err})      end. -setverifiedsize(NodeAddress, Size) -> +setverifiedsize(NodeName, NodeAddress, Size) ->      DebugTag = io_lib:format("setverifiedsize ~B", [Size]),      URL = NodeAddress ++ "setverifiedsize",      Headers = [{"Content-Type", "text/json"}],      RequestBody = list_to_binary(mochijson2:encode({[{"size", Size}]})), -    case merge_util:request(DebugTag, URL, Headers, RequestBody) of +    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of          {<<"ok">>, _} ->              ok;          Err -> diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index da6b667..3c38401 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -64,7 +64,7 @@ dist({struct, PropList} = STH,                                  [NodeAddress, Treesize, Logordersize]),                       statusreport:report("merge_dist", NodeName, "targetsth", Treesize),                       ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)), -                     ok = publish_sth(NodeAddress, STH), +                     ok = publish_sth(NodeName, NodeAddress, STH),                       statusreport:report("merge_dist", NodeName, "sth", Treesize),                       lager:info("~p: Published STH with size ~B and timestamp " ++                                      "~p.", [NodeAddress, Treesize, Timestamp]), @@ -87,7 +87,7 @@ dist({struct, PropList} = STH,  %% @doc Has nonlocal return because of throw further down in  %% merge_util:request/4.  do_dist(NodeAddress, NodeName, Size) -> -    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress), +    {ok, VerifiedSize} = frontend_get_verifiedsize(NodeName, NodeAddress),      lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]),      true = VerifiedSize =< Size,      do_dist(NodeAddress, NodeName, VerifiedSize, Size - VerifiedSize). @@ -100,40 +100,40 @@ do_dist(NodeAddress, NodeName, Start, NTotal) ->      Hashes = index:getrange(logorder, Start, Start + N - 1),      SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),      SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100), -    ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize), +    ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize),      statusreport:report("merge_dist", NodeName, "sendlog", Start + N), -    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress), +    {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),      lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]),      HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), -    ok = merge_util:sendentries(NodeAddress, HashesMissing, SendentriesChunksize), -    {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N), +    ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize), +    {ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N),      lager:info("~p: Done distributing ~B out of ~B entries.",                 [NodeAddress, NewSize-Start, NTotal]),      statusreport:report("merge_dist", NodeName, "verified", Start + N),      true = NTotal >= NewSize - Start,      do_dist(NodeAddress, NodeName, NewSize, NTotal - (NewSize - Start)). -frontend_get_verifiedsize(NodeAddress) -> -    frontend_verify_entries(NodeAddress, 0). +frontend_get_verifiedsize(NodeName, NodeAddress) -> +    frontend_verify_entries(NodeName, NodeAddress, 0). -frontend_verify_entries(NodeAddress, Size) -> +frontend_verify_entries(NodeName, NodeAddress, Size) ->      DebugTag = io_lib:format("verify-entries ~B", [Size]),      URL = NodeAddress ++ "verify-entries",      Headers = [{"Content-Type", "text/json"}],      RequestBody = list_to_binary(mochijson2:encode({[{"verify_to", Size}]})), -    case merge_util:request(DebugTag, URL, Headers, RequestBody) of +    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of          {<<"ok">>, PropList} ->              {ok, proplists:get_value(<<"verified">>, PropList)};          Err ->              throw({request_error, result, DebugTag, Err})      end. -publish_sth(NodeAddress, STH) -> +publish_sth(NodeName, NodeAddress, STH) ->      DebugTag = "publish-sth",      URL = NodeAddress ++ "publish-sth",      Headers = [{"Content-Type", "text/json"}],      RequestBody = list_to_binary(mochijson2:encode(STH)), -    case merge_util:request(DebugTag, URL, Headers, RequestBody) of +    case merge_util:request(DebugTag, URL, NodeName, Headers, RequestBody) of          {<<"ok">>, _} ->              ok;          Err -> diff --git a/merge/src/merge_sth.erl b/merge/src/merge_sth.erl index b8ae1f9..4b77864 100644 --- a/merge/src/merge_sth.erl +++ b/merge/src/merge_sth.erl @@ -66,7 +66,7 @@ make_sth(CurSize, State) ->      Wait =           case NewSize < CurSize of              true -> -                statusreport:report("merge_sth", "sth", "sth", null), +                statusreport:report("merge_sth", http_auth:own_name(), "sth", null),                  lager:debug("waiting for enough backups to reach ~B, now at ~B",                              [CurSize, NewSize]),                  1; @@ -91,7 +91,7 @@ do_make_sth(Size) ->                             {"sha256_root_hash", base64:encode(NewRoot)},                             {"tree_head_signature", base64:encode(PackedSignature)}],                   ok = plop:save_sth({struct, NewSTH}), -                 statusreport:report("merge_sth", "sth", "sth", Size), +                 statusreport:report("merge_sth", http_auth:own_name(), "sth", Size),                   ok;               false ->                   lager:error("The signature we got for new tree of size ~B doesn't " ++ diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl index 24eba60..c76d05f 100644 --- a/merge/src/merge_util.erl +++ b/merge/src/merge_util.erl @@ -2,82 +2,82 @@  %%% See LICENSE for licensing information.  -module(merge_util). --export([sendlog/4, sendentries/3, missingentries/1]). --export([request/2, request/4]). +-export([sendlog/5, sendentries/4, missingentries/2]). +-export([request/3, request/5]).  -export([readfile/1, nfetched/0]). -request(DebugTag, URL) -> -    request(DebugTag, URL, [], <<>>). +request(DebugTag, URL, NodeName) -> +    request(DebugTag, URL, NodeName, [], <<>>). -request(DebugTag, URL, Headers, RequestBody) -> +request(DebugTag, URL, NodeName, Headers, RequestBody) ->      case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of          {error, Err} -> -            statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(io_lib:format("~w", [Err]))), +            statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(io_lib:format("~w", [Err]))),              throw({request_error, request, DebugTag, Err});          {failure, {none, StatusCode, none}, _RespHeaders, _Body}  -> -            statusreport:report_multi("merge_errors", URL, "http_error", StatusCode), +            statusreport:report_multi("merge_errors", NodeName, "http_error", StatusCode),              throw({request_error, failure, DebugTag, StatusCode});          {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->              case (catch mochijson2:decode(Body)) of                  {error, Err} -> -                    statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(Err)), +                    statusreport:report_multi("merge_errors", NodeName, "http_error", list_to_binary(Err)),                      throw({request_error, decode, DebugTag, Err});                  {struct, PropList} -> -                    statusreport:report_multi("merge_errors", URL, "http_error", 200), +                    statusreport:report_multi("merge_errors", NodeName, "http_error", 200),                      {proplists:get_value(<<"result">>, PropList), PropList}              end      end. -sendlog(NodeAddress, Start, Hashes, Chunksize) -> +sendlog(NodeAddress, NodeName, Start, Hashes, Chunksize) ->      lager:debug("sending log: start=~B, N=~B, chunksize=~B", [Start, length(Hashes), Chunksize]), -    sendlog_chunk(NodeAddress, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize). +    sendlog_chunk(NodeAddress, NodeName, Start, lists:split(min(Chunksize, length(Hashes)), Hashes), Chunksize). -sendlog_chunk(_, _, {[], _}, _) -> +sendlog_chunk(_, _, _, {[], _}, _) ->      ok; -sendlog_chunk(NodeAddress, Start, {Chunk, Rest}, Chunksize) -> +sendlog_chunk(NodeAddress, NodeName, Start, {Chunk, Rest}, Chunksize) ->      lager:debug("sending log chunk: start=~B, N=~B", [Start, length(Chunk)]), -    ok = sendlog_request(NodeAddress, Start, Chunk), -    sendlog_chunk(NodeAddress, Start + length(Chunk), +    ok = sendlog_request(NodeAddress, NodeName, Start, Chunk), +    sendlog_chunk(NodeAddress, NodeName, Start + length(Chunk),                    lists:split(min(Chunksize, length(Rest)), Rest), Chunksize). -sendlog_request(NodeAddress, Start, Hashes) -> +sendlog_request(NodeAddress, NodeName, Start, Hashes) ->      DebugTag = io_lib:format("sendlog ~B:~B", [Start, length(Hashes)]),      URL = NodeAddress ++ "sendlog",      Headers = [{"Content-Type", "text/json"}],      EncodedHashes = [base64:encode(H) || H <- Hashes],      RequestBody = list_to_binary(mochijson2:encode({[{"start", Start},                                                       {"hashes", EncodedHashes}]})), -    case request(DebugTag, URL, Headers, RequestBody) of +    case request(DebugTag, URL, NodeName, Headers, RequestBody) of          {<<"ok">>, _} ->              ok;          Err ->              throw({request_error, result, DebugTag, Err})      end. -missingentries(NodeAddress) -> +missingentries(NodeAddress, NodeName) ->      DebugTag = "missingentries",      URL = NodeAddress ++ "missingentries", -    case request(DebugTag, URL) of +    case request(DebugTag, URL, NodeName) of          {<<"ok">>, PropList} ->              {ok, proplists:get_value(<<"entries">>, PropList)};          Err ->              throw({request_error, result, DebugTag, Err})      end. -sendentries(NodeAddress, Hashes, Chunksize) -> +sendentries(NodeAddress, NodeName, Hashes, Chunksize) ->      lager:debug("sending entries: N=~B, chunksize=~B", [length(Hashes), Chunksize]),      {ChunkOfHashes, RestOfHashes} = lists:split(min(Chunksize, length(Hashes)), Hashes), -    sendentries_chunk(NodeAddress, {ChunkOfHashes, RestOfHashes}, Chunksize). +    sendentries_chunk(NodeAddress, NodeName, {ChunkOfHashes, RestOfHashes}, Chunksize). -sendentries_chunk(_, {[], _}, _) -> +sendentries_chunk(_, _, {[], _}, _) ->      ok; -sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) -> +sendentries_chunk(NodeAddress, NodeName, {Chunk, Rest}, Chunksize) ->      lager:debug("sending entries chunk: N=~B", [length(Chunk)]),      HashesAndEntries = lists:zip(Chunk, lists:map(fun db:entry_for_leafhash/1, Chunk)),      case lists:keysearch(noentry, 2, HashesAndEntries) of          false -> -            ok = sendentries_request(NodeAddress, HashesAndEntries), -            sendentries_chunk(NodeAddress, +            ok = sendentries_request(NodeAddress, NodeName, HashesAndEntries), +            sendentries_chunk(NodeAddress, NodeName,                                lists:split(min(Chunksize, length(Rest)), Rest),                                Chunksize);          Missing -> @@ -85,13 +85,13 @@ sendentries_chunk(NodeAddress, {Chunk, Rest}, Chunksize) ->              {error, entrynotindb}      end. -sendentries_request(NodeAddress, HashesAndEntries) -> +sendentries_request(NodeAddress, NodeName, HashesAndEntries) ->      DebugTag = io_lib:format("sendentry ~B", [length(HashesAndEntries)]),      URL = NodeAddress ++ "sendentry",      Headers = [{"Content-Type", "text/json"}],      L = mochijson2:encode([[{"entry", base64:encode(E)}, {"treeleafhash", base64:encode(H)}] || {H, E} <- HashesAndEntries]),      RequestBody = list_to_binary(L), -    case request(DebugTag, URL, Headers, RequestBody) of +    case request(DebugTag, URL, NodeName, Headers, RequestBody) of          {<<"ok">>, _} ->              ok;          Err -> diff --git a/src/http_auth.erl b/src/http_auth.erl index e083a2c..c8a8389 100644 --- a/src/http_auth.erl +++ b/src/http_auth.erl @@ -25,9 +25,8 @@ read_key_table() ->              none      end. -  own_name() -> -    {_Key, KeyName} = own_key(), +    {ok, {KeyName, _}} = application:get_env(plop, own_key),      KeyName.  own_key() -> diff --git a/src/statusreport.erl b/src/statusreport.erl index 63414cd..f0b7503 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -24,7 +24,8 @@ init([]) ->      process_flag(trap_exit, true),      ReportInterval = application:get_env(plop, status_report_interval, 1000),      lager:info("~p: starting", [?MODULE]), -    %Timer = erlang:start_timer(1000, self(), dist), +    HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000), +    erlang:start_timer(HeartbeatInterval, self(), heartbeat),      {ok, #state{timer = none,                  nodename = http_auth:own_name(),                  statusreports = dict:new(), @@ -49,6 +50,20 @@ store_multi_status(State, Service, Target, Variable, Status) ->                                      State#state.statusreports),      State#state{statusreports = Statusreports}. +store_set_status(State, Service, Target, Variable, Statuses) -> +    Statusreports = dict:store({Service, Target, Variable}, +                               {multi, Statuses}, +                               State#state.statusreports), +    State#state{statusreports = Statusreports}. + +heartbeat(State) -> +    {ok, ConfigVersion} = plopconfig:get_env(version), +    RunningApps = [atom_to_list(App) ++ " " ++ Vsn || {App, _Desc, Vsn} <- application:which_applications()], + +    NewState1 = store_status(State, "heartbeat", "", "configversion", ConfigVersion), +    NewState2 = store_set_status(NewState1, "heartbeat", "", "applications", sets:from_list(RunningApps)), +    NewState2. +  handle_call(_, _From, State) ->      {noreply, State}. @@ -61,7 +76,14 @@ handle_cast({report_multi, Service, Target, Variable, Status}, State) ->  handle_info({timeout, _Timer, force_send}, State) ->      lager:debug("statusreport timer timeout"), -    {noreply, force_send(State)}. +    {noreply, force_send(State)}; + +handle_info({timeout, _Timer, heartbeat}, State) -> +    lager:debug("statusreport timer timeout"), +    HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000), +    erlang:start_timer(HeartbeatInterval, self(), heartbeat), +    NewState = heartbeat(State), +    {noreply, try_send(NewState)}.  code_change(_OldVsn, State, _Extra) ->      {ok, State}. @@ -105,10 +127,17 @@ group_by_service(Statusreports) ->                  dict:append(Service, {Target, Variable, Status}, Acc)          end, dict:new(), dict:to_list(Statusreports))). -encode_status({single, Status}) -> +encode_one_status(Status) when is_number(Status) ->      Status; +encode_one_status(Status) when is_list(Status) -> +    list_to_binary(Status); +encode_one_status(Status) when is_binary(Status) -> +    Status. + +encode_status({single, Status}) -> +    encode_one_status(Status);  encode_status({multi, Statuses}) -> -    sets:to_list(Statuses). +    lists:map(fun encode_one_status/1, sets:to_list(Statuses)).  send(Service, Statusreports, Nodename) ->      lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), @@ -160,10 +189,10 @@ try_send(State) ->              force_send(State)      end. -report(Service, Target, Variable, Status) -> +report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) ->      lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]),      gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). -report_multi(Service, Target, Variable, Status) -> +report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) ->      lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]),      gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl index 323b28d..f38bf69 100644 --- a/statusserver/src/statusserver.erl +++ b/statusserver/src/statusserver.erl @@ -17,9 +17,10 @@ request(post, ?APPURL_PLOP_STATUS, Service, Input) ->          Entries when is_list(Entries) ->              lists:foreach(fun ({struct, PropList}) ->                                    Target = proplists:get_value(<<"target">>, PropList), +                                  Source = proplists:get_value(<<"source">>, PropList),                                    Variable = proplists:get_value(<<"key">>, PropList),                                    Status = proplists:get_value(<<"value">>, PropList), -                                  set_status(Service, Target, Variable, Status) +                                  set_status(Service, Source, Target, Variable, Status)                            end, Entries)      end,      success({[{result, <<"ok">>}]}); @@ -27,11 +28,12 @@ request(get, "", "status", Input) ->      Now = erlang:monotonic_time(millisecond),      Variables = [{struct, [                             {service, list_to_binary(Service)}, +                           {source, Source},                             {target, Target},                             {variable, Variable},                             {status, Status},                             {age, (Now - Timestamp) / 1000} -                          ]}  || {{Service, Target, Variable}, Status, Timestamp} <- get_status()], +                          ]}  || {{Service, Source, Target, Variable}, Status, Timestamp} <- get_status()],      success({[{result, Variables}]}). @@ -63,9 +65,9 @@ create_statusdb() ->  get_status() ->      [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. -set_status(Service, Target, Variable, Status) -> -    lager:info("status: ~p ~p ~p ~p", [Service, Target, Variable, Status]), +set_status(Service, Source, Target, Variable, Status) -> +    lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]),      Timestamp = erlang:monotonic_time(millisecond),      true = ets:insert(?STATUSDB_TABLE, -                      {{Service, Target, Variable}, Status, Timestamp}), +                      {{Service, Source, Target, Variable}, Status, Timestamp}),      ok. | 
