diff options
| -rw-r--r-- | merge/src/merge_backup.erl | 8 | ||||
| -rw-r--r-- | merge/src/merge_dist.erl | 8 | ||||
| -rw-r--r-- | src/bench.erl | 18 | ||||
| -rw-r--r-- | src/plop_compat.erl | 30 | ||||
| -rw-r--r-- | src/statusreport.erl | 68 | ||||
| -rw-r--r-- | statusserver/src/statusserver.erl | 50 | 
6 files changed, 156 insertions, 26 deletions
| diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl index 068725c..3c19527 100644 --- a/merge/src/merge_backup.erl +++ b/merge/src/merge_backup.erl @@ -20,6 +20,7 @@ start_link(Args) ->  init([Name, Address]) ->      lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),      Timer = erlang:start_timer(1000, self(), backup), +    bench:timingpoint("merge_backup", Name, "start"),      {ok, #state{timer = Timer, node_name = Name, node_address = Address}}.  handle_call(stop, _From, State) -> @@ -42,14 +43,17 @@ terminate(Reason, #state{timer = Timer}) ->  backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) ->      lager:debug("~p: logorder size ~B", [NodeName, Size]), +    bench:timingpoint("merge_backup", NodeName, "idle"),      ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".      try          {ok, VerifiedSize} = verified_size(NodeName, NodeAddress), +        bench:timingpoint("merge_backup", NodeName, "verifiedsize"),          lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),          case VerifiedSize == Size of              true ->                  TreeHead = ht:root(Size - 1),                  ok = check_root(NodeName, NodeAddress, Size, TreeHead), +                bench:timingpoint("merge_backup", NodeName, "verifyroot"),                  ok = write_backupfile(NodeName, Size, TreeHead);              false ->                  true = VerifiedSize < Size, % Secondary ahead of primary? @@ -69,13 +73,17 @@ do_backup(NodeName, NodeAddress, Start, NTotal) ->      N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)),      Hashes = index:getrange(logorder, Start, Start + N - 1),      ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), +    bench:timingpoint("merge_backup", NodeName, "sendlog"),      {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName),      HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),      ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)), +    bench:timingpoint("merge_backup", NodeName, "sendentries"),      Size = Start + N,      TreeHead = ht:root(Size - 1),      ok = check_root(NodeName, NodeAddress, Size, TreeHead), +    bench:timingpoint("merge_backup", NodeName, "verifyroot"),      ok = setverifiedsize(NodeName, NodeAddress, Size), +    bench:timingpoint("merge_backup", NodeName, "setverifiedsize"),      ok = write_backupfile(NodeName, Size, TreeHead),      true = NTotal >= N,      do_backup(NodeName, NodeAddress, Size, NTotal - N). diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index 3c38401..23c9d19 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -21,6 +21,7 @@ start_link(Args) ->  init([Name, Address]) ->      lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]),      Timer = erlang:start_timer(1000, self(), dist), +    bench:timingpoint("merge_dist", Name, "start"),      {ok, #state{timer = Timer,                  node_name = Name,                  node_address = Address, @@ -51,6 +52,7 @@ dist({struct, PropList} = STH,       #state{node_address = NodeAddress,              node_name = NodeName,              sth_timestamp = LastTimestamp} = State) -> +    bench:timingpoint("merge_dist", NodeName, "idle"),      Treesize = proplists:get_value(<<"tree_size">>, PropList),      Timestamp = proplists:get_value(<<"timestamp">>, PropList),      RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)), @@ -59,12 +61,14 @@ dist({struct, PropList} = STH,      TS = case Timestamp > LastTimestamp of               true ->                   true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature), +                 bench:timingpoint("merge_dist", NodeName, "verify_sth"),                   try                       lager:info("~p: starting dist, sth at ~B, logorder at ~B",                                  [NodeAddress, Treesize, Logordersize]),                       statusreport:report("merge_dist", NodeName, "targetsth", Treesize),                       ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)),                       ok = publish_sth(NodeName, NodeAddress, STH), +                     bench:timingpoint("merge_dist", NodeName, "publish_sth"),                       statusreport:report("merge_dist", NodeName, "sth", Treesize),                       lager:info("~p: Published STH with size ~B and timestamp " ++                                      "~p.", [NodeAddress, Treesize, Timestamp]), @@ -101,12 +105,16 @@ do_dist(NodeAddress, NodeName, Start, NTotal) ->      SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000),      SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100),      ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize), +    bench:timingpoint("merge_dist", NodeName, "sendlog"),      statusreport:report("merge_dist", NodeName, "sendlog", Start + N),      {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName), +    bench:timingpoint("merge_dist", NodeName, "missingentries"),      lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]),      HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded),      ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize), +    bench:timingpoint("merge_dist", NodeName, "sendentries"),      {ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N), +    bench:timingpoint("merge_dist", NodeName, "verifyentries"),      lager:info("~p: Done distributing ~B out of ~B entries.",                 [NodeAddress, NewSize-Start, NTotal]),      statusreport:report("merge_dist", NodeName, "verified", Start + N), diff --git a/src/bench.erl b/src/bench.erl new file mode 100644 index 0000000..06e4777 --- /dev/null +++ b/src/bench.erl @@ -0,0 +1,18 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(bench). + +-export([timingpoint/3]). + +timingpoint(Service, Target, Tag) -> +    Thispoint = plop_compat:monotonic_time(millisecond), +    Seq = plop_compat:unique_integer([monotonic]), +    case get(bench_lastpoint) of +        undefined -> +            statusreport:bench(Service, Target, Tag, Seq, Thispoint + plop_compat:time_offset(millisecond), null); +        Lastpoint -> +            statusreport:bench(Service, Target, Tag, Seq, Lastpoint + plop_compat:time_offset(millisecond), Thispoint - Lastpoint) +    end, +    put(bench_lastpoint, Thispoint), +    ok. diff --git a/src/plop_compat.erl b/src/plop_compat.erl index 4d45590..5c1fa17 100644 --- a/src/plop_compat.erl +++ b/src/plop_compat.erl @@ -2,7 +2,7 @@  %%% See LICENSE for licensing information.  -module(plop_compat). --export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4]). +-export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4, unique_integer/1, time_offset/1]).  -include_lib("public_key/include/public_key.hrl").  unpack_spki(SPKI) -> @@ -13,6 +13,10 @@ monotonic_time(Unit) ->      monotonic_time(erlang:system_info(otp_release), Unit).  start_timer(Time, Dest, Msg, Options) ->      start_timer(erlang:system_info(otp_release), Time, Dest, Msg, Options). +unique_integer(Modifiers) -> +    unique_integer(erlang:system_info(otp_release), Modifiers). +time_offset(Unit) -> +    time_offset(erlang:system_info(otp_release), Unit).  unpack_spki("R16" ++ _, SPKI) ->      #'SubjectPublicKeyInfo'{subjectPublicKey = {_, Octets}, @@ -43,15 +47,35 @@ timestamp("19") ->  monotonic_time("R16" ++ _, millisecond) ->      {MeS, S, MiS} = timestamp(), -    MeS * 1000000 + S * 1000 + MiS; +    (MeS * 1000000 + S) * 1000 + MiS div 1000;  monotonic_time("17", millisecond) ->      {MeS, S, MiS} = timestamp(), -    MeS * 1000000 + S * 1000 + MiS; +    (MeS * 1000000 + S) * 1000 + MiS div 1000;  monotonic_time("18", Unit) ->      erlang:monotonic_time(Unit);  monotonic_time("19", Unit) ->      erlang:monotonic_time(Unit). +unique_integer("R16", _Modifiers) -> +    {MeS, S, MiS} = erlang:now(), +    (MeS * 1000000 + S) * 1000000 + MiS; +unique_integer("17", _Modifiers) -> +    {MeS, S, MiS} = erlang:now(), +    (MeS * 1000000 + S) * 1000000 + MiS; +unique_integer("18", Modifiers) -> +    erlang:unique_integer(Modifiers); +unique_integer("19", Modifiers) -> +    erlang:unique_integer(Modifiers). + +time_offset("R16", _Unit) -> +    0; +time_offset("17", _Unit) -> +    0; +time_offset("18", Unit) -> +    erlang:time_offset(Unit); +time_offset("19", Unit) -> +    erlang:time_offset(Unit). +  start_timer("R16" ++ _, Time, Dest, Msg, [{abs, true}]) ->      erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg);  start_timer("17", Time, Dest, Msg, [{abs, true}]) -> diff --git a/src/statusreport.erl b/src/statusreport.erl index db85b84..8099f86 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -8,7 +8,7 @@  -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,           code_change/3]).  -export([report/4]). --export([report_multi/4]). +-export([report_multi/4, bench/6]).  -record(state, {            timer :: none|reference(), @@ -32,7 +32,7 @@ init([]) ->                  lastsent = plop_compat:monotonic_time(millisecond) - ReportInterval}}.  store_status(State, Service, Target, Variable, Status) -> -    Statusreports = dict:store({Service, Target, Variable}, +    Statusreports = dict:store({{status, Service}, {Target, Variable}},                                 {single, Status},                                 State#state.statusreports),      State#state{statusreports = Statusreports}. @@ -45,13 +45,19 @@ dict_append_set(Key, Value, Dict) ->                  {multi, AppendSet}, Dict).  store_multi_status(State, Service, Target, Variable, Status) -> -    Statusreports = dict_append_set({Service, Target, Variable}, +    Statusreports = dict_append_set({{status, Service}, {Target, Variable}},                                      Status,                                      State#state.statusreports),      State#state{statusreports = Statusreports}. +store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed) -> +    Statusreports = dict:store({{bench, Service}, {Target, Tag, Seq}}, +                               {Starttime, Elapsed}, +                               State#state.statusreports), +    State#state{statusreports = Statusreports}. +  store_set_status(State, Service, Target, Variable, Statuses) -> -    Statusreports = dict:store({Service, Target, Variable}, +    Statusreports = dict:store({{status, Service}, {Target, Variable}},                                 {multi, Statuses},                                 State#state.statusreports),      State#state{statusreports = Statusreports}. @@ -72,6 +78,9 @@ handle_cast({report, Service, Target, Variable, Status}, State) ->      {noreply, try_send(NewState)};  handle_cast({report_multi, Service, Target, Variable, Status}, State) ->      NewState = store_multi_status(State, Service, Target, Variable, Status), +    {noreply, try_send(NewState)}; +handle_cast({bench, Service, Target, Tag, Seq, Starttime, Elapsed}, State) -> +    NewState = store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed),      {noreply, try_send(NewState)}.  handle_info({timeout, _Timer, force_send}, State) -> @@ -123,8 +132,8 @@ terminate(Reason, State) ->  group_by_service(Statusreports) ->      dict:to_list(        lists:foldl( -        fun ({{Service, Target, Variable}, Status}, Acc) -> -                dict:append(Service, {Target, Variable, Status}, Acc) +        fun ({{Service, Group}, Status}, Acc) -> +                dict:append(Service, {Group, Status}, Acc)          end, dict:new(), dict:to_list(Statusreports))).  encode_one_status(Status) when is_number(Status) -> @@ -141,22 +150,43 @@ encode_status({single, Status}) ->  encode_status({multi, Statuses}) ->      lists:map(fun encode_one_status/1, sets:to_list(Statuses)). -send(Service, Statusreports, Nodename) -> -    lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), -    [NodeAddress] = plopconfig:get_env(statusservers, []), +encode_report(status, Nodename, {{Target, Variable}, Status}) -> +    {struct, +     [{"target", list_to_binary(Target)}, +      {"source", list_to_binary(Nodename)}, +      {"key", list_to_binary(Variable)}, +      {"value", encode_status(Status)}]}; + +encode_report(bench, Nodename, {{Target, Tag, Seq}, {Starttime, Elapsed}}) -> +    {struct, +     [{"target", list_to_binary(Target)}, +      {"source", list_to_binary(Nodename)}, +      {"tag", list_to_binary(Tag)}, +      {"seq", Seq}, +      {"starttime", Starttime}, +      {"elapsed", Elapsed}]}. + +addresses_for_servicetype(status) -> +    plopconfig:get_env(statusservers, []); +addresses_for_servicetype(bench) -> +    plopconfig:get_env(benchservers, []). + +send({ServiceType, Service}, Statusreports, Nodename) -> +    lager:debug("reporting status to ~p ~p: ~p", [ServiceType, Service, Statusreports]), +    NodeAddresses = addresses_for_servicetype(ServiceType),      DebugTag = "statusreport", -    URL = NodeAddress ++ Service,      Headers = [{"Content-Type", "text/json"}],      RequestBody = list_to_binary(                      mochijson2:encode(                        [ -                       {struct, -                        [{"target", list_to_binary(Target)}, -                         {"source", list_to_binary(Nodename)}, -                         {"key", list_to_binary(Variable)}, -                         {"value", encode_status(Status)}]} -                       || {Target, Variable, Status} <- Statusreports +                       encode_report(ServiceType, Nodename, Statusreport) +                       || Statusreport <- Statusreports                        ])), +    lists:foreach(fun (NodeAddress) -> +                          send_one_server(DebugTag, NodeAddress ++ Service, Headers, RequestBody) +                  end, NodeAddresses). + +send_one_server(DebugTag, URL, Headers, RequestBody) ->      case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of          {error, Err} ->              lager:debug("request error ~p ~p", [DebugTag, Err]); @@ -187,7 +217,7 @@ try_send(State) ->              lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]),              set_timer(State);          true -> -            lager:debug("status report send long enough ago"), +            lager:debug("status report sent long enough ago"),              force_send(State)      end. @@ -198,3 +228,7 @@ report(Service, Target, Variable, Status) when is_number(Status); is_list(Status  report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) ->      lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]),      gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). + +bench(Service, Target, Tag, Seq, Starttime, Elapsed) -> +    lager:debug("reporting bench ~p ~p ~p ~p ~p ~p", [Service, Target, Tag, Seq, Starttime, Elapsed]), +    gen_server:cast(?MODULE, {bench, Service, Target, Tag, Seq, Starttime, Elapsed}). diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl index 81fcd7a..ca11e7c 100644 --- a/statusserver/src/statusserver.erl +++ b/statusserver/src/statusserver.erl @@ -7,6 +7,7 @@  -export([request/4]).  -define(APPURL_PLOP_STATUS, "plop/v1/status"). +-define(APPURL_PLOP_BENCH, "plop/v1/bench").  request(post, ?APPURL_PLOP_STATUS, Service, Input) ->      case (catch mochijson2:decode(Input)) of @@ -22,7 +23,23 @@ request(post, ?APPURL_PLOP_STATUS, Service, Input) ->                            end, Entries)      end,      success({[{result, <<"ok">>}]}); -request(get, "", "status", Input) -> +request(post, ?APPURL_PLOP_BENCH, Service, Input) -> +    case (catch mochijson2:decode(Input)) of +        {error, E} -> +            html("bad input:", E); +        Entries when is_list(Entries) -> +            lists:foreach(fun ({struct, PropList}) -> +                                  Target = proplists:get_value(<<"target">>, PropList), +                                  Source = proplists:get_value(<<"source">>, PropList), +                                  Tag = proplists:get_value(<<"tag">>, PropList), +                                  Seq = proplists:get_value(<<"seq">>, PropList), +                                  Starttime = proplists:get_value(<<"starttime">>, PropList), +                                  Elapsed = proplists:get_value(<<"elapsed">>, PropList), +                                  add_bench(Service, Source, Target, Tag, Seq, Starttime, Elapsed) +                          end, Entries) +    end, +    success({[{result, <<"ok">>}]}); +request(get, "", "status", _Input) ->      Now = plop_compat:monotonic_time(millisecond),      Variables = [{struct, [                             {service, list_to_binary(Service)}, @@ -32,6 +49,16 @@ request(get, "", "status", Input) ->                             {status, Status},                             {age, (Now - Timestamp) / 1000}                            ]}  || {{Service, Source, Target, Variable}, Status, Timestamp} <- get_status()], +    success({[{result, Variables}]}); +request(get, "", "bench", _Input) -> +    Variables = [{struct, [ +                           {service, list_to_binary(Service)}, +                           {source, Source}, +                           {target, Target}, +                           {tag, Tag}, +                           {starttime, Starttime}, +                           {elapsed, Elapsed} +                          ]}  || {{Service, Source, Target, Tag, _Seq, _Starttime}, Starttime, Elapsed} <- get_bench()],      success({[{result, Variables}]}). @@ -47,25 +74,36 @@ html(Text, Input) ->             "</body></html>~n", [Text, Input])}.  -define(STATUSDB_TABLE, statusserver_statusdb). +-define(BENCHDB_TABLE, statusserver_benchdb).  init_module() -> -    create_statusdb(). +    create_table(?STATUSDB_TABLE), +    create_table(?BENCHDB_TABLE). -create_statusdb() -> -    case ets:info(?STATUSDB_TABLE) of +create_table(Table) -> +    case ets:info(Table) of  	undefined ->  	    ok;  	_ -> -	    ets:delete(?STATUSDB_TABLE) +	    ets:delete(Table)      end, -    ets:new(?STATUSDB_TABLE, [set, public, named_table]). +    ets:new(Table, [set, public, named_table]).  get_status() ->      [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. +get_bench() -> +    [E || [E] <- ets:match(?BENCHDB_TABLE, '$1')]. +  set_status(Service, Source, Target, Variable, Status) ->      lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]),      Timestamp = plop_compat:monotonic_time(millisecond),      true = ets:insert(?STATUSDB_TABLE,                        {{Service, Source, Target, Variable}, Status, Timestamp}),      ok. + +add_bench(Service, Source, Target, Tag, Seq, Starttime, Elapsed) -> +    lager:info("bench: ~p ~p ~p ~p ~p ~p ~p", [Service, Source, Target, Tag, Seq, Starttime, Elapsed]), +    true = ets:insert(?BENCHDB_TABLE, +                      {{Service, Source, Target, Tag, Seq, Starttime}, Starttime, Elapsed}), +    ok. | 
