diff options
| -rw-r--r-- | Emakefile | 6 | ||||
| -rw-r--r-- | Makefile | 1 | ||||
| -rw-r--r-- | merge/src/merge_backup.erl | 1 | ||||
| -rw-r--r-- | merge/src/merge_dist.erl | 20 | ||||
| -rw-r--r-- | merge/src/merge_sth.erl | 2 | ||||
| -rw-r--r-- | merge/src/merge_util.erl | 4 | ||||
| -rw-r--r-- | src/http_auth.erl | 6 | ||||
| -rw-r--r-- | src/plop_sup.erl | 1 | ||||
| -rw-r--r-- | src/statusreport.erl | 169 | ||||
| -rw-r--r-- | src/storage.erl | 14 | ||||
| -rw-r--r-- | statsserver/ebin/statsserver.app | 13 | ||||
| -rw-r--r-- | statsserver/src/statsserver.erl | 71 | ||||
| -rw-r--r-- | statsserver/src/statsserver_app.erl | 13 | ||||
| -rw-r--r-- | statsserver/src/statsserver_sup.erl | 42 | 
14 files changed, 355 insertions, 8 deletions
| @@ -10,3 +10,9 @@    {i, "src/"},                           % For plop.hrl.    {outdir, "merge/ebin/"},    {parse_transform, lager_transform}]}. +{["statsserver/src/*"], + [debug_info, +  {i, "../"},                            % For hackney. +  {i, "src/"},                           % For plop.hrl. +  {outdir, "statsserver/ebin/"}, +  {parse_transform, lager_transform}]}. @@ -11,6 +11,7 @@ clean:  	-rm priv/fsynchelper  	-rm ebin/*.beam  	-rm merge/ebin/*.beam +	-rm statsserver/ebin/*.beam  dialyze: build  	dialyzer ebin merge/ebin  tags: diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl index bf20f23..f1e9253 100644 --- a/merge/src/merge_backup.erl +++ b/merge/src/merge_backup.erl @@ -81,6 +81,7 @@ do_backup(NodeName, NodeAddress, Start, NTotal) ->      do_backup(NodeName, NodeAddress, Size, NTotal - N).  write_backupfile(NodeName, TreeSize, TreeHead) -> +    statusreport:report("merge_backup", NodeName, "verified", TreeSize),      {ok, BasePath} = application:get_env(plop, verified_path),      Path = BasePath ++ "." ++ NodeName,      Content = mochijson2:encode({[{"tree_size", TreeSize}, diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index f8f0c7c..da6b667 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -48,7 +48,9 @@ dist(noentry, State) ->      Timer = erlang:start_timer(1000, self(), dist),      {noreply, State#state{timer = Timer}};  dist({struct, PropList} = STH, -     #state{node_address = NodeAddress, sth_timestamp = LastTimestamp} = State) -> +     #state{node_address = NodeAddress, +            node_name = NodeName, +            sth_timestamp = LastTimestamp} = State) ->      Treesize = proplists:get_value(<<"tree_size">>, PropList),      Timestamp = proplists:get_value(<<"timestamp">>, PropList),      RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)), @@ -60,8 +62,10 @@ dist({struct, PropList} = STH,                   try                       lager:info("~p: starting dist, sth at ~B, logorder at ~B",                                  [NodeAddress, Treesize, Logordersize]), -                     ok = do_dist(NodeAddress, min(Treesize, Logordersize)), +                     statusreport:report("merge_dist", NodeName, "targetsth", Treesize), +                     ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)),                       ok = publish_sth(NodeAddress, STH), +                     statusreport:report("merge_dist", NodeName, "sth", Treesize),                       lager:info("~p: Published STH with size ~B and timestamp " ++                                      "~p.", [NodeAddress, Treesize, Timestamp]),                       Timestamp @@ -82,21 +86,22 @@ dist({struct, PropList} = STH,  %% @doc Has nonlocal return because of throw further down in  %% merge_util:request/4. -do_dist(NodeAddress, Size) -> +do_dist(NodeAddress, NodeName, Size) ->      {ok, VerifiedSize} = frontend_get_verifiedsize(NodeAddress),      lager:debug("~p: verifiedsize ~B", [NodeAddress, VerifiedSize]),      true = VerifiedSize =< Size, -    do_dist(NodeAddress, VerifiedSize, Size - VerifiedSize). +    do_dist(NodeAddress, NodeName, VerifiedSize, Size - VerifiedSize). -do_dist(_, _, 0) -> +do_dist(_, _, _, 0) ->      ok; -do_dist(NodeAddress, Start, NTotal) -> +do_dist(NodeAddress, NodeName, Start, NTotal) ->      DistMaxWindow = application:get_env(plop, merge_dist_winsize, 1000),      N = min(DistMaxWindow, NTotal),      Hashes = index:getrange(logorder, Start, Start + N - 1),      SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),      SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),      ok = merge_util:sendlog(NodeAddress, Start, Hashes, SendlogChunksize), +    statusreport:report("merge_dist", NodeName, "sendlog", Start + N),      {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress),      lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]),      HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), @@ -104,8 +109,9 @@ do_dist(NodeAddress, Start, NTotal) ->      {ok, NewSize} = frontend_verify_entries(NodeAddress, Start + N),      lager:info("~p: Done distributing ~B out of ~B entries.",                 [NodeAddress, NewSize-Start, NTotal]), +    statusreport:report("merge_dist", NodeName, "verified", Start + N),      true = NTotal >= NewSize - Start, -    do_dist(NodeAddress, NewSize, NTotal - (NewSize - Start)). +    do_dist(NodeAddress, NodeName, NewSize, NTotal - (NewSize - Start)).  frontend_get_verifiedsize(NodeAddress) ->      frontend_verify_entries(NodeAddress, 0). diff --git a/merge/src/merge_sth.erl b/merge/src/merge_sth.erl index ab1cd8f..b8ae1f9 100644 --- a/merge/src/merge_sth.erl +++ b/merge/src/merge_sth.erl @@ -66,6 +66,7 @@ make_sth(CurSize, State) ->      Wait =           case NewSize < CurSize of              true -> +                statusreport:report("merge_sth", "sth", "sth", null),                  lager:debug("waiting for enough backups to reach ~B, now at ~B",                              [CurSize, NewSize]),                  1; @@ -90,6 +91,7 @@ do_make_sth(Size) ->                             {"sha256_root_hash", base64:encode(NewRoot)},                             {"tree_head_signature", base64:encode(PackedSignature)}],                   ok = plop:save_sth({struct, NewSTH}), +                 statusreport:report("merge_sth", "sth", "sth", Size),                   ok;               false ->                   lager:error("The signature we got for new tree of size ~B doesn't " ++ diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl index 7598e40..24eba60 100644 --- a/merge/src/merge_util.erl +++ b/merge/src/merge_util.erl @@ -12,14 +12,18 @@ request(DebugTag, URL) ->  request(DebugTag, URL, Headers, RequestBody) ->      case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of          {error, Err} -> +            statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(io_lib:format("~w", [Err]))),              throw({request_error, request, DebugTag, Err});          {failure, {none, StatusCode, none}, _RespHeaders, _Body}  -> +            statusreport:report_multi("merge_errors", URL, "http_error", StatusCode),              throw({request_error, failure, DebugTag, StatusCode});          {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->              case (catch mochijson2:decode(Body)) of                  {error, Err} -> +                    statusreport:report_multi("merge_errors", URL, "http_error", list_to_binary(Err)),                      throw({request_error, decode, DebugTag, Err});                  {struct, PropList} -> +                    statusreport:report_multi("merge_errors", URL, "http_error", 200),                      {proplists:get_value(<<"result">>, PropList), PropList}              end      end. diff --git a/src/http_auth.erl b/src/http_auth.erl index 2cee51f..e083a2c 100644 --- a/src/http_auth.erl +++ b/src/http_auth.erl @@ -2,7 +2,7 @@  %%% See LICENSE for licensing information.  -module(http_auth). --export([verify_auth/4, create_auth/3, init_key_table/0, sign_stored/1, verify_stored/3]). +-export([verify_auth/4, create_auth/3, init_key_table/0, sign_stored/1, verify_stored/3, own_name/0]).  -define(KEY_TABLE, http_auth_keys). @@ -26,6 +26,10 @@ read_key_table() ->      end. +own_name() -> +    {_Key, KeyName} = own_key(), +    KeyName. +  own_key() ->      case application:get_env(plop, own_key, none) of          {KeyName, _KeyFile} -> diff --git a/src/plop_sup.erl b/src/plop_sup.erl index 27f7680..15f3fe8 100644 --- a/src/plop_sup.erl +++ b/src/plop_sup.erl @@ -50,6 +50,7 @@ init([]) ->      Children = [permanent_worker(the_db, {db, start_link, []}, [db]),                  permanent_worker(the_storagedb, {storagedb, start_link, []}),                  permanent_worker(fsync, {fsyncport, start_link, []}), +                permanent_worker(the_statusreport, {statusreport, start_link, []}),                  permanent_worker(plopcontrol, {plopcontrol, start_link, []})],      OptionalChildren = lists:map(fun (ServiceName) ->                                           case ServiceName of diff --git a/src/statusreport.erl b/src/statusreport.erl new file mode 100644 index 0000000..cd5bb5a --- /dev/null +++ b/src/statusreport.erl @@ -0,0 +1,169 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statusreport). +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, +         code_change/3]). +-export([report/4]). +-export([report_multi/4]). + +-record(state, { +          timer :: none|reference(), +          nodename :: string(), +          statusreports :: dict:dict(), +          lastsent :: integer() +         }). + +start_link() -> +    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> +    process_flag(trap_exit, true), +    ReportInterval = application:get_env(plop, status_report_interval, 1000), +    lager:info("~p: starting", [?MODULE]), +    %Timer = erlang:start_timer(1000, self(), dist), +    {ok, #state{timer = none, +                nodename = http_auth:own_name(), +                statusreports = dict:new(), +                lastsent = erlang:monotonic_time(millisecond) - ReportInterval}}. + +store_status(State, Service, Target, Variable, Status) -> +    Statusreports = dict:store({Service, Target, Variable}, +                               {single, Status}, +                               State#state.statusreports), +    State#state{statusreports = Statusreports}. + +dict_append_set(Key, Value, Dict) -> +    AppendSet = sets:from_list([Value]), +    dict:update(Key, fun ({multi, Old}) -> +                             {multi, sets:union(Old, AppendSet)} +                     end, +                {multi, AppendSet}, Dict). + +store_multi_status(State, Service, Target, Variable, Status) -> +    Statusreports = dict_append_set({Service, Target, Variable}, +                                    Status, +                                    State#state.statusreports), +    State#state{statusreports = Statusreports}. + +handle_call(_, _From, State) -> +    {noreply, State}. + +handle_cast({report, Service, Target, Variable, Status}, State) -> +    NewState = store_status(State, Service, Target, Variable, Status), +    {noreply, try_send(NewState)}; +handle_cast({report_multi, Service, Target, Variable, Status}, State) -> +    NewState = store_multi_status(State, Service, Target, Variable, Status), +    {noreply, try_send(NewState)}. + +handle_info({timeout, _Timer, force_send}, State) -> +    lager:debug("statusreport timer timeout"), +    {noreply, force_send(State)}. + +code_change(_OldVsn, State, _Extra) -> +    {ok, State}. + +cancel_timer(State) -> +    case State#state.timer of +        none -> +            none; +        _ -> +            erlang:cancel_timer(State#state.timer) +    end, +    State#state{timer = none}. + +set_timer(State) -> +    case State#state.timer of +        none -> +            ReportInterval = application:get_env(plop, status_report_interval, 1000), +            Timer = erlang:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), +            State#state{timer = Timer}; +        _ -> +            State +    end. + +terminate(Reason, State) -> +    lager:info("~p terminating: ~p", [?MODULE, Reason]), +    NewState = cancel_timer(State), +    case Reason of +        shutdown -> +            force_send(NewState); +        _ -> +            none +    end, +    ok. + + + +group_by_service(Statusreports) -> +    dict:to_list( +      lists:foldl( +        fun ({{Service, Target, Variable}, Status}, Acc) -> +                dict:append(Service, {Target, Variable, Status}, Acc) +        end, dict:new(), dict:to_list(Statusreports))). + +encode_status({single, Status}) -> +    Status; +encode_status({multi, Statuses}) -> +    sets:to_list(Statuses). + +send(Service, Statusreports, Nodename) -> +    lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), +    [NodeAddress] = plopconfig:get_env(statsservers, []), +    DebugTag = "statusreport", +    URL = NodeAddress ++ Service, +    Headers = [{"Content-Type", "text/json"}], +    RequestBody = list_to_binary( +                    mochijson2:encode( +                      [ +                       {struct, +                        [{"target", list_to_binary(Target)}, +                         {"source", list_to_binary(Nodename)}, +                         {"key", list_to_binary(Variable)}, +                         {"value", encode_status(Status)}]} +                       || {Target, Variable, Status} <- Statusreports +                      ])), +    case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of +        {error, Err} -> +            lager:debug("request error ~p ~p", [DebugTag, Err]); +        {failure, {none, StatusCode, none}, _RespHeaders, _Body}  -> +            lager:debug("request failure ~p ~p", [DebugTag, StatusCode]); +        {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 -> +            case (catch mochijson2:decode(Body)) of +                {error, Err} -> +                    lager:debug("error returned ~p ~p", [DebugTag, Err]); +                {struct, _PropList} -> +                    none +            end +    end. + +force_send(State) -> +    lists:foreach(fun ({Service, Statusreports}) -> +                          send(Service, Statusreports, State#state.nodename) +                  end, group_by_service(State#state.statusreports)), +    NewState = cancel_timer(State), +    NewState#state{statusreports = dict:new(), lastsent = erlang:monotonic_time(millisecond)}. + +try_send(State) -> +    ReportInterval = application:get_env(plop, status_report_interval, 1000), +    NextSend = State#state.lastsent + ReportInterval, +    Now = erlang:monotonic_time(millisecond), +    if +        NextSend > Now -> +            lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]), +            set_timer(State); +        true -> +            lager:debug("status report send long enough ago"), +            force_send(State) +    end. + +report(Service, Target, Variable, Status) -> +    lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), +    gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). + +report_multi(Service, Target, Variable, Status) -> +    lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]), +    gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). diff --git a/src/storage.erl b/src/storage.erl index 7498635..6114a57 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -9,6 +9,18 @@  -define(APPURL_PLOP_STORAGE, "plop/v1/storage"). +reportnewentries() -> +    NodeName = http_auth:own_name(), +    {LastIndexOrZero, LastHash} = storagedb:lastverifiednewentry(), +    VerifiedEntries = case LastHash of +                          none -> +                              0; +                          _ -> +                              LastIndexOrZero + 1 +                      end, +    NewEntries = index:indexsize(newentries_db) - VerifiedEntries, +    statusreport:report("storage", NodeName, "newentries", NewEntries). +  request(post, ?APPURL_PLOP_STORAGE, "sendentry", Input) ->      case (catch mochijson2:decode(Input)) of          {error, E} -> @@ -20,6 +32,7 @@ request(post, ?APPURL_PLOP_STORAGE, "sendentry", Input) ->              ok = db:add_entry_sync(TreeLeafHash, LogEntry),              ok = storagedb:add(TreeLeafHash),              {KeyName, Sig} = http_auth:sign_stored(plop:spt_data_from_entry(LogEntry)), +            reportnewentries(),              success({[{result, <<"ok">>},                        {"sig", KeyName ++ ":" ++ base64:encode_to_string(Sig)}                       ]}) @@ -55,6 +68,7 @@ request(get, ?APPURL_PLOP_STORAGE, "fetchnewentries", _Input) ->      Entries = lists:map(fun(LeafHash) ->                                  base64:encode(LeafHash)                          end, NewHashes), +    reportnewentries(),      success({[{result, <<"ok">>},                {entries, Entries}]});  request(get, ?APPURL_PLOP_STORAGE, "getentry", Query) -> diff --git a/statsserver/ebin/statsserver.app b/statsserver/ebin/statsserver.app new file mode 100644 index 0000000..9c642ed --- /dev/null +++ b/statsserver/ebin/statsserver.app @@ -0,0 +1,13 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% Application resource file for statsserver, see app(5). + +{application, statsserver, + [{description, "Plop statsserver"}, +  {vsn, "0.10.1"}, +  {modules, [statsserver_app, statsserver_sup, statsserver]}, +  {applications, [kernel, stdlib, lager, plop]}, +  {registered, [statsserver_sup, statsserver]}, +  {mod, {statsserver_app, []}} + ]}. diff --git a/statsserver/src/statsserver.erl b/statsserver/src/statsserver.erl new file mode 100644 index 0000000..1d42b27 --- /dev/null +++ b/statsserver/src/statsserver.erl @@ -0,0 +1,71 @@ +%%% Copyright (c) 2014-2016, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% @doc Frontend node API + +-module(statsserver). +-export([init_module/0]). +%% API (URL) +-export([request/4]). + +-define(APPURL_PLOP_STATUS, "plop/v1/status"). + +request(post, ?APPURL_PLOP_STATUS, Service, Input) -> +    case (catch mochijson2:decode(Input)) of +        {error, E} -> +            html("bad input:", E); +        Entries when is_list(Entries) -> +            lists:foreach(fun ({struct, PropList}) -> +                                  Target = proplists:get_value(<<"target">>, PropList), +                                  Variable = proplists:get_value(<<"key">>, PropList), +                                  Status = proplists:get_value(<<"value">>, PropList), +                                  set_status(Service, Target, Variable, Status) +                          end, Entries) +    end, +    success({[{result, <<"ok">>}]}); +request(get, "", "status", Input) -> +    Now = erlang:monotonic_time(millisecond), +    Variables = [{struct, [ +                           {service, list_to_binary(Service)}, +                           {target, Target}, +                           {variable, Variable}, +                           {status, Status}, +                           {age, (Now - Timestamp) / 1000} +                          ]}  || {{Service, Target, Variable}, Status, Timestamp} <- get_status()], +    success({[{result, Variables}]}). + + +success(Data) -> +    {200, [{"Content-Type", "text/json"}, {"Access-Control-Allow-Origin", "*"}], mochijson2:encode(Data)}. + +html(Text, Input) -> +    {400, [{"Content-Type", "text/html"}], +     io_lib:format( +       "<html><body><p>~n" ++ +           "~s~n" ++ +           "~p~n" ++ +           "</body></html>~n", [Text, Input])}. + +-define(STATUSDB_TABLE, statsserver_statusdb). + +init_module() -> +    create_statusdb(). + +create_statusdb() -> +    case ets:info(?STATUSDB_TABLE) of +	undefined -> +	    ok; +	_ -> +	    ets:delete(?STATUSDB_TABLE) +    end, +    ets:new(?STATUSDB_TABLE, [set, public, named_table]). + +get_status() -> +    [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. + +set_status(Service, Target, Variable, Status) -> +    lager:info("status: ~p ~p ~p ~p", [Service, Target, Variable, Status]), +    Timestamp = erlang:monotonic_time(millisecond), +    true = ets:insert(?STATUSDB_TABLE, +                      {{Service, Target, Variable}, Status, Timestamp}), +    ok. diff --git a/statsserver/src/statsserver_app.erl b/statsserver/src/statsserver_app.erl new file mode 100644 index 0000000..6caf2b7 --- /dev/null +++ b/statsserver/src/statsserver_app.erl @@ -0,0 +1,13 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statsserver_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(normal, Args) -> +    statsserver:init_module(), +    statsserver_sup:start_link(Args). + +stop(_State) -> +    ok. diff --git a/statsserver/src/statsserver_sup.erl b/statsserver/src/statsserver_sup.erl new file mode 100644 index 0000000..6b92e35 --- /dev/null +++ b/statsserver/src/statsserver_sup.erl @@ -0,0 +1,42 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statsserver_sup). +-behaviour(supervisor). + +-export([start_link/1, init/1]). + +start_link(_Args) -> +    supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +gen_http_config(Config, SSLOptions, SSLFlag) -> +    {ChildName, IpAddress, Port, Module} = Config, +    {ok, IPv4Address} = inet:parse_ipv4strict_address(IpAddress), +    WebConfig = [{ip, IPv4Address}, +                 {port, Port}, +                 {ssl, SSLFlag}, +                 {acceptor_pool_size, +                  application:get_env(catlfish, http_server_pool_size, 16)}, +                 {ssl_opts, SSLOptions} +                ], +    {ChildName, +     {catlfish_web, start, [WebConfig, Module, ChildName]}, +     permanent, 5000, worker, dynamic}. + + +init([]) -> +    SSLOptions = +        [{certfile, application:get_env(plop, https_certfile, none)}, +         {keyfile, application:get_env(plop, https_keyfile, none)}, +         {cacertfile, application:get_env(plop, https_cacertfile, none)}], +    Servers = +        lists:map(fun (Config) -> +                          gen_http_config(Config, SSLOptions, true) +                  end, application:get_env(plop, https_servers, [])) ++ +        lists:map(fun (Config) -> +                          gen_http_config(Config, SSLOptions, false) +                  end, application:get_env(plop, http_servers, [])), +    lager:debug("Starting servers ~p", [Servers]), +    {ok, +     {{one_for_one, 3, 10}, +      Servers}}. | 
