From 12e08090358383c5678417ae8929fca1f03ca8bc Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 2 Mar 2017 00:27:59 +0100 Subject: Statusserver --- src/statusreport.erl | 169 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) create mode 100644 src/statusreport.erl (limited to 'src/statusreport.erl') diff --git a/src/statusreport.erl b/src/statusreport.erl new file mode 100644 index 0000000..cd5bb5a --- /dev/null +++ b/src/statusreport.erl @@ -0,0 +1,169 @@ +%%% Copyright (c) 2017, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(statusreport). +-behaviour(gen_server). + +-export([start_link/0]). +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). +-export([report/4]). +-export([report_multi/4]). + +-record(state, { + timer :: none|reference(), + nodename :: string(), + statusreports :: dict:dict(), + lastsent :: integer() + }). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + process_flag(trap_exit, true), + ReportInterval = application:get_env(plop, status_report_interval, 1000), + lager:info("~p: starting", [?MODULE]), + %Timer = erlang:start_timer(1000, self(), dist), + {ok, #state{timer = none, + nodename = http_auth:own_name(), + statusreports = dict:new(), + lastsent = erlang:monotonic_time(millisecond) - ReportInterval}}. + +store_status(State, Service, Target, Variable, Status) -> + Statusreports = dict:store({Service, Target, Variable}, + {single, Status}, + State#state.statusreports), + State#state{statusreports = Statusreports}. + +dict_append_set(Key, Value, Dict) -> + AppendSet = sets:from_list([Value]), + dict:update(Key, fun ({multi, Old}) -> + {multi, sets:union(Old, AppendSet)} + end, + {multi, AppendSet}, Dict). + +store_multi_status(State, Service, Target, Variable, Status) -> + Statusreports = dict_append_set({Service, Target, Variable}, + Status, + State#state.statusreports), + State#state{statusreports = Statusreports}. + +handle_call(_, _From, State) -> + {noreply, State}. + +handle_cast({report, Service, Target, Variable, Status}, State) -> + NewState = store_status(State, Service, Target, Variable, Status), + {noreply, try_send(NewState)}; +handle_cast({report_multi, Service, Target, Variable, Status}, State) -> + NewState = store_multi_status(State, Service, Target, Variable, Status), + {noreply, try_send(NewState)}. + +handle_info({timeout, _Timer, force_send}, State) -> + lager:debug("statusreport timer timeout"), + {noreply, force_send(State)}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +cancel_timer(State) -> + case State#state.timer of + none -> + none; + _ -> + erlang:cancel_timer(State#state.timer) + end, + State#state{timer = none}. + +set_timer(State) -> + case State#state.timer of + none -> + ReportInterval = application:get_env(plop, status_report_interval, 1000), + Timer = erlang:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), + State#state{timer = Timer}; + _ -> + State + end. + +terminate(Reason, State) -> + lager:info("~p terminating: ~p", [?MODULE, Reason]), + NewState = cancel_timer(State), + case Reason of + shutdown -> + force_send(NewState); + _ -> + none + end, + ok. + + + +group_by_service(Statusreports) -> + dict:to_list( + lists:foldl( + fun ({{Service, Target, Variable}, Status}, Acc) -> + dict:append(Service, {Target, Variable, Status}, Acc) + end, dict:new(), dict:to_list(Statusreports))). + +encode_status({single, Status}) -> + Status; +encode_status({multi, Statuses}) -> + sets:to_list(Statuses). + +send(Service, Statusreports, Nodename) -> + lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), + [NodeAddress] = plopconfig:get_env(statsservers, []), + DebugTag = "statusreport", + URL = NodeAddress ++ Service, + Headers = [{"Content-Type", "text/json"}], + RequestBody = list_to_binary( + mochijson2:encode( + [ + {struct, + [{"target", list_to_binary(Target)}, + {"source", list_to_binary(Nodename)}, + {"key", list_to_binary(Variable)}, + {"value", encode_status(Status)}]} + || {Target, Variable, Status} <- Statusreports + ])), + case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of + {error, Err} -> + lager:debug("request error ~p ~p", [DebugTag, Err]); + {failure, {none, StatusCode, none}, _RespHeaders, _Body} -> + lager:debug("request failure ~p ~p", [DebugTag, StatusCode]); + {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 -> + case (catch mochijson2:decode(Body)) of + {error, Err} -> + lager:debug("error returned ~p ~p", [DebugTag, Err]); + {struct, _PropList} -> + none + end + end. + +force_send(State) -> + lists:foreach(fun ({Service, Statusreports}) -> + send(Service, Statusreports, State#state.nodename) + end, group_by_service(State#state.statusreports)), + NewState = cancel_timer(State), + NewState#state{statusreports = dict:new(), lastsent = erlang:monotonic_time(millisecond)}. + +try_send(State) -> + ReportInterval = application:get_env(plop, status_report_interval, 1000), + NextSend = State#state.lastsent + ReportInterval, + Now = erlang:monotonic_time(millisecond), + if + NextSend > Now -> + lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]), + set_timer(State); + true -> + lager:debug("status report send long enough ago"), + force_send(State) + end. + +report(Service, Target, Variable, Status) -> + lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), + gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). + +report_multi(Service, Target, Variable, Status) -> + lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]), + gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). -- cgit v1.1 From 64daaf148cd59bf19942014bc754992b6bc6d86d Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 2 Mar 2017 12:52:16 +0100 Subject: Rename to statusserver --- src/statusreport.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/statusreport.erl') diff --git a/src/statusreport.erl b/src/statusreport.erl index cd5bb5a..63414cd 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -112,7 +112,7 @@ encode_status({multi, Statuses}) -> send(Service, Statusreports, Nodename) -> lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), - [NodeAddress] = plopconfig:get_env(statsservers, []), + [NodeAddress] = plopconfig:get_env(statusservers, []), DebugTag = "statusreport", URL = NodeAddress ++ Service, Headers = [{"Content-Type", "text/json"}], -- cgit v1.1 From 27b809c9525a876ecde0a5346e0264643197d934 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Wed, 8 Mar 2017 23:20:36 +0100 Subject: Added heartbeat service. Add source. Send better messages. --- src/statusreport.erl | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) (limited to 'src/statusreport.erl') diff --git a/src/statusreport.erl b/src/statusreport.erl index 63414cd..f0b7503 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -24,7 +24,8 @@ init([]) -> process_flag(trap_exit, true), ReportInterval = application:get_env(plop, status_report_interval, 1000), lager:info("~p: starting", [?MODULE]), - %Timer = erlang:start_timer(1000, self(), dist), + HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000), + erlang:start_timer(HeartbeatInterval, self(), heartbeat), {ok, #state{timer = none, nodename = http_auth:own_name(), statusreports = dict:new(), @@ -49,6 +50,20 @@ store_multi_status(State, Service, Target, Variable, Status) -> State#state.statusreports), State#state{statusreports = Statusreports}. +store_set_status(State, Service, Target, Variable, Statuses) -> + Statusreports = dict:store({Service, Target, Variable}, + {multi, Statuses}, + State#state.statusreports), + State#state{statusreports = Statusreports}. + +heartbeat(State) -> + {ok, ConfigVersion} = plopconfig:get_env(version), + RunningApps = [atom_to_list(App) ++ " " ++ Vsn || {App, _Desc, Vsn} <- application:which_applications()], + + NewState1 = store_status(State, "heartbeat", "", "configversion", ConfigVersion), + NewState2 = store_set_status(NewState1, "heartbeat", "", "applications", sets:from_list(RunningApps)), + NewState2. + handle_call(_, _From, State) -> {noreply, State}. @@ -61,7 +76,14 @@ handle_cast({report_multi, Service, Target, Variable, Status}, State) -> handle_info({timeout, _Timer, force_send}, State) -> lager:debug("statusreport timer timeout"), - {noreply, force_send(State)}. + {noreply, force_send(State)}; + +handle_info({timeout, _Timer, heartbeat}, State) -> + lager:debug("statusreport timer timeout"), + HeartbeatInterval = application:get_env(plop, heartbeat_interval, 1000), + erlang:start_timer(HeartbeatInterval, self(), heartbeat), + NewState = heartbeat(State), + {noreply, try_send(NewState)}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -105,10 +127,17 @@ group_by_service(Statusreports) -> dict:append(Service, {Target, Variable, Status}, Acc) end, dict:new(), dict:to_list(Statusreports))). -encode_status({single, Status}) -> +encode_one_status(Status) when is_number(Status) -> Status; +encode_one_status(Status) when is_list(Status) -> + list_to_binary(Status); +encode_one_status(Status) when is_binary(Status) -> + Status. + +encode_status({single, Status}) -> + encode_one_status(Status); encode_status({multi, Statuses}) -> - sets:to_list(Statuses). + lists:map(fun encode_one_status/1, sets:to_list(Statuses)). send(Service, Statusreports, Nodename) -> lager:debug("reporting status to ~p: ~p", [Service, Statusreports]), @@ -160,10 +189,10 @@ try_send(State) -> force_send(State) end. -report(Service, Target, Variable, Status) -> +report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). -report_multi(Service, Target, Variable, Status) -> +report_multi(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}). -- cgit v1.1 From 39cc6c19e79c19000d5e2174aa3b5f5c2ed2545b Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 15 Mar 2017 17:16:06 +0100 Subject: Add compat functions for timing functionality missing in R17. --- src/statusreport.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/statusreport.erl') diff --git a/src/statusreport.erl b/src/statusreport.erl index f0b7503..a9fef7f 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -29,7 +29,7 @@ init([]) -> {ok, #state{timer = none, nodename = http_auth:own_name(), statusreports = dict:new(), - lastsent = erlang:monotonic_time(millisecond) - ReportInterval}}. + lastsent = plop_compat:monotonic_time(millisecond) - ReportInterval}}. store_status(State, Service, Target, Variable, Status) -> Statusreports = dict:store({Service, Target, Variable}, @@ -101,7 +101,7 @@ set_timer(State) -> case State#state.timer of none -> ReportInterval = application:get_env(plop, status_report_interval, 1000), - Timer = erlang:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), + Timer = plop_compat:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]), State#state{timer = Timer}; _ -> State @@ -174,12 +174,12 @@ force_send(State) -> send(Service, Statusreports, State#state.nodename) end, group_by_service(State#state.statusreports)), NewState = cancel_timer(State), - NewState#state{statusreports = dict:new(), lastsent = erlang:monotonic_time(millisecond)}. + NewState#state{statusreports = dict:new(), lastsent = plop_compat:monotonic_time(millisecond)}. try_send(State) -> ReportInterval = application:get_env(plop, status_report_interval, 1000), NextSend = State#state.lastsent + ReportInterval, - Now = erlang:monotonic_time(millisecond), + Now = plop_compat:monotonic_time(millisecond), if NextSend > Now -> lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]), -- cgit v1.1 From 8bb572816040a8ecda50be9687cd1ddc76436f65 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Tue, 14 Mar 2017 14:58:41 +0100 Subject: Handle 'null' case in statusreport. --- src/statusreport.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/statusreport.erl') diff --git a/src/statusreport.erl b/src/statusreport.erl index a9fef7f..db85b84 100644 --- a/src/statusreport.erl +++ b/src/statusreport.erl @@ -132,7 +132,9 @@ encode_one_status(Status) when is_number(Status) -> encode_one_status(Status) when is_list(Status) -> list_to_binary(Status); encode_one_status(Status) when is_binary(Status) -> - Status. + Status; +encode_one_status(null) -> + null. encode_status({single, Status}) -> encode_one_status(Status); @@ -189,7 +191,7 @@ try_send(State) -> force_send(State) end. -report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status) -> +report(Service, Target, Variable, Status) when is_number(Status); is_list(Status); is_binary(Status); Status == null -> lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]), gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}). -- cgit v1.1