diff options
author | Linus Nordberg <linus@sunet.se> | 2020-01-30 14:00:50 +0100 |
---|---|---|
committer | Linus Nordberg <linus@sunet.se> | 2020-02-10 14:28:41 +0100 |
commit | ad84cb6f4d5d6e7154afd9eb05de9bdeb67ca753 (patch) | |
tree | e40173667f8c48e9bcf28c1638b0ec874993b845 /p11p-daemon/src | |
parent | bcf1816564b17aa0fb2a581d2887486212f8171a (diff) |
WIP parts from transparent-failover + half baked toml configdevel
The transparent failover experiment, see branch transparent-failover,
resulted in a bunch of changes that we want regardless of failover
implementation. This commit incorporates these.
This commit also has a half baked implementation of TOML file based
configuration, to not expose the operator for Erlang syntax when
configuring the daemon.
TODO: sort this out!
Diffstat (limited to 'p11p-daemon/src')
-rw-r--r-- | p11p-daemon/src/p11p_client.erl | 76 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_config.erl | 188 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_manager.erl | 193 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_rpc.erl | 68 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_rpc.hrl | 160 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_server.erl | 81 |
6 files changed, 547 insertions, 219 deletions
diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index 1222505..7dc3457 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -6,14 +6,14 @@ %% Receive p11 requests from p11p_server, forward them to the proxy app, %% wait for a reply. If a reply is received within a timeout period, -%% forward the reply to the requesting p11p_server. If the request +%% proxy the reply to the requesting p11p_server. If the request %% times out, inform the manager (our parent). -module(p11p_client). -behaviour(gen_server). %% API. --export([start_link/4]). +-export([start_link/6]). -export([request/2, stop/2]). -include("p11p_rpc.hrl"). @@ -24,38 +24,40 @@ %% Records and types. -record(state, { + token :: string(), % Token name. + timeout :: non_neg_integer(), + port :: port(), replyto :: pid() | undefined, timer :: reference() | undefined, - token :: string(), % Token name. msg :: p11rpc:msg() | undefined, recv_count = 0 :: non_neg_integer(), send_count = 0 :: non_neg_integer() }). %% API. --spec start_link(atom(), string(), string(), list()) -> +-spec start_link(atom(), string(), pid(), string(), list(), non_neg_integer()) -> {ok, pid()} | {error, term()}. -start_link(ServName, TokName, ModPath, ModEnv) -> - lager:info("~p: p11p_client starting for ~s", [ServName, ModPath]), +start_link(ServName, TokName, Server, ModPath, ModEnv, Timeout) -> + lager:info("~p: starting p11p_client for ~s", [self(), TokName]), gen_server:start_link({local, ServName}, ?MODULE, - [TokName, ModPath, ModEnv], []). + [TokName, Server, ModPath, ModEnv, Timeout], []). -spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. request(Client, Request) -> gen_server:call(Client, {request, Request}). %% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether -%% we (Pid) are alive or not. An example of when that can happen is when the -%% manager receives a server_event about a lost p11 app. If the server -%% process terminated on request from us because we timed out on -%% an rpc call, chances are that we have already terminated by -%% the time the manager is to act on the lost app. +%% we (Pid) are alive or not. An example of when that can happen is +%% when the manager receives a server_event about a lost p11 app. If +%% the server process terminated on request from us because we timed +%% out on an rpc call, chances are that we have already terminated by +%% the time the manager acts on the information about the lost app. stop(Pid, Reason) -> gen_server:cast(Pid, {stop, Reason}). %% Genserver callbacks. -init([TokName, ModPath, ModEnv]) -> +init([TokName, Server, ModPath, ModEnv, Timeout]) -> ProxyAppBinPath = p11p_config:proxyapp_bin_path(), Port = open_port({spawn_executable, ProxyAppBinPath}, [stream, @@ -63,9 +65,10 @@ init([TokName, ModPath, ModEnv]) -> {env, ModEnv}, {args, [ModPath, "-v"]} % FIXME: Remove -v ]), + true = is_port(Port), lager:debug("~p: ~s: new proxy app port: ~p", [self(), ProxyAppBinPath, Port]), lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), - {ok, #state{port = Port, token = TokName}}. + {ok, #state{port = Port, token = TokName, replyto = Server, timeout = Timeout}}. handle_call({request, Request}, {FromPid, _Tag}, #state{port = Port, send_count = Sent} = S) -> @@ -75,8 +78,9 @@ handle_call({request, Request}, {FromPid, _Tag}, 0 -> <<?RPC_VERSION:8, D/binary>>; _ -> D end, - ok = do_send(Port, Buf), - {reply, {ok, sizeBuf}, S#state{replyto = FromPid, timer = start_timer(Port), + {ok, _} = do_send(Port, Buf), + {reply, {ok, sizeBuf}, S#state{replyto = FromPid, + timer = start_timer(S#state.timeout, Port), send_count = Sent + 1}}; handle_call(Call, _From, State) -> @@ -95,7 +99,7 @@ handle_info({Port, {data, Data}}, State) when Port == State#state.port, State#state.msg == undefined -> case hd(Data) of % First octet is RPC protocol version. ?RPC_VERSION -> - {noreply, handle_proxy_app_data(State, p11p_rpc:new(), tl(Data))}; + {noreply, handle_token_data(State, p11p_rpc:new(), tl(Data))}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), @@ -105,13 +109,13 @@ handle_info({Port, {data, Data}}, State) %% Receiving more data from proxy app. handle_info({Port, {data, Data}}, #state{msg = Msg} = State) when Port == State#state.port -> - {noreply, handle_proxy_app_data(State, Msg, Data)}; + {noreply, handle_token_data(State, Msg, Data)}; %% Proxy app timed out. -handle_info({timeout, Timer, Port}, #state{token = Tok, replyto = Server} = S) +handle_info({timeout, Timer, Port}, S = #state{token = Tok}) when Port == S#state.port, Timer == S#state.timer -> - lager:info("~p: rpc request timed out, exiting", [self()]), - p11p_manager:server_event(timeout, [Tok, Server]), + lager:info("~p: rpc request for ~s timed out, exiting", [self(), Tok]), + p11p_manager:client_event(timeout, Tok), State = S#state{timer = undefined}, {stop, normal, State}; @@ -129,34 +133,34 @@ code_change(_OldVersion, State, _Extra) -> %% Private do_send(Port, Buf) -> - %%lager:debug("~p: sending ~B octets to proxy app", [self(), size(Buf)]), - - %% case rand:uniform(15) of - %% 1 -> - %% lager:debug("~p: faking unresponsive proxy app (~p) by not sending it any.", [self(), Port]); - %% _ -> - %% port_command(Port, Buf) - %% end, - - true = port_command(Port, Buf), - ok. - -handle_proxy_app_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, + Rand = rand:uniform(100), %% + 10, + if + Rand =< 10 -> + lager:debug("~p: faking unresponsive token (~p) by not sending", + [self(), Port]); + true -> + lager:debug("~p: sending ~B octets to token", [self(), size(Buf)]), + true = port_command(Port, Buf) + end, + {ok, size(Buf)}. + +handle_token_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of {needmore, Msg} -> S#state{msg = Msg}; {done, Msg} -> cancel_timer(Timer), + lager:debug("~p: <- ~s", [self(), p11p_rpc:dump(Msg)]), {ok, _BytesSent} = p11p_server:reply(Pid, Msg), %% Saving potential data not consumed by parse/2 in new message. S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), recv_count = Recv + 1} end. -start_timer(Port) -> +start_timer(Timeout, Port) -> %%lager:debug("~p: starting timer", [self()]), - erlang:start_timer(3000, self(), Port). + erlang:start_timer(Timeout, self(), Port). cancel_timer(Timer) -> %%lager:debug("~p: canceling timer", [self()]), diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl index 330c490..d24aad6 100644 --- a/p11p-daemon/src/p11p_config.erl +++ b/p11p-daemon/src/p11p_config.erl @@ -4,50 +4,85 @@ -module(p11p_config). -behaviour(gen_server). -%% API +%%% API %%% +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -export([start_link/0]). -%%-export([config/0]). -export([nameof/1]). -export([tokens/0]). -export([proxyapp_bin_path/0, modules_for_token/1, module_path/1, module_env/1, - token_mode/1]). --export_type([token_mode_t/0]). - -%% Genserver callbacks. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + token_balance/1, token_retries/1, token_timeout/1]). -%% Records and types. +%%% Records and types %%% -record(p11module, { name :: string(), path :: string(), - env :: [{string(), string()}] %FIXME: maches [] too? + env :: [{string(), string()}] }). -type p11module() :: #p11module{}. --type token_mode_t() :: {failover, [timeout]} | {balance, [non_neg_integer()]}. - -record(token, { name :: string(), - mode :: token_mode_t(), + timeout :: non_neg_integer(), + failover :: non_neg_integer(), % How many failover attempts. + balance :: [non_neg_integer()], modules = #{} :: #{string() => p11module()} }). -type token() :: #token{}. -%% Genserver state. -record(state, { proxyapp_bin_path :: string(), tokens :: #{string() => token()} }). -%%%%%%%%%%%%%%%%%%%% -%% API. +%%% Genserver callbacks %%% +init(_Args) -> + case application:get_env(p11p, config_file) of + {ok, ConfigFile} -> + {ok, init_state(ConfigFile)}; + _ -> + {ok, init_state()} + end. + +handle_call(proxyapp_bin_path, _From, S = #state{proxyapp_bin_path = Path}) -> + {reply, Path, S}; +handle_call(tokens, _From, State = #state{tokens = Tokens}) -> + {reply, maps:values(Tokens), State}; +handle_call({modules_for_token, TokName}, _, S = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, maps:values(Token#token.modules), S}; +handle_call({token_balance, TokName}, _, State = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, Token#token.balance, State}; +handle_call({token_retries, TokName}, _, State = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, Token#token.failover, State}; +handle_call({token_timeout, TokName}, _, State = #state{tokens = Tokens}) -> + #{TokName := Token} = Tokens, + {reply, Token#token.timeout, State}; +handle_call(Request, _From, State) -> + lager:warning("Unhandled call: ~p", [Request]), + {reply, unhandled, State}. + +handle_cast(Message, State) -> + lager:warning("Unhandled cast: ~p", [Message]), + {noreply, State}. + +handle_info(Info, State) -> + lager:warning("Unhandled info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVersion, State, _Extra) -> + {ok, State}. + + +%%% External functions %%% start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% config() -> -%% gen_server:call(?MODULE, config). - proxyapp_bin_path() -> gen_server:call(?MODULE, proxyapp_bin_path). @@ -55,9 +90,17 @@ proxyapp_bin_path() -> tokens() -> gen_server:call(?MODULE, tokens). --spec token_mode(string()) -> token_mode_t(). -token_mode(TokName) -> - gen_server:call(?MODULE, {token_mode, TokName}). +-spec token_balance(string()) -> [integer()]. +token_balance(TokName) -> + gen_server:call(?MODULE, {token_balance, TokName}). + +-spec token_retries(string()) -> non_neg_integer(). +token_retries(TokName) -> + gen_server:call(?MODULE, {token_retries, TokName}). + +-spec token_timeout(string()) -> non_neg_integer(). +token_timeout(TokName) -> + gen_server:call(?MODULE, {token_timeout, TokName}). -spec modules_for_token(string()) -> [p11module()]. modules_for_token(TokName) -> @@ -78,52 +121,28 @@ nameof(#p11module{name = Name}) -> nameof(List) -> [nameof(E) || E <- List]. -%%%%%%%%%%%%%%%%%%%% -%% Genserver callbacks. -init(_Args) -> - State = init_state(), - {ok, State}. - -%% handle_call(config, _From, State) -> -%% {reply, State, State}; -handle_call(proxyapp_bin_path, _From, #state{proxyapp_bin_path = Path} = State) -> - {reply, Path, State}; -handle_call(tokens, _From, #state{tokens = Tokens} = State) -> - {reply, maps:values(Tokens), State}; -handle_call({modules_for_token, TokName}, _, #state{tokens = Tokens} = S) -> - #{TokName := Token} = Tokens, - {reply, maps:values(Token#token.modules), S}; -handle_call({token_mode, TokName}, _, #state{tokens = Tokens} = State) -> - #{TokName := Token} = Tokens, - {reply, Token#token.mode, State}; -handle_call(Request, _From, State) -> - lager:warning("Unhandled call: ~p", [Request]), - {reply, unhandled, State}. - -handle_cast(Message, State) -> - lager:warning("Unhandled cast: ~p", [Message]), - {noreply, State}. - -handle_info(Info, State) -> - lager:warning("Unhandled info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVersion, State, _Extra) -> - {ok, State}. - -%%%%%%%%%%%%%%%%%%%% -%% Private. +%%% Private functions %%% +-define(PROXYAPP_DEFAULT, "/usr/local/libexec/p11-kit/p11-kit-remote"). init_state() -> - #state { - proxyapp_bin_path = - application:get_env(p11p, proxyapp_bin_path, - "/usr/local/libexec/p11-kit/p11-kit-remote"), - tokens = conf_tokens(application:get_env(p11p, groups, [])) - }. + #state{ + proxyapp_bin_path = application:get_env(p11p, + proxyapp_bin_path, + ?PROXYAPP_DEFAULT), + tokens = conf_tokens(application:get_env(p11p, + vtokens, + []))}. +init_state(Filename) -> + {ok, Config} = p11p_config_file:load_config(Filename), + #state{ + proxyapp_bin_path = p11p_config_file:get(Config, + string, + "proxyapp_bin_path", + ?PROXYAPP_DEFAULT), + tokens = conf_tokens(p11p_config_file:get(Config, + section, + "vtokens", + []))}. conf_tokens(L) -> conf_tokens(L, #{}). @@ -135,14 +154,20 @@ conf_tokens([H = {Name, _}|T], Acc) -> -spec new_token({string(), [tuple()]}) -> token(). new_token({Name, Settings}) -> Modules = conf_modules(proplists:get_value(modules, Settings)), - Mode = mode(proplists:get_value(mode, Settings, {failover, [timeout]}), %FIXME: s/[timeout]/[10]/g or some other sane default? - maps:size(Modules)), #token{ name = Name, - mode = Mode, + timeout = proplists:get_value(timeout, Settings, 25000), + failover = proplists:get_value(failover, Settings, maps:size(Modules) - 1), + balance = balance(proplists:get_value(balance, Settings, []), + maps:size(Modules)), modules = Modules }. +balance([], _) -> + []; +balance(List, NModules) -> + List ++ [1 || _ <- lists:seq(1, NModules - length(List))]. + conf_modules(L) -> conf_modules(L, #{}). conf_modules([], Acc) -> @@ -159,30 +184,23 @@ new_module(Name, Path, Env) -> env = Env }. --spec mode(p11p_config:token_mode_t(), non_neg_integer()) -> - p11p_config:token_mode_t(). -mode({balance, Args}, NModules) -> - {balance, Args ++ [1 || _ <- lists:seq(1, NModules - length(Args))]}; -mode(Conf, _) -> - Conf. - -%%%%%%%%%%%%%% -%% Unit tests. +%%% Unit tests %%% -include_lib("eunit/include/eunit.hrl"). - tokens_init_test_() -> {setup, fun() -> conf_tokens( [ {"vtoken0", - [{mode, {balance, [3]}}, + [{balance, [3]}, {modules, [{"bogusmod0_0", "/path/to/bogusmod0_0"}, {"bogusmod0_1", "/path/to/bogusmod0_1"} ]}]}, {"vtoken1", - [{modules, [{"bogusmod1_0", "/path/to/bogusmod1_0"}, + [{timeout, 12000}, + {failover, 3}, + {modules, [{"bogusmod1_0", "/path/to/bogusmod1_0"}, {"bogusmod1_1", "/path/to/bogusmod1_1", [{"MYENV", "myenv"}]} ]}]} ]) end, @@ -191,14 +209,18 @@ tokens_init_test_() -> [?_assertEqual( #{"vtoken0" => {token,"vtoken0", - {balance,[3,1]}, + 25000, + 1, + [3,1], #{"bogusmod0_0" => {p11module,"bogusmod0_0", "/path/to/bogusmod0_0", []}, "bogusmod0_1" => {p11module,"bogusmod0_1", "/path/to/bogusmod0_1", []}}}, "vtoken1" => {token,"vtoken1", - {failover,[timeout]}, + 12000, + 3, + [], #{"bogusmod1_0" => {p11module,"bogusmod1_0", "/path/to/bogusmod1_0", []}, "bogusmod1_1" => diff --git a/p11p-daemon/src/p11p_manager.erl b/p11p-daemon/src/p11p_manager.erl index 7c3bdb9..2dbdf6c 100644 --- a/p11p-daemon/src/p11p_manager.erl +++ b/p11p-daemon/src/p11p_manager.erl @@ -25,8 +25,8 @@ %% API. -export([start_link/0]). --export([client_for_token/1, client_event/2]). % For servers. --export([server_event/2]). % For clients. +-export([client_for_token/1, server_event/2]). % For servers. +-export([client_event/2]). % For clients. %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -43,9 +43,16 @@ }). -record(vtoken, { - mode :: p11p_config:token_mode_t(), - balance_count :: integer(), - clients :: [#client{}] % Active client in hd(). + clients :: [#client{}], % Current client in hd(). + + %% Invokations left for current client or -1 for no + %% balancing. + balance_count = -1 :: integer(), + + timeout :: non_neg_integer(), + retries :: non_neg_integer(), + + server :: pid() | undefined % Active server, if any. }). -record(state, { @@ -59,23 +66,25 @@ start_link() -> -spec client_for_token(string()) -> pid(). client_for_token(TokName) -> - gen_server:call(?MODULE, {client_for_token, TokName}). -client_event(Event, Args) -> - gen_server:cast(?MODULE, {client_event, Event, Args}). + gen_server:call(?MODULE, {client_for_token, self(), TokName}). server_event(Event, Args) -> gen_server:cast(?MODULE, {server_event, Event, Args}). +client_event(Event, Args) -> + gen_server:cast(?MODULE, {client_event, Event, Args}). + %% Genserver callbacks. init([]) -> {ok, #state{vtokens = init_vtokens(p11p_config:tokens())}}. -handle_call({client_for_token, TokNameIn}, _, #state{vtokens = Tokens} = S) -> - #{TokNameIn := TokenIn} = Tokens, - ClientsIn = TokenIn#vtoken.clients, +handle_call({client_for_token, Server, TokNameIn}, _From, + S = #state{vtokens = VTokensIn}) -> + #{TokNameIn := VTokenIn} = VTokensIn, + ClientsIn = VTokenIn#vtoken.clients, lager:debug("all clients: ~p", [ClientsIn]), {Clients, BalanceCount} = - case TokenIn#vtoken.balance_count of + case VTokenIn#vtoken.balance_count of 0 -> lager:debug("~p: balancing: next client", [self()]), Rotated = rotate_clients(ClientsIn), @@ -87,52 +96,51 @@ handle_call({client_for_token, TokNameIn}, _, #state{vtokens = Tokens} = S) -> -1 -> {ClientsIn, -1} end, - #client{tokname = TokNameIn, - servid = ServId, - modpath = ModPath, - modenv = ModEnv, - pid = PidIn} = SelectedClient = hd(Clients), - case PidIn of + Current = hd(Clients), + case Current#client.pid of undefined -> - {ok, Pid} = - p11p_client:start_link(ServId, TokNameIn, ModPath, ModEnv), - Client = SelectedClient#client{pid = Pid}, - Token = TokenIn#vtoken{clients = [Client | tl(Clients)], - balance_count = BalanceCount}, - {reply, Pid, S#state{vtokens = Tokens#{TokNameIn := Token}}}; - _ -> - {reply, PidIn, S} + Client = start_client(hd(Clients), Server, VTokenIn#vtoken.timeout), + VToken = VTokenIn#vtoken{clients = [Client | tl(Clients)], + server = Server, + balance_count = BalanceCount}, + {reply, Client#client.pid, S#state{vtokens = VTokensIn#{TokNameIn := VToken}}}; + Pid -> + {reply, Pid, S} end; handle_call(Call, _From, State) -> lager:debug("Unhandled call: ~p~n", [Call]), {reply, unhandled, State}. -handle_cast({server_event, timeout, [TokNameIn, Server]}, - #state{vtokens = Tokens} = S) -> - lager:debug("~p: ~s: timed out, stopping ~p", [self(), TokNameIn, Server]), - gen_server:stop(Server), % Hang up on p11 client. - %% TODO: do some code dedup with client_for_token? +%% Server done with client. +handle_cast({server_event, server_gone, TokNameIn}, S = #state{vtokens = Tokens}) -> #{TokNameIn := TokenIn} = Tokens, - Clients = TokenIn#vtoken.clients, - SelectedClient = hd(Clients), - Client = SelectedClient#client{pid = undefined}, - Token = TokenIn#vtoken{clients = tl(Clients) ++ [Client]}, - lager:debug("~p: ~s: updated token: ~p", [self(), TokNameIn, Token]), - {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; - -handle_cast({client_event, client_gone, [TokName, Pid]}, - #state{vtokens = Tokens} = S) -> - lager:debug("~p: asking client ~p to stop", [self(), Pid]), - p11p_client:stop(Pid, normal), - #{TokName := TokenIn} = Tokens, - Clients = lists:map(fun(E) -> + CurClient = hd(TokenIn#vtoken.clients), + ClientPid = CurClient#client.pid, + ok = p11p_client:stop(ClientPid, normal), + Clients = lists:map(fun(E) -> % Find and update. case E#client.pid of - Pid -> E#client{pid = undefined}; + ClientPid -> E#client{pid = undefined}; _ -> E - end - end, TokenIn#vtoken.clients), - Token = TokenIn#vtoken{clients = Clients}, - {noreply, S#state{vtokens = Tokens#{TokName := Token}}}; + end end, + TokenIn#vtoken.clients), + Token = TokenIn#vtoken{clients = Clients, server = undefined}, + {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; + +%% Client reporting that a token has timed out -- mark current client +%% not running, inform server, rotate client list and start new +%% client. +handle_cast({client_event, timeout, TokName}, State) -> + #{TokName := VToken} = State#state.vtokens, + client_timeout(TokName, VToken, State); + +handle_cast({start_client, VTokName}, State = #state{vtokens = VTokens}) -> + #{VTokName := VToken} = VTokens, + Client = start_client(hd(VToken#vtoken.clients), + VToken#vtoken.server, + VToken#vtoken.timeout), + NewVToken = VToken#vtoken{clients = [Client | tl(VToken#vtoken.clients)]}, + lager:debug("~p: vtoken updated: ~p", [self(), NewVToken]), + {noreply, State#state{vtokens = VTokens#{VTokName := NewVToken}}}; handle_cast(Cast, State) -> lager:debug("Unhandled cast: ~p~n", [Cast]), @@ -164,40 +172,40 @@ init_vtokens([H|T], Acc)-> new_vtoken(Conf) -> Name = p11p_config:nameof(Conf), - Mode = p11p_config:token_mode(Name), + Balances = p11p_config:token_balance(Name), Clients = clients(Name, p11p_config:modules_for_token(Name), - Mode), - R0 = hd(Clients), + Balances), + CurrentClient = hd(Clients), #vtoken{ - mode = p11p_config:token_mode(Name), - balance_count = R0#client.balance, - clients = Clients + clients = Clients, + balance_count = CurrentClient#client.balance, + timeout = p11p_config:token_timeout(Name), + retries = p11p_config:token_retries(Name) }. -clients(TokName, ConfModules, ConfMode) -> - clients(TokName, ConfModules, ConfMode, []). +clients(TokName, Modules, []) -> + clients(TokName, Modules, [-1 || _ <- lists:seq(1, length(Modules))]); +clients(TokName, ConfModules, ConfBalance) -> + clients(TokName, ConfModules, ConfBalance, []). + clients(_, [], _, Acc) -> Acc; -clients(TokName, [H|T], ConfMode, Acc) -> - ModName = p11p_config:nameof(H), +clients(TokName, [Module|Modules], [Balance|Balances], Acc) -> + ModName = p11p_config:nameof(Module), ServName = "p11p_client:" ++ TokName ++ ":" ++ ModName, - ModPath = p11p_config:module_path(H), - ModEnv = p11p_config:module_env(H), - clients(TokName, T, ConfMode, [#client{ - tokname = TokName, - servid = list_to_atom(ServName), - modpath = ModPath, - modenv = ModEnv, - balance = balance(ConfMode, length(T) + 1) - } - | Acc]). - --spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). -balance({balance, Ratios}, N) -> - lists:nth(N, Ratios); -balance(_, _) -> - -1. + clients(TokName, Modules, Balances, + [#client{tokname = TokName, + servid = list_to_atom(ServName), + modpath = p11p_config:module_path(Module), + modenv = p11p_config:module_env(Module), + balance = Balance} | Acc]). + +%% -spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). +%% balance({balance, Ratios}, N) -> +%% lists:nth(N, Ratios); +%% balance(_, _) -> +%% -1. %% -spec balance_count(p11p_config:token_mode_t()) -> integer(). %% balance_count(#vtoken{mode = {balance, _}, balance_count = C}) -> @@ -207,3 +215,38 @@ balance(_, _) -> rotate_clients(L) -> lists:reverse([hd(L) | lists:reverse(tl(L))]). + +rotate_clients(L, UpdatedCurr) -> + lists:reverse([UpdatedCurr | lists:reverse(tl(L))]). + +next_client(VToken = #vtoken{clients = Clients}) -> + OldC = hd(Clients), + NewClients = rotate_clients(Clients, OldC#client{pid = undefined}), + gen_server:cast(self(), {start_client, OldC#client.tokname}), + VToken#vtoken{clients = NewClients}. + +client_timeout(TokName, + VToken = #vtoken{retries = Retries}, + State = #state{vtokens = VTokens}) + when Retries > 0 -> + lager:debug("~p: ~s: token timed out, switching token", [self(), TokName]), + p11p_server:token_gone(VToken#vtoken.server, false), + NewToken = next_client(VToken), + NewVTokens = VTokens#{TokName := NewToken#vtoken{retries = Retries - 1}}, + {noreply, State#state{vtokens = NewVTokens}}; + +client_timeout(TokName, + VToken, + State) -> + lager:debug("~p: ~s: token timed out, disconnecting app", [self(), TokName]), + p11p_server:token_gone(VToken#vtoken.server, true), + {stop, State}. + +start_client(Client, Server, Timeout) -> + {ok, Pid} = p11p_client:start_link(Client#client.servid, + Client#client.tokname, + Server, + Client#client.modpath, + Client#client.modenv, + Timeout), + Client#client{pid = Pid}. diff --git a/p11p-daemon/src/p11p_rpc.erl b/p11p-daemon/src/p11p_rpc.erl index a775d30..03a476c 100644 --- a/p11p-daemon/src/p11p_rpc.erl +++ b/p11p-daemon/src/p11p_rpc.erl @@ -5,10 +5,37 @@ -module(p11p_rpc). --export([parse/2, new/0, new/1, serialise/1]). +-export([ + dump/1, + error/2, + new/0, new/1, + parse/2, + serialise/1 + ]). -include("p11p_rpc.hrl"). +dump(Msg = #p11rpc_msg{data = Data}) -> + {ReqId, Data2} = parse_req_id(Data), + {ArgsDesc, Data3} = parse_args_desc(Data2), + {Name, _ReqArgs, _RespArgs} = lists:nth(ReqId + 1, ?REQIDS), + io_lib:format("RPC [~B]: ~s (~B), args \"~s\":~n~p", + [Msg#p11rpc_msg.call_code, + Name, + ReqId, + ArgsDesc, + Data3 + ]). + +error(CallCode, ErrorCode) -> + DataBuf = serialise_error(ErrorCode), + #p11rpc_msg{ + state = done, + call_code = CallCode, + opt_len = 0, + data_len = size(DataBuf), + data = DataBuf}. + parse(M) -> parse(M, <<>>). @@ -100,6 +127,44 @@ move_between_binaries(DstIn, SrcIn, NBytes) -> Src = binary:part(SrcIn, N, size(SrcIn) - N), {Dst, Src}. +serialise_byte_array(Bin) -> + Len = size(Bin), + <<Len:32, Bin/binary>>. + +serialise_error(ErrCode) -> + ReqId = ?P11_RPC_CALL_ERROR, + ArgsDescString = "u", % TODO: look this up and generalise. + + ReqIdBin = serialise_uint32(ReqId), + ArgsDescBin = serialise_byte_array(list_to_binary(ArgsDescString)), + ArgBin = serialise_uint64(ErrCode), + + <<ReqIdBin/binary, ArgsDescBin/binary, ArgBin/binary>>. + +serialise_uint32(U32) -> + <<U32:32>>. + +serialise_uint64(U64) -> + <<U64:64>>. + +-spec parse_req_id(binary()) -> {integer(), binary()}. +parse_req_id(Data) -> + {binary:decode_unsigned(binary:part(Data, 0, 4)), + binary:part(Data, 4, size(Data) - 4)}. + +parse_args_desc(Data) -> + parse_byte_array(Data). + +-spec parse_byte_array(binary()) -> {binary(), binary()}. +parse_byte_array(Data) -> + case binary:decode_unsigned(binary:part(Data, 0, 4)) of + 16#ffffffff -> + {<<>>, binary:part(Data, 4, size(Data) - 4)}; + Len -> % TODO: refuse Len >= 0x7fffffff. + {binary:part(Data, 4, Len), binary:part(Data, 4 + Len, + size(Data) - 4 - Len)} + end. + %%%%%%%%%%%%%% %% Unit tests. @@ -173,3 +238,4 @@ parse3_test_() -> {p11rpc_msg, 47, 2, 3, <<"o1">>, <<"d12">>, <<"rest">>, done}, Msg)] end}. + diff --git a/p11p-daemon/src/p11p_rpc.hrl b/p11p-daemon/src/p11p_rpc.hrl index c511e20..0014f57 100644 --- a/p11p-daemon/src/p11p_rpc.hrl +++ b/p11p-daemon/src/p11p_rpc.hrl @@ -16,3 +16,163 @@ state = header :: header | opts | data | done }). -type p11rpc_msg() :: #p11rpc_msg{}. + +%% From p11-kit/rpc-message.h. +-define(P11_RPC_CALL_ERROR, 0). +-define(P11_RPC_CALL_C_Initialize, 1). +-define(P11_RPC_CALL_C_Finalize, 2). +-define(P11_RPC_CALL_C_GetInfo, 3). +-define(P11_RPC_CALL_C_GetSlotList, 4). +-define(P11_RPC_CALL_C_GetSlotInfo, 5). +-define(P11_RPC_CALL_C_GetTokenInfo, 6). +-define(P11_RPC_CALL_C_GetMechanismList, 7). +-define(P11_RPC_CALL_C_GetMechanismInfo, 8). +-define(P11_RPC_CALL_C_InitToken, 9). +-define(P11_RPC_CALL_C_OpenSession, 10). +-define(P11_RPC_CALL_C_CloseSession, 11). +-define(P11_RPC_CALL_C_CloseAllSessions, 12). +-define(P11_RPC_CALL_C_GetSessionInfo, 13). +-define(P11_RPC_CALL_C_InitPIN, 14). +-define(P11_RPC_CALL_C_SetPIN, 15). +-define(P11_RPC_CALL_C_GetOperationState, 16). +-define(P11_RPC_CALL_C_SetOperationState, 17). +-define(P11_RPC_CALL_C_Login, 18). +-define(P11_RPC_CALL_C_Logout, 19). +-define(P11_RPC_CALL_C_CreateObject, 20). +-define(P11_RPC_CALL_C_CopyObject, 21). +-define(P11_RPC_CALL_C_DestroyObject, 22). +-define(P11_RPC_CALL_C_GetObjectSize, 23). +-define(P11_RPC_CALL_C_GetAttributeValue, 24). +-define(P11_RPC_CALL_C_SetAttributeValue, 25). +-define(P11_RPC_CALL_C_FindObjectsInit, 26). +-define(P11_RPC_CALL_C_FindObjects, 27). +-define(P11_RPC_CALL_C_FindObjectsFinal, 28). +-define(P11_RPC_CALL_C_EncryptInit, 29). +-define(P11_RPC_CALL_C_Encrypt, 30). +-define(P11_RPC_CALL_C_EncryptUpdate, 31). +-define(P11_RPC_CALL_C_EncryptFinal, 32). +-define(P11_RPC_CALL_C_DecryptInit, 33). +-define(P11_RPC_CALL_C_Decrypt, 34). +-define(P11_RPC_CALL_C_DecryptUpdate, 35). +-define(P11_RPC_CALL_C_DecryptFinal, 36). +-define(P11_RPC_CALL_C_DigestInit, 37). +-define(P11_RPC_CALL_C_Digest, 38). +-define(P11_RPC_CALL_C_DigestUpdate, 39). +-define(P11_RPC_CALL_C_DigestKey, 40). +-define(P11_RPC_CALL_C_DigestFinal, 41). +-define(P11_RPC_CALL_C_SignInit, 42). +-define(P11_RPC_CALL_C_Sign, 43). +-define(P11_RPC_CALL_C_SignUpdate, 44). +-define(P11_RPC_CALL_C_SignFinal, 45). +-define(P11_RPC_CALL_C_SignRecoverInit, 46). +-define(P11_RPC_CALL_C_SignRecover, 47). +-define(P11_RPC_CALL_C_VerifyInit, 48). +-define(P11_RPC_CALL_C_Verify, 49). +-define(P11_RPC_CALL_C_VerifyUpdate, 50). +-define(P11_RPC_CALL_C_VerifyFinal, 51). +-define(P11_RPC_CALL_C_VerifyRecoverInit, 52). +-define(P11_RPC_CALL_C_VerifyRecover, 53). +-define(P11_RPC_CALL_C_DigestEncryptUpdate, 54). +-define(P11_RPC_CALL_C_DecryptDigestUpdate, 55). +-define(P11_RPC_CALL_C_SignEncryptUpdate, 60). +-define(P11_RPC_CALL_C_DecryptVerifyUpdate, 61). +-define(P11_RPC_CALL_C_GenerateKey, 62). +-define(P11_RPC_CALL_C_GenerateKeyPair, 63). +-define(P11_RPC_CALL_C_WrapKey, 64). +-define(P11_RPC_CALL_C_UnwrapKey, 65). +-define(P11_RPC_CALL_C_DeriveKey, 66). +-define(P11_RPC_CALL_C_SeedRandom, 67). +-define(P11_RPC_CALL_C_GenerateRandom, 68). +-define(P11_RPC_CALL_C_WaitForSlotEvent, 69). +-define(P11_RPC_CALL_MAX, 70). + +%% Return values, some of them. From pcks11.h. +-define(CKR_OK, 0). +-define(CKR_GENERAL_ERROR, 5). +-define(CKR_FUNCTION_FAILED, 6). +-define(CKR_DEVICE_ERROR, 16#30). % 48 +-define(CKR_SESSION_CLOSED, 16#B0). % 176 + +%% Argument descriptions. From p11-kit/rpc-message.h p11_rpc_calls[]. +%% * a_ = prefix denotes array of _ +%% * A = CK_ATTRIBUTE +%% * f_ = prefix denotes buffer for _ +%% * M = CK_MECHANISM +%% * u = CK_ULONG +%% * s = space padded string +%% * v = CK_VERSION +%% * y = CK_BYTE +%% * z = null terminated string +%% Needed for generating our own messages, like ERROR. +%% They're being sent in the messages, after the request id. +%% TOOD: Complete argument descrptions, at least for messages +%% we generate. +-define(REQIDS, + [ % {name, request argdesc, response argdesc} + {"ERROR", "", "u"}, + {"C_Initialize", "ayyay", ""}, + {"C_Finalize", "", ""}, + {"C_GetInfo", "", "vsusv"}, + {"C_GetSlotList", "TODO", "TODO"}, + {"C_GetSlotInfo", "TODO", "TODO"}, + {"C_GetTokenInfo", "TODO", "TODO"}, + {"C_GetMechanismList", "TODO", "TODO"}, + {"C_GetMechanismInfo", "TODO", "TODO"}, + {"C_InitToken", "TODO", "TODO"}, + {"C_OpenSession", "uu", "u"}, + {"C_CloseSession", "u", ""}, + {"C_CloseAllSessions", "TODO", "TODO"}, + {"C_GetSessionInfo", "TODO", "TODO"}, + {"C_InitPIN", "TODO", "TODO"}, + {"C_SetPIN", "TODO", "TODO"}, + {"C_GetOperationState", "TODO", "TODO"}, + {"C_SetOperationState", "TODO", "TODO"}, + {"C_Login", "uuay", ""}, + {"C_Logout", "u", ""}, + {"C_CreateObject", "TODO", "TODO"}, + {"C_CopyObject", "TODO", "TODO"}, + {"C_DestroyObject", "TODO", "TODO"}, + {"C_GetObjectSize", "TODO", "TODO"}, + {"C_GetAttributeValue", "TODO", "TODO"}, + {"C_SetAttributeValue", "TODO", "TODO"}, + {"C_FindObjectsInit", "TODO", "TODO"}, + {"C_FindObjects", "TODO", "TODO"}, + {"C_FindObjectsFinal", "TODO", "TODO"}, + {"C_EncryptInit", "TODO", "TODO"}, + {"C_Encrypt", "TODO", "TODO"}, + {"C_EncryptUpdate", "TODO", "TODO"}, + {"C_EncryptFinal", "TODO", "TODO"}, + {"C_DecryptInit", "TODO", "TODO"}, + {"C_Decrypt", "TODO", "TODO"}, + {"C_DecryptUpdate", "TODO", "TODO"}, + {"C_DecryptFinal", "TODO", "TODO"}, + {"C_DigestInit", "TODO", "TODO"}, + {"C_Digest", "TODO", "TODO"}, + {"C_DigestUpdate", "TODO", "TODO"}, + {"C_DigestKey", "TODO", "TODO"}, + {"C_DigestFinal", "TODO", "TODO"}, + {"C_SignInit", "TODO", "TODO"}, + {"C_Sign", "TODO", "TODO"}, + {"C_SignUpdate", "TODO", "TODO"}, + {"C_SignFinal", "TODO", "TODO"}, + {"C_SignRecoverInit", "TODO", "TODO"}, + {"C_SignRecover", "TODO", "TODO"}, + {"C_VerifyInit", "TODO", "TODO"}, + {"C_Verify", "TODO", "TODO"}, + {"C_VerifyUpdate", "TODO", "TODO"}, + {"C_VerifyFinal", "TODO", "TODO"}, + {"C_VerifyRecoverInit", "TODO", "TODO"}, + {"C_VerifyRecover", "TODO", "TODO"}, + {"C_DigestEncryptUpdate", "TODO", "TODO"}, + {"C_DecryptDigestUpdate", "TODO", "TODO"}, + {"C_SignEncryptUpdate", "TODO", "TODO"}, + {"C_DecryptVerifyUpdate", "TODO", "TODO"}, + {"C_GenerateKey", "TODO", "TODO"}, + {"C_GenerateKeyPair", "TODO", "TODO"}, + {"C_WrapKey", "TODO", "TODO"}, + {"C_UnwrapKey", "TODO", "TODO"}, + {"C_DeriveKey", "TODO", "TODO"}, + {"C_SeedRandom", "TODO", "TODO"}, + {"C_GenerateRandom", "TODO", "TODO"}, + {"C_WaitForSlotEvent" "TODO", "TODO"} + ]). diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index cbc00df..ef8877d 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -11,7 +11,7 @@ %% API. -export([start_link/1]). --export([reply/2]). +-export([reply/2, token_gone/2]). %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -22,10 +22,10 @@ tokname :: string(), client :: pid() | undefined, socket :: gen_tcp:socket(), - msg :: p11rpc_msg() | undefined, - recv_count = 0 :: non_neg_integer(), - send_count = 0 :: non_neg_integer() - %%clientbuf = <<>> :: binary() + req_in :: p11rpc_msg() | undefined, + req_out :: p11rpc_msg() | undefined, + recv_count = 0 :: non_neg_integer(), % received from app + send_count = 0 :: non_neg_integer() % sent to token }). %% API. @@ -37,22 +37,24 @@ start_link(Args) -> reply(Pid, Response) -> gen_server:call(Pid, {respond, Response}). +-spec token_gone(pid(), boolean()) -> ok. +token_gone(Pid, Hangup) -> + case process_info(Pid) of undefined -> error(bad_server_pid); _ -> nop end, + gen_server:cast(Pid, {token_gone, Hangup}). + + %% Genserver callbacks. init([Token, Socket]) -> - lager:debug("~p: p11p_server:init", [self()]), + lager:debug("~p: p11p_server starting for ~s", [self(), Token]), process_flag(trap_exit, true), % Need terminate/2. gen_server:cast(self(), accept), % Invoke accept, returning a socket in state. {ok, #state{tokname = Token, socket = Socket}}. -handle_call({respond, R}, _, #state{socket = Sock, send_count = Sent} = S) -> - D = p11p_rpc:serialise(R), - Buf = case Sent of - 0 -> <<?RPC_VERSION:8, D/binary>>; - _ -> D - end, - %%lager:debug("~p: sending ~B octets as response", [self(), size(Buf)]), - ok = gen_tcp:send(Sock, Buf), % TODO: what about short writes? - {reply, {ok, size(Buf)}, S#state{send_count = Sent + 1}}; +%% FIXME: make this a cast +handle_call({respond, Resp}, _, State = #state{send_count = Sent}) -> + N = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), + {reply, {ok, N}, State#state{req_out = undefined, + send_count = Sent + 1}}; handle_call(Call, _, S) -> lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), @@ -77,6 +79,21 @@ handle_cast(accept, State = #state{tokname = TokName, socket = ListenSocket}) -> {stop, normal, State} end; + +handle_cast({token_gone, Hangup}, State = #state{send_count = Sent}) -> + Resp = p11p_rpc:error(State#state.req_out#p11rpc_msg.call_code, + ?CKR_DEVICE_ERROR), + {ok, _} = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), + NewState = State#state{client = undefined, + req_out = undefined, + send_count = Sent + 1}, + case Hangup of + true -> + {close, NewState}; + false -> + {noreply, NewState} + end; + handle_cast(Cast, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. @@ -89,7 +106,7 @@ handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) case RPCVersion of ?RPC_VERSION -> {noreply, - p11_client_data( + p11_app_data( S#state{client = p11p_manager:client_for_token(TokName)}, p11p_rpc:new(), Data)}; @@ -100,9 +117,9 @@ handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) end; %% Subsequent packages from P11 client. -handle_info({tcp, _Port, DataIn}, #state{msg = Msg} = S) -> +handle_info({tcp, _Port, DataIn}, #state{req_in = Msg} = S) -> %%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]), - {noreply, p11_client_data(S, Msg, DataIn)}; + {noreply, p11_app_data(S, Msg, DataIn)}; handle_info({tcp_closed, Port}, S) -> lager:debug("~p: socket ~p closed", [self(), Port]), @@ -112,9 +129,14 @@ handle_info(Info, S) -> lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), {noreply, S}. -terminate(Reason, #state{socket = Sock, tokname = TokName, client = Client}) -> - gen_tcp:close(Sock), - p11p_manager:client_event(client_gone, [TokName, Client]), +terminate(Reason, #state{socket = Sock, tokname = TokName}) -> + ok = gen_tcp:close(Sock), + + %% FIXME: tell manager, so that the client can be stopped. we + %% don't want to risk that another app (socket client) uses it + + p11p_manager:server_event(server_gone, TokName), + lager:debug("~p: terminated with reason ~p", [self(), Reason]), ignored. @@ -122,13 +144,24 @@ code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private functions. -p11_client_data(#state{client = Client, recv_count = Recv} = S, MsgIn, +p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, DataIn) of {needmore, Msg} -> - S#state{msg = Msg}; + S#state{req_in = Msg}; {done, Msg} -> + lager:debug("~p: -> ~s", [self(), p11p_rpc:dump(Msg)]), {ok, _BytesSent} = p11p_client:request(Client, Msg), - S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), + S#state{req_out = Msg, + req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), recv_count = Recv + 1} end. + +send_response(Sock, Inbuf, Sent) -> + Buf = case Sent of + 0 -> <<?RPC_VERSION:8, Inbuf/binary>>; + _ -> Inbuf + end, + %%lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), + ok = gen_tcp:send(Sock, Buf), + {ok, size(Inbuf)}. |