summaryrefslogtreecommitdiff
path: root/src/ratelimit.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ratelimit.erl')
-rw-r--r--src/ratelimit.erl87
1 files changed, 87 insertions, 0 deletions
diff --git a/src/ratelimit.erl b/src/ratelimit.erl
new file mode 100644
index 0000000..16d0f30
--- /dev/null
+++ b/src/ratelimit.erl
@@ -0,0 +1,87 @@
+%%% Copyright (c) 2015, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+%%%
+
+-module(ratelimit).
+-behaviour(gen_server).
+
+-export([start_link/0, stop/0]).
+-export([get_token/1]).
+%% gen_server callbacks.
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE,
+ application:get_env(catlfish, ratelimits, []), []).
+
+stop() ->
+ gen_server:call(?MODULE, stop).
+
+get_token(Type) ->
+ gen_server:call(?MODULE, {get_token, Type}).
+
+init(Types) ->
+ lager:debug("starting ratelimit service"),
+ State = dict:from_list([{Type, {Rules, queue:new()}}
+ || {Type, Rules} <- Types]),
+ {ok, State}.
+
+rule_interval_atom([{_, Interval}]) ->
+ Interval.
+
+rule_interval([{_, second}]) ->
+ 1000;
+rule_interval([{_, minute}]) ->
+ 60*1000;
+rule_interval([{_, hour}]) ->
+ 60*60*1000.
+
+rule_times([{Times, _}]) when is_integer(Times) ->
+ Times.
+
+clean_queue(Interval, Queue) ->
+ Now = plop:generate_timestamp(),
+ case queue:peek(Queue) of
+ {value, Item} when Item + Interval < Now ->
+ clean_queue(Interval, queue:drop(Queue));
+ _ ->
+ Queue
+ end.
+
+get_token_for_type({none, Queue}) ->
+ {ok, {none, Queue}};
+get_token_for_type({Rules, Queue}) ->
+ CleanedQueue = clean_queue(rule_interval(Rules), Queue),
+ MaxTimes = rule_times(Rules),
+ QueueLength = queue:len(CleanedQueue),
+ if
+ QueueLength < MaxTimes ->
+ Now = plop:generate_timestamp(),
+ {ok, {Rules, queue:in(Now, CleanedQueue)}};
+ true ->
+ {overload, {Rules, CleanedQueue}}
+ end.
+
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+handle_call({get_token, Type}, _From, State) ->
+ case dict:find(Type, State) of
+ {ok, TypeState} ->
+ {Result, NewTypeState} = get_token_for_type(TypeState),
+ {Rules, Queue} = NewTypeState,
+ lager:debug("current rate: ~p per ~p",
+ [queue:len(Queue), rule_interval_atom(Rules)]),
+ {reply, Result, dict:store(Type, NewTypeState, State)};
+ error ->
+ {reply, ok, State}
+ end.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+handle_info(_Info, State) ->
+ {noreply, State}.
+code_change(_OldVersion, State, _Extra) ->
+ {ok, State}.
+terminate(_Reason, _State) ->
+ ok.