diff options
author | Linus Nordberg <linus@sunet.se> | 2019-07-02 17:34:59 +0200 |
---|---|---|
committer | Linus Nordberg <linus@sunet.se> | 2019-07-02 17:34:59 +0200 |
commit | 74bf309efefb091dfeedc9c8a452f74e0385e984 (patch) | |
tree | e61555db9b8d6b36d2206265ff28c7de95ade3e7 | |
parent | 285606f3278e1396badf63eb21c052fc167418eb (diff) |
implement load balancing
-rw-r--r-- | p11p-daemon/config/sys.config | 4 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_app.erl | 6 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_config.erl | 25 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_remote_manager.erl | 81 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_server.erl | 7 |
5 files changed, 87 insertions, 36 deletions
diff --git a/p11p-daemon/config/sys.config b/p11p-daemon/config/sys.config index b04a3b7..bf7a93d 100644 --- a/p11p-daemon/config/sys.config +++ b/p11p-daemon/config/sys.config @@ -5,7 +5,9 @@ {loglevel, 3}, {groups, [{"vtoken0", - [{mode, failover, [timeout]}, % {mode, failover|balance, [timeout]|[TBD]} + [ + {mode, {balance, [2]}}, + %%{mode, failover, [timeout]}, {modules, [{"softhsm2", "/usr/lib/softhsm/libsofthsm2.so"}, {"bogusmod_0", "/usr/lib/softhsm/libsofthsm2.so"}]} diff --git a/p11p-daemon/src/p11p_app.erl b/p11p-daemon/src/p11p_app.erl index 5d53ec8..8bcb41a 100644 --- a/p11p-daemon/src/p11p_app.erl +++ b/p11p-daemon/src/p11p_app.erl @@ -10,13 +10,11 @@ start(_Type, _Args) -> % Args from 'mod' in application spec. -spec prep_stop(term()) -> term(). prep_stop(State) -> - lager:debug("p11p: cleaning up"), + lager:debug("p11p cleaning up"), p11p_server_sup:cleanup(), State. --spec stop([]) -> ok. stop(_State) -> - lager:info("p11p stopped"), - ok. + lager:info("p11p stopped"). %% Private. diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl index a8fde91..23b79bb 100644 --- a/p11p-daemon/src/p11p_config.erl +++ b/p11p-daemon/src/p11p_config.erl @@ -7,6 +7,7 @@ -export([nameof/1]). -export([tokens/0]). -export([modules_for_token/1, module_path/1, token_mode/1]). +-export_type([token_mode_t/0]). %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -19,11 +20,11 @@ }). -type p11module() :: #p11module{}. --type token_mode() :: {failover | balance, [term()]}. +-type token_mode_t() :: {failover, [timeout]} | {balance, [non_neg_integer()]}. -record(token, { name :: string(), - mode :: token_mode(), + mode :: token_mode_t(), modules = #{} :: #{string() => p11module()} }). -type token() :: #token{}. @@ -45,7 +46,7 @@ start_link() -> tokens() -> gen_server:call(?MODULE, tokens). --spec token_mode(string()) -> token_mode(). +-spec token_mode(string()) -> token_mode_t(). token_mode(TokName) -> gen_server:call(?MODULE, {token_mode, TokName}). @@ -115,10 +116,13 @@ conf_tokens([H = {Name, _}|T], Acc) -> -spec new_token({string(), [tuple()]}) -> token(). new_token({Name, Settings}) -> + Modules = conf_modules(proplists:get_value(modules, Settings)), + Mode = mode(proplists:get_value(mode, Settings, {failover, [timeout]}), + maps:size(Modules)), #token{ name = Name, - mode = proplists:get_value(mode, Settings, {failover, [timeout]}), - modules = conf_modules(proplists:get_value(modules, Settings)) + mode = Mode, + modules = Modules }. conf_modules(L) -> @@ -134,6 +138,12 @@ new_module(Name, Path) -> path = Path }. +-spec mode(p11p_config:token_mode_t(), non_neg_integer()) -> p11p_config:token_mode_t(). +mode({balance, Args}, NModules) -> + {balance, Args ++ [1 || _ <- lists:seq(1, NModules - length(Args))]}; +mode(Conf, _) -> + Conf. + %%%%%%%%%%%%%% %% Unit tests. @@ -145,7 +155,8 @@ tokens_init_test_() -> conf_tokens( [ {"vtoken0", - [{modules, [{"bogusmod0_0", "/path/to/bogusmod0_0"}, + [{mode, {balance, [3]}}, + {modules, [{"bogusmod0_0", "/path/to/bogusmod0_0"}, {"bogusmod0_1", "/path/to/bogusmod0_1"} ]}]}, {"vtoken1", @@ -156,7 +167,7 @@ tokens_init_test_() -> [?_assertEqual( #{"vtoken0" => {token,"vtoken0", - {failover,[timeout]}, + {balance,[3,1]}, #{"bogusmod0_0" => {p11module,"bogusmod0_0", "/path/to/bogusmod0_0"}, "bogusmod0_1" => diff --git a/p11p-daemon/src/p11p_remote_manager.erl b/p11p-daemon/src/p11p_remote_manager.erl index 5fe5bc7..23c69a7 100644 --- a/p11p-daemon/src/p11p_remote_manager.erl +++ b/p11p-daemon/src/p11p_remote_manager.erl @@ -36,12 +36,14 @@ tokname :: string(), servid :: atom(), modpath :: string(), + balance :: integer(), pid :: pid() | undefined }). -record(token, { - remotes :: [#remote{}], % Active remote in hd(). - replay = <<>> :: binary() % FIXME: seems unfeasable, remove + mode :: p11p_config:token_mode_t(), + balance_count :: integer(), + remotes :: [#remote{}] % Active remote in hd(). }). -record(state, { @@ -68,13 +70,27 @@ init([]) -> handle_call({remote_for_token, TokName}, _From, #state{tokens = Tokens} = State) -> #{TokName := Token} = Tokens, - Remotes = Token#token.remotes, + lager:debug("all remotes: ~p", [Token#token.remotes]), + {Remotes, NewBC} = + case Token#token.balance_count of + 0 -> + lager:debug("~p: balancing: next remote", [self()]), + R0 = rotate_remotes(Token#token.remotes), + R = hd(R0), + {R0, R#remote.balance - 1}; + N when N > 0 -> + lager:debug("~p: balancing: ~B more invocations", [self(), N]), + {Token#token.remotes, N - 1}; + -1 -> + {Token#token.remotes, -1} + end, #remote{tokname = TokName, servid = ServId, modpath = ModPath, pid = Pid} = Remote = hd(Remotes), case Pid of undefined -> {ok, NewPid} = p11p_remote:start_link(ServId, TokName, ModPath), NewRemote = Remote#remote{pid = NewPid}, - NewToken = Token#token{remotes = [NewRemote | tl(Remotes)]}, + NewToken = Token#token{remotes = [NewRemote | tl(Remotes)], + balance_count = NewBC}, NewState = State#state{tokens = Tokens#{TokName := NewToken}}, {reply, NewPid, NewState}; _ -> @@ -135,23 +151,48 @@ init_tokens([], Acc)-> lager:debug("~p: created tokens from config: ~p", [self(), Acc]), Acc; init_tokens([H|T], Acc)-> - TokName = p11p_config:nameof(H), - init_tokens(T, Acc#{TokName => new_token(TokName, H)}). - -new_token(TokName, ConfToken) -> - #token{remotes = remotes(TokName, p11p_config:modules_for_token(p11p_config:nameof(ConfToken)))}. - -remotes(TokName, ConfModules) -> - remotes(TokName, ConfModules, []). -remotes(_, [], Acc) -> + init_tokens(T, Acc#{p11p_config:nameof(H) => new_token(H)}). + +new_token(Conf) -> + Name = p11p_config:nameof(Conf), + Mode = p11p_config:token_mode(Name), + Remotes = remotes(Name, + p11p_config:modules_for_token(Name), + Mode), + R0 = hd(Remotes), + #token{ + mode = p11p_config:token_mode(Name), + balance_count = R0#remote.balance, + remotes = Remotes + }. + +remotes(TokName, ConfModules, ConfMode) -> + remotes(TokName, ConfModules, ConfMode, []). +remotes(_, [], _, Acc) -> Acc; -remotes(TokName, [H|T], Acc) -> +remotes(TokName, [H|T], ConfMode, Acc) -> ModName = p11p_config:nameof(H), ServName = "p11p_remote:" ++ TokName ++ ":" ++ ModName, ModPath = p11p_config:module_path(H), - remotes(TokName, T, [#remote{ - tokname = TokName, - servid = list_to_atom(ServName), - modpath = ModPath, - pid = undefined - } | Acc]). + remotes(TokName, T, ConfMode, [#remote{ + tokname = TokName, + servid = list_to_atom(ServName), + modpath = ModPath, + balance = balance(ConfMode, length(T) + 1) + } + | Acc]). + +-spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). +balance({balance, Ratios}, N) -> + lists:nth(N, Ratios); +balance(_, _) -> + -1. + +%% -spec balance_count(p11p_config:token_mode_t()) -> integer(). +%% balance_count(#token{mode = {balance, _}, balance_count = C}) -> +%% C - 1; +%% balance_count(_) -> +%% -1. + +rotate_remotes(L) -> + lists:reverse([hd(L) | lists:reverse(tl(L))]). diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index e32a439..55893de 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -81,8 +81,8 @@ handle_info({tcp, _Port, Data}, #state{tokname = TokName, remote = Remote} = Sta <<Version:8, NewData/binary>> = Data, NewRemote = p11p_remote_manager:remote_for_token(TokName), p11p_remote:add_to_outbuf(NewRemote, <<Version>>), - NewState = handle_client_data(State, p11p_rpc:new(), NewData), - {noreply, NewState#state{remote = NewRemote}}; + NewState = State#state{remote = NewRemote}, + {noreply, handle_client_data(NewState, p11p_rpc:new(), NewData)}; handle_info({tcp, _Port, Data}, #state{msg = Msg} = State) -> %%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]), {noreply, handle_client_data(State, Msg, Data)}; @@ -103,10 +103,9 @@ code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private functions. -handle_client_data(#state{tokname = TokName} = State, Msg, Data) -> +handle_client_data(#state{remote = Remote} = State, Msg, Data) -> case p11p_rpc:parse(Msg, Data) of {done, NewMsg} -> - Remote = p11p_remote_manager:remote_for_token(TokName), ok = p11p_remote:request(Remote, NewMsg), State#state{msg = p11p_rpc:new(NewMsg#p11rpc_msg.buffer)}; {needmore, NewMsg} -> |