From df2441c6315de4b245e1faf5b72517c5199fe179 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Fri, 17 Mar 2017 01:24:44 +0100 Subject: Added benchmark reporting. --- merge/src/merge_backup.erl | 8 +++++ merge/src/merge_dist.erl | 8 +++++ src/bench.erl | 18 +++++++++++ src/plop_compat.erl | 30 +++++++++++++++-- src/statusreport.erl | 68 +++++++++++++++++++++++++++++---------- statusserver/src/statusserver.erl | 50 ++++++++++++++++++++++++---- 6 files changed, 156 insertions(+), 26 deletions(-) create mode 100644 src/bench.erl diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl index 068725c..3c19527 100644 --- a/merge/src/merge_backup.erl +++ b/merge/src/merge_backup.erl @@ -20,6 +20,7 @@ start_link(Args) -> init([Name, Address]) -> lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]), Timer = erlang:start_timer(1000, self(), backup), + bench:timingpoint("merge_backup", Name, "start"), {ok, #state{timer = Timer, node_name = Name, node_address = Address}}. handle_call(stop, _From, State) -> @@ -42,14 +43,17 @@ terminate(Reason, #state{timer = Timer}) -> backup(Size, #state{node_name = NodeName, node_address = NodeAddress} = State) -> lager:debug("~p: logorder size ~B", [NodeName, Size]), + bench:timingpoint("merge_backup", NodeName, "idle"), ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards". try {ok, VerifiedSize} = verified_size(NodeName, NodeAddress), + bench:timingpoint("merge_backup", NodeName, "verifiedsize"), lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]), case VerifiedSize == Size of true -> TreeHead = ht:root(Size - 1), ok = check_root(NodeName, NodeAddress, Size, TreeHead), + bench:timingpoint("merge_backup", NodeName, "verifyroot"), ok = write_backupfile(NodeName, Size, TreeHead); false -> true = VerifiedSize < Size, % Secondary ahead of primary? @@ -69,13 +73,17 @@ do_backup(NodeName, NodeAddress, Start, NTotal) -> N = min(NTotal, plopconfig:get_env(merge_backup_winsize, 1000)), Hashes = index:getrange(logorder, Start, Start + N - 1), ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, plopconfig:get_env(merge_backup_sendlog_chunksize, 1000)), + bench:timingpoint("merge_backup", NodeName, "sendlog"), {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName), HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, plopconfig:get_env(merge_backup_sendentries_chunksize, 100)), + bench:timingpoint("merge_backup", NodeName, "sendentries"), Size = Start + N, TreeHead = ht:root(Size - 1), ok = check_root(NodeName, NodeAddress, Size, TreeHead), + bench:timingpoint("merge_backup", NodeName, "verifyroot"), ok = setverifiedsize(NodeName, NodeAddress, Size), + bench:timingpoint("merge_backup", NodeName, "setverifiedsize"), ok = write_backupfile(NodeName, Size, TreeHead), true = NTotal >= N, do_backup(NodeName, NodeAddress, Size, NTotal - N). diff --git a/merge/src/merge_dist.erl b/merge/src/merge_dist.erl index 3c38401..23c9d19 100644 --- a/merge/src/merge_dist.erl +++ b/merge/src/merge_dist.erl @@ -21,6 +21,7 @@ start_link(Args) -> init([Name, Address]) -> lager:info("~p:~p: starting (~p)", [?MODULE, Name, Address]), Timer = erlang:start_timer(1000, self(), dist), + bench:timingpoint("merge_dist", Name, "start"), {ok, #state{timer = Timer, node_name = Name, node_address = Address, @@ -51,6 +52,7 @@ dist({struct, PropList} = STH, #state{node_address = NodeAddress, node_name = NodeName, sth_timestamp = LastTimestamp} = State) -> + bench:timingpoint("merge_dist", NodeName, "idle"), Treesize = proplists:get_value(<<"tree_size">>, PropList), Timestamp = proplists:get_value(<<"timestamp">>, PropList), RootHash = base64:decode(proplists:get_value(<<"sha256_root_hash">>, PropList)), @@ -59,12 +61,14 @@ dist({struct, PropList} = STH, TS = case Timestamp > LastTimestamp of true -> true = plop:verify_sth(Treesize, Timestamp, RootHash, Signature), + bench:timingpoint("merge_dist", NodeName, "verify_sth"), try lager:info("~p: starting dist, sth at ~B, logorder at ~B", [NodeAddress, Treesize, Logordersize]), statusreport:report("merge_dist", NodeName, "targetsth", Treesize), ok = do_dist(NodeAddress, NodeName, min(Treesize, Logordersize)), ok = publish_sth(NodeName, NodeAddress, STH), + bench:timingpoint("merge_dist", NodeName, "publish_sth"), statusreport:report("merge_dist", NodeName, "sth", Treesize), lager:info("~p: Published STH with size ~B and timestamp " ++ "~p.", [NodeAddress, Treesize, Timestamp]), @@ -101,12 +105,16 @@ do_dist(NodeAddress, NodeName, Start, NTotal) -> SendlogChunksize = application:get_env(plop, merge_dist_sendlog_chunksize, 1000), SendentriesChunksize = application:get_env(plop, merge_dist_sendentries_chunksize, 100), ok = merge_util:sendlog(NodeAddress, NodeName, Start, Hashes, SendlogChunksize), + bench:timingpoint("merge_dist", NodeName, "sendlog"), statusreport:report("merge_dist", NodeName, "sendlog", Start + N), {ok, HashesMissingEncoded} = merge_util:missingentries(NodeAddress, NodeName), + bench:timingpoint("merge_dist", NodeName, "missingentries"), lager:debug("number of missing entries: ~B", [length(HashesMissingEncoded)]), HashesMissing = lists:map(fun base64:decode/1, HashesMissingEncoded), ok = merge_util:sendentries(NodeAddress, NodeName, HashesMissing, SendentriesChunksize), + bench:timingpoint("merge_dist", NodeName, "sendentries"), {ok, NewSize} = frontend_verify_entries(NodeName, NodeAddress, Start + N), + bench:timingpoint("merge_dist", NodeName, "verifyentries"), lager:info("~p: Done distributing ~B out of ~B entries.", [NodeAddress, NewSize-Start, NTotal]), statusreport:report("merge_dist", NodeName, "verified", Start + N), diff --git a/src/bench.erl b/src/bench.erl new file mode 100644 index 0000000..06e4777 --- /dev/null +++ b/src/bench.erl @@ -0,0 +1,18 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(bench). + +-export([timingpoint/3]). + +timingpoint(Service, Target, Tag) -> + Thispoint = plop_compat:monotonic_time(millisecond), + Seq = plop_compat:unique_integer([monotonic]), + case get(bench_lastpoint) of + undefined -> + statusreport:bench(Service, Target, Tag, Seq, Thispoint + plop_compat:time_offset(millisecond), null); + Lastpoint -> + statusreport:bench(Service, Target, Tag, Seq, Lastpoint + plop_compat:time_offset(millisecond), Thispoint - Lastpoint) + end, + put(bench_lastpoint, Thispoint), + ok. diff --git a/src/plop_compat.erl b/src/plop_compat.erl index 4d45590..5c1fa17 100644 --- a/src/plop_compat.erl +++ b/src/plop_compat.erl @@ -2,7 +2,7 @@ %%% See LICENSE for licensing information. -module(plop_compat). --export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4]). +-export([unpack_spki/1, timestamp/0, monotonic_time/1, start_timer/4, unique_integer/1, time_offset/1]). -include_lib("public_key/include/public_key.hrl"). unpack_spki(SPKI) -> @@ -13,6 +13,10 @@ monotonic_time(Unit) -> monotonic_time(erlang:system_info(otp_release), Unit). start_timer(Time, Dest, Msg, Options) -> start_timer(erlang:system_info(otp_release), Time, Dest, Msg, Options). +unique_integer(Modifiers) -> + unique_integer(erlang:system_info(otp_release), Modifiers). +time_offset(Unit) -> + time_offset(erlang:system_info(otp_release), Unit). unpack_spki("R16" ++ _, SPKI) -> #'SubjectPublicKeyInfo'{subjectPublicKey = {_, Octets}, @@ -43,15 +47,35 @@ timestamp("19") -> monotonic_time("R16" ++ _, millisecond) -> {MeS, S, MiS} = timestamp(), - MeS * 1000000 + S * 1000 + MiS; + (MeS * 1000000 + S) * 1000 + MiS div 1000; monotonic_time("17", millisecond) -> {MeS, S, MiS} = timestamp(), - MeS * 1000000 + S * 1000 + MiS; + (MeS * 1000000 + S) * 1000 + MiS div 1000; monotonic_time("18", Unit) -> erlang:monotonic_time(Unit); monotonic_time("19", Unit) -> erlang:monotonic_time(Unit). +unique_integer("R16", _Modifiers) -> + {MeS, S, MiS} = erlang:now(), + (MeS * 1000000 + S) * 1000000 + MiS; +unique_integer("17", _Modifiers) -> + {MeS, S, MiS} = erlang:now(), + (MeS * 1000000 + S) * 1000000 + MiS; +unique_integer("18", Modifiers) -> + erlang:unique_integer(Modifiers); +unique_integer("19", Modifiers) -> + erlang:unique_integer(Modifiers). + +time_offset("R16", _Unit) -> + 0; +time_offset("17", _Unit) -> + 0; +time_offset("18", Unit) -> + erlang:time_offset(Unit); +time_offset("19", Unit) -> + erlang:time_offset(Unit). + start_timer("R16" ++ _, Time, Dest, Msg, [{abs, true}]) -> erlang:start_timer(max(0, Time - monotonic_time(millisecond)), Dest, Msg); start_timer("17", Time, Dest, Msg, [{abs, true}]) -> diff --git a/src/statusreport.erl b/src/statusreport.erl index db85b84..8099f86 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -8,7 +8,7 @@ -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). -export([report/4]). --export([report_multi/4]). +-export([report_multi/4, bench/6]). -record(state, { timer :: none|reference(), @@ -32,7 +32,7 @@ init([]) -> lastsent = plop_compat:monotonic_time(millisecond) - ReportInterval}}. store_status(State, Service, Target, Variable, Status) -> - Statusreports = dict:store({Service, Target, Variable}, + Statusreports = dict:store({{status, Service}, {Target, Variable}}, {single, Status}, State#state.statusreports), State#state{statusreports = Statusreports}. @@ -45,13 +45,19 @@ dict_append_set(Key, Value, Dict) -> {multi, AppendSet}, Dict). store_multi_status(State, Service, Target, Variable, Status) -> - Statusreports = dict_append_set({Service, Target, Variable}, + Statusreports = dict_append_set({{status, Service}, {Target, Variable}}, Status, State#state.statusreports), State#state{statusreports = Statusreports}. +store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed) -> + Statusreports = dict:store({{bench, Service}, {Target, Tag, Seq}}, + {Starttime, Elapsed}, + State#state.statusreports), + State#state{statusreports = Statusreports}. + store_set_status(State, Service, Target, Variable, Statuses) -> - Statusreports = dict:store({Service, Target, Variable}, + Statusreports = dict:store({{status, Service}, {Target, Variable}}, {multi, Statuses}, State#state.statusreports), State#state{statusreports = Statusreports}. @@ -72,6 +78,9 @@ handle_cast({report, Service, Target, Variable, Status}, State) -> {noreply, try_send(NewState)}; handle_cast({report_multi, Service, Target, Variable, Status}, State) -> NewState = store_multi_status(State, Service, Target, Variable, Status), + {noreply, try_send(NewState)}; +handle_cast({bench, Service, Target, Tag, Seq, Starttime, Elapsed}, State) -> + NewState = store_bench(State, Service, Target, Tag, Seq, Starttime, Elapsed), {noreply, try_send(NewState)}. handle_info({timeout, _Timer, force_send}, State) -> @@ -123,8 +132,8 @@ terminate(Reason, State) -> group_by_service(Statusreports) -> dict:to_list( lists:foldl( - fun ({{Service, Target, Variable}, Status}, Acc) -> - dict:append(Service, {Target, Variable, Status}, Acc) + fun ({{Service, Group}, Status}, Acc) -> + dict:append(Service, {Group, Status}, Acc) end, dict:new(), dict:to_list(Statusreports))). encode_one_status(Status) when is_number(Status) -> @@ -141,22 +150,43 @@ encode_status({single, Status}) -> encode_status({multi, Statuses}) -> lists:map(fun encode_one_status/1, sets:to_list(Statuses)). -send(Service, Statusreports, Nodename) -> - lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), - [NodeAddress] = plopconfig:get_env(statusservers, []), +encode_report(status, Nodename, {{Target, Variable}, Status}) -> + {struct, + [{"target", list_to_binary(Target)}, + {"source", list_to_binary(Nodename)}, + {"key", list_to_binary(Variable)}, + {"value", encode_status(Status)}]}; + +encode_report(bench, Nodename, {{Target, Tag, Seq}, {Starttime, Elapsed}}) -> + {struct, + [{"target", list_to_binary(Target)}, + {"source", list_to_binary(Nodename)}, + {"tag", list_to_binary(Tag)}, + {"seq", Seq}, + {"starttime", Starttime}, + {"elapsed", Elapsed}]}. + +addresses_for_servicetype(status) -> + plopconfig:get_env(statusservers, []); +addresses_for_servicetype(bench) -> + plopconfig:get_env(benchservers, []). + +send({ServiceType, Service}, Statusreports, Nodename) -> + lager:debug("reporting status to ~p ~p: ~p", [ServiceType, Service, Statusreports]), + NodeAddresses = addresses_for_servicetype(ServiceType), DebugTag = "statusreport", - URL = NodeAddress ++ Service, Headers = [{"Content-Type", "text/json"}], RequestBody = list_to_binary( mochijson2:encode( [ - {struct, - [{"target", list_to_binary(Target)}, - {"source", list_to_binary(Nodename)}, - {"key", list_to_binary(Variable)}, - {"value", encode_status(Status)}]} - || {Target, Variable, Status} <- Statusreports + encode_report(ServiceType, Nodename, Statusreport) + || Statusreport <- Statusreports ])), + lists:foreach(fun (NodeAddress) -> + send_one_server(DebugTag, NodeAddress ++ Service, Headers, RequestBody) + end, NodeAddresses). + +send_one_server(DebugTag, URL, Headers, RequestBody) -> case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of {error, Err} -> lager:debug("request error ~p ~p", [DebugTag, Err]); @@ -187,7 +217,7 @@ try_send(State) -> lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]), set_timer(State); true -> - lager:debug("status report send long enough ago"), + lager:debug("status report sent long enough ago"), force_send(State) end. @@ -198,3 +228,7 @@ report(Service, Target, Variable, Status) when is_number(Status); is_list(Status report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). + +bench(Service, Target, Tag, Seq, Starttime, Elapsed) -> + lager:debug("reporting bench ~p ~p ~p ~p ~p ~p", [Service, Target, Tag, Seq, Starttime, Elapsed]), + gen_server:cast(?MODULE, {bench, Service, Target, Tag, Seq, Starttime, Elapsed}). diff --git a/statusserver/src/statusserver.erl b/statusserver/src/statusserver.erl index 81fcd7a..ca11e7c 100644 --- a/statusserver/src/statusserver.erl +++ b/statusserver/src/statusserver.erl @@ -7,6 +7,7 @@ -export([request/4]). -define(APPURL_PLOP_STATUS, "plop/v1/status"). +-define(APPURL_PLOP_BENCH, "plop/v1/bench"). request(post, ?APPURL_PLOP_STATUS, Service, Input) -> case (catch mochijson2:decode(Input)) of @@ -22,7 +23,23 @@ request(post, ?APPURL_PLOP_STATUS, Service, Input) -> end, Entries) end, success({[{result, <<"ok">>}]}); -request(get, "", "status", Input) -> +request(post, ?APPURL_PLOP_BENCH, Service, Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("bad input:", E); + Entries when is_list(Entries) -> + lists:foreach(fun ({struct, PropList}) -> + Target = proplists:get_value(<<"target">>, PropList), + Source = proplists:get_value(<<"source">>, PropList), + Tag = proplists:get_value(<<"tag">>, PropList), + Seq = proplists:get_value(<<"seq">>, PropList), + Starttime = proplists:get_value(<<"starttime">>, PropList), + Elapsed = proplists:get_value(<<"elapsed">>, PropList), + add_bench(Service, Source, Target, Tag, Seq, Starttime, Elapsed) + end, Entries) + end, + success({[{result, <<"ok">>}]}); +request(get, "", "status", _Input) -> Now = plop_compat:monotonic_time(millisecond), Variables = [{struct, [ {service, list_to_binary(Service)}, @@ -32,6 +49,16 @@ request(get, "", "status", Input) -> {status, Status}, {age, (Now - Timestamp) / 1000} ]} || {{Service, Source, Target, Variable}, Status, Timestamp} <- get_status()], + success({[{result, Variables}]}); +request(get, "", "bench", _Input) -> + Variables = [{struct, [ + {service, list_to_binary(Service)}, + {source, Source}, + {target, Target}, + {tag, Tag}, + {starttime, Starttime}, + {elapsed, Elapsed} + ]} || {{Service, Source, Target, Tag, _Seq, _Starttime}, Starttime, Elapsed} <- get_bench()], success({[{result, Variables}]}). @@ -47,25 +74,36 @@ html(Text, Input) -> "~n", [Text, Input])}. -define(STATUSDB_TABLE, statusserver_statusdb). +-define(BENCHDB_TABLE, statusserver_benchdb). init_module() -> - create_statusdb(). + create_table(?STATUSDB_TABLE), + create_table(?BENCHDB_TABLE). -create_statusdb() -> - case ets:info(?STATUSDB_TABLE) of +create_table(Table) -> + case ets:info(Table) of undefined -> ok; _ -> - ets:delete(?STATUSDB_TABLE) + ets:delete(Table) end, - ets:new(?STATUSDB_TABLE, [set, public, named_table]). + ets:new(Table, [set, public, named_table]). get_status() -> [E || [E] <- ets:match(?STATUSDB_TABLE, '$1')]. +get_bench() -> + [E || [E] <- ets:match(?BENCHDB_TABLE, '$1')]. + set_status(Service, Source, Target, Variable, Status) -> lager:info("status: ~p ~p ~p ~p ~p", [Service, Source, Target, Variable, Status]), Timestamp = plop_compat:monotonic_time(millisecond), true = ets:insert(?STATUSDB_TABLE, {{Service, Source, Target, Variable}, Status, Timestamp}), ok. + +add_bench(Service, Source, Target, Tag, Seq, Starttime, Elapsed) -> + lager:info("bench: ~p ~p ~p ~p ~p ~p ~p", [Service, Source, Target, Tag, Seq, Starttime, Elapsed]), + true = ets:insert(?BENCHDB_TABLE, + {{Service, Source, Target, Tag, Seq, Starttime}, Starttime, Elapsed}), + ok. -- cgit v1.1