summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Nordberg <linus@sunet.se>2019-07-02 17:34:59 +0200
committerLinus Nordberg <linus@sunet.se>2019-07-02 17:34:59 +0200
commit74bf309efefb091dfeedc9c8a452f74e0385e984 (patch)
treee61555db9b8d6b36d2206265ff28c7de95ade3e7
parent285606f3278e1396badf63eb21c052fc167418eb (diff)
implement load balancing
-rw-r--r--p11p-daemon/config/sys.config4
-rw-r--r--p11p-daemon/src/p11p_app.erl6
-rw-r--r--p11p-daemon/src/p11p_config.erl25
-rw-r--r--p11p-daemon/src/p11p_remote_manager.erl81
-rw-r--r--p11p-daemon/src/p11p_server.erl7
5 files changed, 87 insertions, 36 deletions
diff --git a/p11p-daemon/config/sys.config b/p11p-daemon/config/sys.config
index b04a3b7..bf7a93d 100644
--- a/p11p-daemon/config/sys.config
+++ b/p11p-daemon/config/sys.config
@@ -5,7 +5,9 @@
{loglevel, 3},
{groups,
[{"vtoken0",
- [{mode, failover, [timeout]}, % {mode, failover|balance, [timeout]|[TBD]}
+ [
+ {mode, {balance, [2]}},
+ %%{mode, failover, [timeout]},
{modules,
[{"softhsm2", "/usr/lib/softhsm/libsofthsm2.so"},
{"bogusmod_0", "/usr/lib/softhsm/libsofthsm2.so"}]}
diff --git a/p11p-daemon/src/p11p_app.erl b/p11p-daemon/src/p11p_app.erl
index 5d53ec8..8bcb41a 100644
--- a/p11p-daemon/src/p11p_app.erl
+++ b/p11p-daemon/src/p11p_app.erl
@@ -10,13 +10,11 @@ start(_Type, _Args) -> % Args from 'mod' in application spec.
-spec prep_stop(term()) -> term().
prep_stop(State) ->
- lager:debug("p11p: cleaning up"),
+ lager:debug("p11p cleaning up"),
p11p_server_sup:cleanup(),
State.
--spec stop([]) -> ok.
stop(_State) ->
- lager:info("p11p stopped"),
- ok.
+ lager:info("p11p stopped").
%% Private.
diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl
index a8fde91..23b79bb 100644
--- a/p11p-daemon/src/p11p_config.erl
+++ b/p11p-daemon/src/p11p_config.erl
@@ -7,6 +7,7 @@
-export([nameof/1]).
-export([tokens/0]).
-export([modules_for_token/1, module_path/1, token_mode/1]).
+-export_type([token_mode_t/0]).
%% Genserver callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -19,11 +20,11 @@
}).
-type p11module() :: #p11module{}.
--type token_mode() :: {failover | balance, [term()]}.
+-type token_mode_t() :: {failover, [timeout]} | {balance, [non_neg_integer()]}.
-record(token, {
name :: string(),
- mode :: token_mode(),
+ mode :: token_mode_t(),
modules = #{} :: #{string() => p11module()}
}).
-type token() :: #token{}.
@@ -45,7 +46,7 @@ start_link() ->
tokens() ->
gen_server:call(?MODULE, tokens).
--spec token_mode(string()) -> token_mode().
+-spec token_mode(string()) -> token_mode_t().
token_mode(TokName) ->
gen_server:call(?MODULE, {token_mode, TokName}).
@@ -115,10 +116,13 @@ conf_tokens([H = {Name, _}|T], Acc) ->
-spec new_token({string(), [tuple()]}) -> token().
new_token({Name, Settings}) ->
+ Modules = conf_modules(proplists:get_value(modules, Settings)),
+ Mode = mode(proplists:get_value(mode, Settings, {failover, [timeout]}),
+ maps:size(Modules)),
#token{
name = Name,
- mode = proplists:get_value(mode, Settings, {failover, [timeout]}),
- modules = conf_modules(proplists:get_value(modules, Settings))
+ mode = Mode,
+ modules = Modules
}.
conf_modules(L) ->
@@ -134,6 +138,12 @@ new_module(Name, Path) ->
path = Path
}.
+-spec mode(p11p_config:token_mode_t(), non_neg_integer()) -> p11p_config:token_mode_t().
+mode({balance, Args}, NModules) ->
+ {balance, Args ++ [1 || _ <- lists:seq(1, NModules - length(Args))]};
+mode(Conf, _) ->
+ Conf.
+
%%%%%%%%%%%%%%
%% Unit tests.
@@ -145,7 +155,8 @@ tokens_init_test_() ->
conf_tokens(
[
{"vtoken0",
- [{modules, [{"bogusmod0_0", "/path/to/bogusmod0_0"},
+ [{mode, {balance, [3]}},
+ {modules, [{"bogusmod0_0", "/path/to/bogusmod0_0"},
{"bogusmod0_1", "/path/to/bogusmod0_1"}
]}]},
{"vtoken1",
@@ -156,7 +167,7 @@ tokens_init_test_() ->
[?_assertEqual(
#{"vtoken0" =>
{token,"vtoken0",
- {failover,[timeout]},
+ {balance,[3,1]},
#{"bogusmod0_0" =>
{p11module,"bogusmod0_0", "/path/to/bogusmod0_0"},
"bogusmod0_1" =>
diff --git a/p11p-daemon/src/p11p_remote_manager.erl b/p11p-daemon/src/p11p_remote_manager.erl
index 5fe5bc7..23c69a7 100644
--- a/p11p-daemon/src/p11p_remote_manager.erl
+++ b/p11p-daemon/src/p11p_remote_manager.erl
@@ -36,12 +36,14 @@
tokname :: string(),
servid :: atom(),
modpath :: string(),
+ balance :: integer(),
pid :: pid() | undefined
}).
-record(token, {
- remotes :: [#remote{}], % Active remote in hd().
- replay = <<>> :: binary() % FIXME: seems unfeasable, remove
+ mode :: p11p_config:token_mode_t(),
+ balance_count :: integer(),
+ remotes :: [#remote{}] % Active remote in hd().
}).
-record(state, {
@@ -68,13 +70,27 @@ init([]) ->
handle_call({remote_for_token, TokName}, _From, #state{tokens = Tokens} = State) ->
#{TokName := Token} = Tokens,
- Remotes = Token#token.remotes,
+ lager:debug("all remotes: ~p", [Token#token.remotes]),
+ {Remotes, NewBC} =
+ case Token#token.balance_count of
+ 0 ->
+ lager:debug("~p: balancing: next remote", [self()]),
+ R0 = rotate_remotes(Token#token.remotes),
+ R = hd(R0),
+ {R0, R#remote.balance - 1};
+ N when N > 0 ->
+ lager:debug("~p: balancing: ~B more invocations", [self(), N]),
+ {Token#token.remotes, N - 1};
+ -1 ->
+ {Token#token.remotes, -1}
+ end,
#remote{tokname = TokName, servid = ServId, modpath = ModPath, pid = Pid} = Remote = hd(Remotes),
case Pid of
undefined ->
{ok, NewPid} = p11p_remote:start_link(ServId, TokName, ModPath),
NewRemote = Remote#remote{pid = NewPid},
- NewToken = Token#token{remotes = [NewRemote | tl(Remotes)]},
+ NewToken = Token#token{remotes = [NewRemote | tl(Remotes)],
+ balance_count = NewBC},
NewState = State#state{tokens = Tokens#{TokName := NewToken}},
{reply, NewPid, NewState};
_ ->
@@ -135,23 +151,48 @@ init_tokens([], Acc)->
lager:debug("~p: created tokens from config: ~p", [self(), Acc]),
Acc;
init_tokens([H|T], Acc)->
- TokName = p11p_config:nameof(H),
- init_tokens(T, Acc#{TokName => new_token(TokName, H)}).
-
-new_token(TokName, ConfToken) ->
- #token{remotes = remotes(TokName, p11p_config:modules_for_token(p11p_config:nameof(ConfToken)))}.
-
-remotes(TokName, ConfModules) ->
- remotes(TokName, ConfModules, []).
-remotes(_, [], Acc) ->
+ init_tokens(T, Acc#{p11p_config:nameof(H) => new_token(H)}).
+
+new_token(Conf) ->
+ Name = p11p_config:nameof(Conf),
+ Mode = p11p_config:token_mode(Name),
+ Remotes = remotes(Name,
+ p11p_config:modules_for_token(Name),
+ Mode),
+ R0 = hd(Remotes),
+ #token{
+ mode = p11p_config:token_mode(Name),
+ balance_count = R0#remote.balance,
+ remotes = Remotes
+ }.
+
+remotes(TokName, ConfModules, ConfMode) ->
+ remotes(TokName, ConfModules, ConfMode, []).
+remotes(_, [], _, Acc) ->
Acc;
-remotes(TokName, [H|T], Acc) ->
+remotes(TokName, [H|T], ConfMode, Acc) ->
ModName = p11p_config:nameof(H),
ServName = "p11p_remote:" ++ TokName ++ ":" ++ ModName,
ModPath = p11p_config:module_path(H),
- remotes(TokName, T, [#remote{
- tokname = TokName,
- servid = list_to_atom(ServName),
- modpath = ModPath,
- pid = undefined
- } | Acc]).
+ remotes(TokName, T, ConfMode, [#remote{
+ tokname = TokName,
+ servid = list_to_atom(ServName),
+ modpath = ModPath,
+ balance = balance(ConfMode, length(T) + 1)
+ }
+ | Acc]).
+
+-spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer().
+balance({balance, Ratios}, N) ->
+ lists:nth(N, Ratios);
+balance(_, _) ->
+ -1.
+
+%% -spec balance_count(p11p_config:token_mode_t()) -> integer().
+%% balance_count(#token{mode = {balance, _}, balance_count = C}) ->
+%% C - 1;
+%% balance_count(_) ->
+%% -1.
+
+rotate_remotes(L) ->
+ lists:reverse([hd(L) | lists:reverse(tl(L))]).
diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl
index e32a439..55893de 100644
--- a/p11p-daemon/src/p11p_server.erl
+++ b/p11p-daemon/src/p11p_server.erl
@@ -81,8 +81,8 @@ handle_info({tcp, _Port, Data}, #state{tokname = TokName, remote = Remote} = Sta
<<Version:8, NewData/binary>> = Data,
NewRemote = p11p_remote_manager:remote_for_token(TokName),
p11p_remote:add_to_outbuf(NewRemote, <<Version>>),
- NewState = handle_client_data(State, p11p_rpc:new(), NewData),
- {noreply, NewState#state{remote = NewRemote}};
+ NewState = State#state{remote = NewRemote},
+ {noreply, handle_client_data(NewState, p11p_rpc:new(), NewData)};
handle_info({tcp, _Port, Data}, #state{msg = Msg} = State) ->
%%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]),
{noreply, handle_client_data(State, Msg, Data)};
@@ -103,10 +103,9 @@ code_change(_OldVersion, State, _Extra) ->
{ok, State}.
%% Private functions.
-handle_client_data(#state{tokname = TokName} = State, Msg, Data) ->
+handle_client_data(#state{remote = Remote} = State, Msg, Data) ->
case p11p_rpc:parse(Msg, Data) of
{done, NewMsg} ->
- Remote = p11p_remote_manager:remote_for_token(TokName),
ok = p11p_remote:request(Remote, NewMsg),
State#state{msg = p11p_rpc:new(NewMsg#p11rpc_msg.buffer)};
{needmore, NewMsg} ->