From 4e26b3679e9743690a85c9f72f7f4fc8ea8fd3f0 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 11 Jun 2015 16:38:30 +0200 Subject: Implement rate limiting of add_chain --- src/ratelimit.erl | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 src/ratelimit.erl (limited to 'src/ratelimit.erl') diff --git a/src/ratelimit.erl b/src/ratelimit.erl new file mode 100644 index 0000000..16d0f30 --- /dev/null +++ b/src/ratelimit.erl @@ -0,0 +1,87 @@ +%%% Copyright (c) 2015, NORDUnet A/S. +%%% See LICENSE for licensing information. +%%% + +-module(ratelimit). +-behaviour(gen_server). + +-export([start_link/0, stop/0]). +-export([get_token/1]). +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, + application:get_env(catlfish, ratelimits, []), []). + +stop() -> + gen_server:call(?MODULE, stop). + +get_token(Type) -> + gen_server:call(?MODULE, {get_token, Type}). + +init(Types) -> + lager:debug("starting ratelimit service"), + State = dict:from_list([{Type, {Rules, queue:new()}} + || {Type, Rules} <- Types]), + {ok, State}. + +rule_interval_atom([{_, Interval}]) -> + Interval. + +rule_interval([{_, second}]) -> + 1000; +rule_interval([{_, minute}]) -> + 60*1000; +rule_interval([{_, hour}]) -> + 60*60*1000. + +rule_times([{Times, _}]) when is_integer(Times) -> + Times. + +clean_queue(Interval, Queue) -> + Now = plop:generate_timestamp(), + case queue:peek(Queue) of + {value, Item} when Item + Interval < Now -> + clean_queue(Interval, queue:drop(Queue)); + _ -> + Queue + end. + +get_token_for_type({none, Queue}) -> + {ok, {none, Queue}}; +get_token_for_type({Rules, Queue}) -> + CleanedQueue = clean_queue(rule_interval(Rules), Queue), + MaxTimes = rule_times(Rules), + QueueLength = queue:len(CleanedQueue), + if + QueueLength < MaxTimes -> + Now = plop:generate_timestamp(), + {ok, {Rules, queue:in(Now, CleanedQueue)}}; + true -> + {overload, {Rules, CleanedQueue}} + end. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call({get_token, Type}, _From, State) -> + case dict:find(Type, State) of + {ok, TypeState} -> + {Result, NewTypeState} = get_token_for_type(TypeState), + {Rules, Queue} = NewTypeState, + lager:debug("current rate: ~p per ~p", + [queue:len(Queue), rule_interval_atom(Rules)]), + {reply, Result, dict:store(Type, NewTypeState, State)}; + error -> + {reply, ok, State} + end. + +handle_cast(_Request, State) -> + {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. +code_change(_OldVersion, State, _Extra) -> + {ok, State}. +terminate(_Reason, _State) -> + ok. -- cgit v1.1