summaryrefslogtreecommitdiff
path: root/src/statusreport.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/statusreport.erl')
-rw-r--r--src/statusreport.erl169
1 files changed, 169 insertions, 0 deletions
diff --git a/src/statusreport.erl b/src/statusreport.erl
new file mode 100644
index 0000000..cd5bb5a
--- /dev/null
+++ b/src/statusreport.erl
@@ -0,0 +1,169 @@
+%%% Copyright (c) 2017, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(statusreport).
+-behaviour(gen_server).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+-export([report/4]).
+-export([report_multi/4]).
+
+-record(state, {
+ timer :: none|reference(),
+ nodename :: string(),
+ statusreports :: dict:dict(),
+ lastsent :: integer()
+ }).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ReportInterval = application:get_env(plop, status_report_interval, 1000),
+ lager:info("~p: starting", [?MODULE]),
+ %Timer = erlang:start_timer(1000, self(), dist),
+ {ok, #state{timer = none,
+ nodename = http_auth:own_name(),
+ statusreports = dict:new(),
+ lastsent = erlang:monotonic_time(millisecond) - ReportInterval}}.
+
+store_status(State, Service, Target, Variable, Status) ->
+ Statusreports = dict:store({Service, Target, Variable},
+ {single, Status},
+ State#state.statusreports),
+ State#state{statusreports = Statusreports}.
+
+dict_append_set(Key, Value, Dict) ->
+ AppendSet = sets:from_list([Value]),
+ dict:update(Key, fun ({multi, Old}) ->
+ {multi, sets:union(Old, AppendSet)}
+ end,
+ {multi, AppendSet}, Dict).
+
+store_multi_status(State, Service, Target, Variable, Status) ->
+ Statusreports = dict_append_set({Service, Target, Variable},
+ Status,
+ State#state.statusreports),
+ State#state{statusreports = Statusreports}.
+
+handle_call(_, _From, State) ->
+ {noreply, State}.
+
+handle_cast({report, Service, Target, Variable, Status}, State) ->
+ NewState = store_status(State, Service, Target, Variable, Status),
+ {noreply, try_send(NewState)};
+handle_cast({report_multi, Service, Target, Variable, Status}, State) ->
+ NewState = store_multi_status(State, Service, Target, Variable, Status),
+ {noreply, try_send(NewState)}.
+
+handle_info({timeout, _Timer, force_send}, State) ->
+ lager:debug("statusreport timer timeout"),
+ {noreply, force_send(State)}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+cancel_timer(State) ->
+ case State#state.timer of
+ none ->
+ none;
+ _ ->
+ erlang:cancel_timer(State#state.timer)
+ end,
+ State#state{timer = none}.
+
+set_timer(State) ->
+ case State#state.timer of
+ none ->
+ ReportInterval = application:get_env(plop, status_report_interval, 1000),
+ Timer = erlang:start_timer(State#state.lastsent + ReportInterval, self(), force_send, [{abs, true}]),
+ State#state{timer = Timer};
+ _ ->
+ State
+ end.
+
+terminate(Reason, State) ->
+ lager:info("~p terminating: ~p", [?MODULE, Reason]),
+ NewState = cancel_timer(State),
+ case Reason of
+ shutdown ->
+ force_send(NewState);
+ _ ->
+ none
+ end,
+ ok.
+
+
+
+group_by_service(Statusreports) ->
+ dict:to_list(
+ lists:foldl(
+ fun ({{Service, Target, Variable}, Status}, Acc) ->
+ dict:append(Service, {Target, Variable, Status}, Acc)
+ end, dict:new(), dict:to_list(Statusreports))).
+
+encode_status({single, Status}) ->
+ Status;
+encode_status({multi, Statuses}) ->
+ sets:to_list(Statuses).
+
+send(Service, Statusreports, Nodename) ->
+ lager:debug("reporting status to ~p: ~p", [Service, Statusreports]),
+ [NodeAddress] = plopconfig:get_env(statsservers, []),
+ DebugTag = "statusreport",
+ URL = NodeAddress ++ Service,
+ Headers = [{"Content-Type", "text/json"}],
+ RequestBody = list_to_binary(
+ mochijson2:encode(
+ [
+ {struct,
+ [{"target", list_to_binary(Target)},
+ {"source", list_to_binary(Nodename)},
+ {"key", list_to_binary(Variable)},
+ {"value", encode_status(Status)}]}
+ || {Target, Variable, Status} <- Statusreports
+ ])),
+ case plop_httputil:request(DebugTag, URL, Headers, RequestBody) of
+ {error, Err} ->
+ lager:debug("request error ~p ~p", [DebugTag, Err]);
+ {failure, {none, StatusCode, none}, _RespHeaders, _Body} ->
+ lager:debug("request failure ~p ~p", [DebugTag, StatusCode]);
+ {success, {_, StatusCode, _}, _, Body} when StatusCode == 200 ->
+ case (catch mochijson2:decode(Body)) of
+ {error, Err} ->
+ lager:debug("error returned ~p ~p", [DebugTag, Err]);
+ {struct, _PropList} ->
+ none
+ end
+ end.
+
+force_send(State) ->
+ lists:foreach(fun ({Service, Statusreports}) ->
+ send(Service, Statusreports, State#state.nodename)
+ end, group_by_service(State#state.statusreports)),
+ NewState = cancel_timer(State),
+ NewState#state{statusreports = dict:new(), lastsent = erlang:monotonic_time(millisecond)}.
+
+try_send(State) ->
+ ReportInterval = application:get_env(plop, status_report_interval, 1000),
+ NextSend = State#state.lastsent + ReportInterval,
+ Now = erlang:monotonic_time(millisecond),
+ if
+ NextSend > Now ->
+ lager:debug("status report sent ~p ms ago, setting timer", [NextSend - Now]),
+ set_timer(State);
+ true ->
+ lager:debug("status report send long enough ago"),
+ force_send(State)
+ end.
+
+report(Service, Target, Variable, Status) ->
+ lager:debug("reporting status ~p ~p ~p ~p", [Service, Target, Variable, Status]),
+ gen_server:cast(?MODULE, {report, Service, Target, Variable, Status}).
+
+report_multi(Service, Target, Variable, Status) ->
+ lager:debug("reporting multi status ~p ~p ~p ~p", [Service, Target, Variable, Status]),
+ gen_server:cast(?MODULE, {report_multi, Service, Target, Variable, Status}).