From bcf1816564b17aa0fb2a581d2887486212f8171a Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Fri, 10 Jan 2020 13:54:38 +0100 Subject: Rename remote -> client Also rename token -> vtoken where appropriate. --- p11p-daemon/src/p11p_client.erl | 163 +++++++++++++++++++++++++ p11p-daemon/src/p11p_config.erl | 14 +-- p11p-daemon/src/p11p_manager.erl | 209 +++++++++++++++++++++++++++++++ p11p-daemon/src/p11p_remote.erl | 166 ------------------------- p11p-daemon/src/p11p_remote_manager.erl | 210 -------------------------------- p11p-daemon/src/p11p_server.erl | 14 +-- p11p-daemon/src/p11p_sup.erl | 2 +- 7 files changed, 387 insertions(+), 391 deletions(-) create mode 100644 p11p-daemon/src/p11p_client.erl create mode 100644 p11p-daemon/src/p11p_manager.erl delete mode 100644 p11p-daemon/src/p11p_remote.erl delete mode 100644 p11p-daemon/src/p11p_remote_manager.erl (limited to 'p11p-daemon/src') diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl new file mode 100644 index 0000000..1222505 --- /dev/null +++ b/p11p-daemon/src/p11p_client.erl @@ -0,0 +1,163 @@ +%%% Copyright (c) 2019, Sunet. +%%% See LICENSE for licensing information. + +%% A client spawns an Erlang port running a proxy app, i.e. the +%% 'remote' program from p11-kit. + +%% Receive p11 requests from p11p_server, forward them to the proxy app, +%% wait for a reply. If a reply is received within a timeout period, +%% forward the reply to the requesting p11p_server. If the request +%% times out, inform the manager (our parent). + +-module(p11p_client). +-behaviour(gen_server). + +%% API. +-export([start_link/4]). +-export([request/2, stop/2]). + +-include("p11p_rpc.hrl"). + +%% Genserver callbacks. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +%% Records and types. +-record(state, { + port :: port(), + replyto :: pid() | undefined, + timer :: reference() | undefined, + token :: string(), % Token name. + msg :: p11rpc:msg() | undefined, + recv_count = 0 :: non_neg_integer(), + send_count = 0 :: non_neg_integer() + }). + +%% API. +-spec start_link(atom(), string(), string(), list()) -> + {ok, pid()} | {error, term()}. +start_link(ServName, TokName, ModPath, ModEnv) -> + lager:info("~p: p11p_client starting for ~s", [ServName, ModPath]), + gen_server:start_link({local, ServName}, ?MODULE, + [TokName, ModPath, ModEnv], []). + +-spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. +request(Client, Request) -> + gen_server:call(Client, {request, Request}). + +%% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether +%% we (Pid) are alive or not. An example of when that can happen is when the +%% manager receives a server_event about a lost p11 app. If the server +%% process terminated on request from us because we timed out on +%% an rpc call, chances are that we have already terminated by +%% the time the manager is to act on the lost app. +stop(Pid, Reason) -> + gen_server:cast(Pid, {stop, Reason}). + +%% Genserver callbacks. +init([TokName, ModPath, ModEnv]) -> + ProxyAppBinPath = p11p_config:proxyapp_bin_path(), + Port = open_port({spawn_executable, ProxyAppBinPath}, + [stream, + exit_status, + {env, ModEnv}, + {args, [ModPath, "-v"]} % FIXME: Remove -v + ]), + lager:debug("~p: ~s: new proxy app port: ~p", [self(), ProxyAppBinPath, Port]), + lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), + {ok, #state{port = Port, token = TokName}}. + +handle_call({request, Request}, {FromPid, _Tag}, + #state{port = Port, send_count = Sent} = S) -> + %%lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), + D = p11p_rpc:serialise(Request), + Buf = case Sent of + 0 -> <>; + _ -> D + end, + ok = do_send(Port, Buf), + {reply, {ok, sizeBuf}, S#state{replyto = FromPid, timer = start_timer(Port), + send_count = Sent + 1}}; + +handle_call(Call, _From, State) -> + lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), + {reply, unhandled, State}. + +handle_cast({stop, Reason}, State) -> + {stop, Reason, State}; + +handle_cast(Cast, State) -> + lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]), + {noreply, State}. + +%% Receiving the very first response from proxy app since it was started. +handle_info({Port, {data, Data}}, State) + when Port == State#state.port, State#state.msg == undefined -> + case hd(Data) of % First octet is RPC protocol version. + ?RPC_VERSION -> + {noreply, handle_proxy_app_data(State, p11p_rpc:new(), tl(Data))}; + BadVersion -> + lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, + BadVersion]), + {noreply, State} + end; + +%% Receiving more data from proxy app. +handle_info({Port, {data, Data}}, #state{msg = Msg} = State) + when Port == State#state.port -> + {noreply, handle_proxy_app_data(State, Msg, Data)}; + +%% Proxy app timed out. +handle_info({timeout, Timer, Port}, #state{token = Tok, replyto = Server} = S) + when Port == S#state.port, Timer == S#state.timer -> + lager:info("~p: rpc request timed out, exiting", [self()]), + p11p_manager:server_event(timeout, [Tok, Server]), + State = S#state{timer = undefined}, + {stop, normal, State}; + +handle_info(Info, State) -> + lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), + {noreply, State}. + +terminate(Reason, #state{port = Port}) -> + lager:debug("~p: client terminating with reason ~p", [self(), Reason]), + port_close(Port), + ok. + +code_change(_OldVersion, State, _Extra) -> + {ok, State}. + +%% Private +do_send(Port, Buf) -> + %%lager:debug("~p: sending ~B octets to proxy app", [self(), size(Buf)]), + + %% case rand:uniform(15) of + %% 1 -> + %% lager:debug("~p: faking unresponsive proxy app (~p) by not sending it any.", [self(), Port]); + %% _ -> + %% port_command(Port, Buf) + %% end, + + true = port_command(Port, Buf), + ok. + +handle_proxy_app_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, + MsgIn, DataIn) -> + case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of + {needmore, Msg} -> + S#state{msg = Msg}; + {done, Msg} -> + cancel_timer(Timer), + {ok, _BytesSent} = p11p_server:reply(Pid, Msg), + %% Saving potential data not consumed by parse/2 in new message. + S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), + recv_count = Recv + 1} + end. + +start_timer(Port) -> + %%lager:debug("~p: starting timer", [self()]), + erlang:start_timer(3000, self(), Port). + +cancel_timer(Timer) -> + %%lager:debug("~p: canceling timer", [self()]), + erlang:cancel_timer(Timer, [{async, true}, {info, false}]). diff --git a/p11p-daemon/src/p11p_config.erl b/p11p-daemon/src/p11p_config.erl index 9c7749c..330c490 100644 --- a/p11p-daemon/src/p11p_config.erl +++ b/p11p-daemon/src/p11p_config.erl @@ -9,7 +9,7 @@ %%-export([config/0]). -export([nameof/1]). -export([tokens/0]). --export([remotebin_path/0, modules_for_token/1, module_path/1, module_env/1, +-export([proxyapp_bin_path/0, modules_for_token/1, module_path/1, module_env/1, token_mode/1]). -export_type([token_mode_t/0]). @@ -36,7 +36,7 @@ %% Genserver state. -record(state, { - remotebin_path :: string(), + proxyapp_bin_path :: string(), tokens :: #{string() => token()} }). @@ -48,8 +48,8 @@ start_link() -> %% config() -> %% gen_server:call(?MODULE, config). -remotebin_path() -> - gen_server:call(?MODULE, remotebin_path). +proxyapp_bin_path() -> + gen_server:call(?MODULE, proxyapp_bin_path). -spec tokens() -> [token()]. tokens() -> @@ -86,7 +86,7 @@ init(_Args) -> %% handle_call(config, _From, State) -> %% {reply, State, State}; -handle_call(remotebin_path, _From, #state{remotebin_path = Path} = State) -> +handle_call(proxyapp_bin_path, _From, #state{proxyapp_bin_path = Path} = State) -> {reply, Path, State}; handle_call(tokens, _From, #state{tokens = Tokens} = State) -> {reply, maps:values(Tokens), State}; @@ -119,8 +119,8 @@ code_change(_OldVersion, State, _Extra) -> init_state() -> #state { - remotebin_path = - application:get_env(p11p, remotebin_path, + proxyapp_bin_path = + application:get_env(p11p, proxyapp_bin_path, "/usr/local/libexec/p11-kit/p11-kit-remote"), tokens = conf_tokens(application:get_env(p11p, groups, [])) }. diff --git a/p11p-daemon/src/p11p_manager.erl b/p11p-daemon/src/p11p_manager.erl new file mode 100644 index 0000000..7c3bdb9 --- /dev/null +++ b/p11p-daemon/src/p11p_manager.erl @@ -0,0 +1,209 @@ +%%% Copyright (c) 2019, Sunet. +%%% See LICENSE for licensing information. + +%% A manager is a genserver for coordination of clients and vtokens. + +%% Provide a lookup service for servers in need of a client to send +%% requests to, by keeping track of which module is current for a +%% given vtoken and spawn a p11p_client genserver "on demand". +%% +%% Provide a client event and a server event API for servers and +%% clients, respectively, where events like " token timed out" and +%% "p11 app hung up" can be reported. +%% +%% Keep track of successful p11 requests which might cause state +%% changes in a token, like logins. When switching token under the +%% feet of the p11 app, replay whatever is needed to the new +%% token. +%% Certain state changing p11 requests cannot be replayed, like +%% generation of a new key. Any such (successful) request invalidates +%% all other clients for the given vtoken. + +-module(p11p_manager). + +-behaviour(gen_server). + +%% API. +-export([start_link/0]). +-export([client_for_token/1, client_event/2]). % For servers. +-export([server_event/2]). % For clients. + +%% Genserver callbacks. +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +%% Records and types. +-record(client, { + tokname :: string(), + servid :: atom(), + modpath :: string(), + modenv :: [], + balance :: integer(), + pid :: pid() | undefined + }). + +-record(vtoken, { + mode :: p11p_config:token_mode_t(), + balance_count :: integer(), + clients :: [#client{}] % Active client in hd(). + }). + +-record(state, { + vtokens :: #{string() => #vtoken{}} + }). + +%% API implementation. +-spec start_link() -> {ok, pid()} | {error, term()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec client_for_token(string()) -> pid(). +client_for_token(TokName) -> + gen_server:call(?MODULE, {client_for_token, TokName}). +client_event(Event, Args) -> + gen_server:cast(?MODULE, {client_event, Event, Args}). + +server_event(Event, Args) -> + gen_server:cast(?MODULE, {server_event, Event, Args}). + +%% Genserver callbacks. +init([]) -> + {ok, #state{vtokens = init_vtokens(p11p_config:tokens())}}. + +handle_call({client_for_token, TokNameIn}, _, #state{vtokens = Tokens} = S) -> + #{TokNameIn := TokenIn} = Tokens, + ClientsIn = TokenIn#vtoken.clients, + lager:debug("all clients: ~p", [ClientsIn]), + {Clients, BalanceCount} = + case TokenIn#vtoken.balance_count of + 0 -> + lager:debug("~p: balancing: next client", [self()]), + Rotated = rotate_clients(ClientsIn), + First = hd(Rotated), + {Rotated, First#client.balance - 1}; + N when N > 0 -> + lager:debug("~p: balancing: ~B more invocations", [self(), N]), + {ClientsIn, N - 1}; + -1 -> + {ClientsIn, -1} + end, + #client{tokname = TokNameIn, + servid = ServId, + modpath = ModPath, + modenv = ModEnv, + pid = PidIn} = SelectedClient = hd(Clients), + case PidIn of + undefined -> + {ok, Pid} = + p11p_client:start_link(ServId, TokNameIn, ModPath, ModEnv), + Client = SelectedClient#client{pid = Pid}, + Token = TokenIn#vtoken{clients = [Client | tl(Clients)], + balance_count = BalanceCount}, + {reply, Pid, S#state{vtokens = Tokens#{TokNameIn := Token}}}; + _ -> + {reply, PidIn, S} + end; +handle_call(Call, _From, State) -> + lager:debug("Unhandled call: ~p~n", [Call]), + {reply, unhandled, State}. + +handle_cast({server_event, timeout, [TokNameIn, Server]}, + #state{vtokens = Tokens} = S) -> + lager:debug("~p: ~s: timed out, stopping ~p", [self(), TokNameIn, Server]), + gen_server:stop(Server), % Hang up on p11 client. + %% TODO: do some code dedup with client_for_token? + #{TokNameIn := TokenIn} = Tokens, + Clients = TokenIn#vtoken.clients, + SelectedClient = hd(Clients), + Client = SelectedClient#client{pid = undefined}, + Token = TokenIn#vtoken{clients = tl(Clients) ++ [Client]}, + lager:debug("~p: ~s: updated token: ~p", [self(), TokNameIn, Token]), + {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; + +handle_cast({client_event, client_gone, [TokName, Pid]}, + #state{vtokens = Tokens} = S) -> + lager:debug("~p: asking client ~p to stop", [self(), Pid]), + p11p_client:stop(Pid, normal), + #{TokName := TokenIn} = Tokens, + Clients = lists:map(fun(E) -> + case E#client.pid of + Pid -> E#client{pid = undefined}; + _ -> E + end + end, TokenIn#vtoken.clients), + Token = TokenIn#vtoken{clients = Clients}, + {noreply, S#state{vtokens = Tokens#{TokName := Token}}}; + +handle_cast(Cast, State) -> + lager:debug("Unhandled cast: ~p~n", [Cast]), + {noreply, State}. + +handle_info({Port, {exit_status, Status}}, State) -> + %% FIXME: do we need to be trapping exits explicitly? + lager:info("~p: process ~p exited with ~p", [self(), Port, Status]), + {noreply, State}; +handle_info(Info, State) -> + lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVersion, State, _Extra) -> + {ok, State}. + +%% Private functions +-spec init_vtokens([p11p_config:token()]) -> #{string() => #vtoken{}}. +init_vtokens(ConfTokens) -> + init_vtokens(ConfTokens, #{}). +init_vtokens([], Acc)-> + lager:debug("~p: created tokens from config: ~p", [self(), Acc]), + Acc; +init_vtokens([H|T], Acc)-> + init_vtokens(T, Acc#{p11p_config:nameof(H) => new_vtoken(H)}). + +new_vtoken(Conf) -> + Name = p11p_config:nameof(Conf), + Mode = p11p_config:token_mode(Name), + Clients = clients(Name, + p11p_config:modules_for_token(Name), + Mode), + R0 = hd(Clients), + #vtoken{ + mode = p11p_config:token_mode(Name), + balance_count = R0#client.balance, + clients = Clients + }. + +clients(TokName, ConfModules, ConfMode) -> + clients(TokName, ConfModules, ConfMode, []). +clients(_, [], _, Acc) -> + Acc; +clients(TokName, [H|T], ConfMode, Acc) -> + ModName = p11p_config:nameof(H), + ServName = "p11p_client:" ++ TokName ++ ":" ++ ModName, + ModPath = p11p_config:module_path(H), + ModEnv = p11p_config:module_env(H), + clients(TokName, T, ConfMode, [#client{ + tokname = TokName, + servid = list_to_atom(ServName), + modpath = ModPath, + modenv = ModEnv, + balance = balance(ConfMode, length(T) + 1) + } + | Acc]). + +-spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). +balance({balance, Ratios}, N) -> + lists:nth(N, Ratios); +balance(_, _) -> + -1. + +%% -spec balance_count(p11p_config:token_mode_t()) -> integer(). +%% balance_count(#vtoken{mode = {balance, _}, balance_count = C}) -> +%% C - 1; +%% balance_count(_) -> +%% -1. + +rotate_clients(L) -> + lists:reverse([hd(L) | lists:reverse(tl(L))]). diff --git a/p11p-daemon/src/p11p_remote.erl b/p11p-daemon/src/p11p_remote.erl deleted file mode 100644 index b27b333..0000000 --- a/p11p-daemon/src/p11p_remote.erl +++ /dev/null @@ -1,166 +0,0 @@ -%%% Copyright (c) 2019, Sunet. -%%% See LICENSE for licensing information. - -%% A remote spawns an Erlang port running the 'remote' program from -%% p11-kit. - -%% Receive p11 requests from p11p_server, forward them to the remote, -%% wait for a reply. If a reply is received within a timeout period, -%% forward the reply to the requesting p11p_server. If the request -%% times out, inform the remote manager (our parent). - -%% TODO: "remote" is not a great name and we shouldn't just inherit it -%% from p11p-kit. Let's use "client" or "proxy_client". - --module(p11p_remote). --behaviour(gen_server). - -%% API. --export([start_link/4]). --export([request/2, stop/2]). - --include("p11p_rpc.hrl"). - -%% Genserver callbacks. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - -%% Records and types. --record(state, { - port :: port(), - replyto :: pid() | undefined, - timer :: reference() | undefined, - token :: string(), % Token name. - msg :: p11rpc:msg() | undefined, - recv_count = 0 :: non_neg_integer(), - send_count = 0 :: non_neg_integer() - }). - -%% API. --spec start_link(atom(), string(), string(), list()) -> - {ok, pid()} | {error, term()}. -start_link(ServName, TokName, ModPath, ModEnv) -> - lager:info("~p: p11p_remote starting for ~s", [ServName, ModPath]), - gen_server:start_link({local, ServName}, ?MODULE, - [TokName, ModPath, ModEnv], []). - --spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. -request(Remote, Request) -> - gen_server:call(Remote, {request, Request}). - -%% Use stop/1 instead of gen_server:stop/1 if you're uncertain whether -%% Pid is alive or not. An example of when that can happen is when the -%% manager receiving a server_event about a lost client. If the server -%% process terminated on request from a remote which has timed out on -%% an rpc call, chances are that the remote has already terminated by -%% the time the manager is to act on the lost client. -stop(Pid, Reason) -> - gen_server:cast(Pid, {stop, Reason}). - -%% Genserver callbacks. -init([TokName, ModPath, ModEnv]) -> - RemoteBinPath = p11p_config:remotebin_path(), - Port = open_port({spawn_executable, RemoteBinPath}, - [stream, - exit_status, - {env, ModEnv}, - {args, [ModPath, "-v"]} % FIXME: Remove -v - ]), - lager:debug("~p: ~s: new remote port: ~p", [self(), RemoteBinPath, Port]), - lager:debug("~p: ~s: module: ~s, env: ~p", [self(), RemoteBinPath, ModPath, ModEnv]), - {ok, #state{port = Port, token = TokName}}. - -handle_call({request, Request}, {FromPid, _Tag}, - #state{port = Port, send_count = Sent} = S) -> - %%lager:debug("~p: sending request from ~p to remote ~p", [self(), FromPid, Port]), - D = p11p_rpc:serialise(Request), - Buf = case Sent of - 0 -> <>; - _ -> D - end, - ok = do_send(Port, Buf), - {reply, {ok, sizeBuf}, S#state{replyto = FromPid, timer = start_timer(Port), - send_count = Sent + 1}}; - -handle_call(Call, _From, State) -> - lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), - {reply, unhandled, State}. - -handle_cast({stop, Reason}, State) -> - {stop, Reason, State}; - -handle_cast(Cast, State) -> - lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]), - {noreply, State}. - -%% Receiving the very first response from remote since it was started. -handle_info({Port, {data, Data}}, State) - when Port == State#state.port, State#state.msg == undefined -> - case hd(Data) of % First octet is RPC protocol version. - ?RPC_VERSION -> - {noreply, handle_remote_data(State, p11p_rpc:new(), tl(Data))}; - BadVersion -> - lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, - BadVersion]), - {noreply, State} - end; - -%% Receiving more data from remote. -handle_info({Port, {data, Data}}, #state{msg = Msg} = State) - when Port == State#state.port -> - {noreply, handle_remote_data(State, Msg, Data)}; - -%% Remote timed out. -handle_info({timeout, Timer, Port}, #state{token = Tok, replyto = Server} = S) - when Port == S#state.port, Timer == S#state.timer -> - lager:info("~p: rpc request timed out, exiting", [self()]), - p11p_remote_manager:server_event(timeout, [Tok, Server]), - State = S#state{timer = undefined}, - {stop, normal, State}; - -handle_info(Info, State) -> - lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), - {noreply, State}. - -terminate(Reason, #state{port = Port}) -> - lager:debug("~p: remote terminating with reason ~p", [self(), Reason]), - port_close(Port), - ok. - -code_change(_OldVersion, State, _Extra) -> - {ok, State}. - -%% Private -do_send(Port, Buf) -> - %%lager:debug("~p: sending ~B octets to remote", [self(), size(Buf)]), - - %% case rand:uniform(15) of - %% 1 -> - %% lager:debug("~p: faking unresponsive remote (~p) by not sending it any.", [self(), Port]); - %% _ -> - %% port_command(Port, Buf) - %% end, - - true = port_command(Port, Buf), - ok. - -handle_remote_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, - MsgIn, DataIn) -> - case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of - {needmore, Msg} -> - S#state{msg = Msg}; - {done, Msg} -> - cancel_timer(Timer), - {ok, _BytesSent} = p11p_server:reply(Pid, Msg), - %% Saving potential data not consumed by parse/2 in new message. - S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), - recv_count = Recv + 1} - end. - -start_timer(Port) -> - %%lager:debug("~p: starting timer", [self()]), - erlang:start_timer(3000, self(), Port). - -cancel_timer(Timer) -> - %%lager:debug("~p: canceling timer", [self()]), - erlang:cancel_timer(Timer, [{async, true}, {info, false}]). diff --git a/p11p-daemon/src/p11p_remote_manager.erl b/p11p-daemon/src/p11p_remote_manager.erl deleted file mode 100644 index ad7fbaf..0000000 --- a/p11p-daemon/src/p11p_remote_manager.erl +++ /dev/null @@ -1,210 +0,0 @@ -%%% Copyright (c) 2019, Sunet. -%%% See LICENSE for licensing information. - -%% A remote manager is a genserver for coordination of remotes for all -%% tokens. - -%% Provide a lookup service for servers in need of a remote to send -%% requests to, by keeping track of which module is current for a -%% given vtoken and spawn a p11p_remote genserver "on demand". -%% -%% Provide a client event and a server event API for servers and -%% remotes, respectively, where events like "remote timed out" and -%% "p11 client hung up" can be reported. -%% -%% Keep track of successful p11 requests which might cause state -%% changes in a token, like logins. When switching token under the -%% feet of the p11 client, replay whatever is needed to the new -%% token. This includes the p11-kit RPC protocol version octet. -%% Certain state changing p11 requests cannot be replayed, like -%% generation of a new key. Any such (successful) request invalidates -%% all other remotes for the given vtoken. - --module(p11p_remote_manager). - --behaviour(gen_server). - -%% API. --export([start_link/0]). --export([remote_for_token/1, client_event/2]). % For servers. --export([server_event/2]). % For remotes. - -%% Genserver callbacks. --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - -%% Records and types. --record(remote, { - tokname :: string(), - servid :: atom(), - modpath :: string(), - modenv :: [], - balance :: integer(), - pid :: pid() | undefined - }). - --record(token, { - mode :: p11p_config:token_mode_t(), - balance_count :: integer(), - remotes :: [#remote{}] % Active remote in hd(). - }). - --record(state, { - tokens :: #{string() => #token{}} - }). - -%% API implementation. --spec start_link() -> {ok, pid()} | {error, term()}. -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec remote_for_token(string()) -> pid(). -remote_for_token(TokName) -> - gen_server:call(?MODULE, {remote_for_token, TokName}). -client_event(Event, Args) -> - gen_server:cast(?MODULE, {client_event, Event, Args}). - -server_event(Event, Args) -> - gen_server:cast(?MODULE, {server_event, Event, Args}). - -%% Genserver callbacks. -init([]) -> - {ok, #state{tokens = init_tokens(p11p_config:tokens())}}. - -handle_call({remote_for_token, TokNameIn}, _, #state{tokens = Tokens} = S) -> - #{TokNameIn := TokenIn} = Tokens, - RemotesIn = TokenIn#token.remotes, - lager:debug("all remotes: ~p", [RemotesIn]), - {Remotes, BalanceCount} = - case TokenIn#token.balance_count of - 0 -> - lager:debug("~p: balancing: next remote", [self()]), - Rotated = rotate_remotes(RemotesIn), - First = hd(Rotated), - {Rotated, First#remote.balance - 1}; - N when N > 0 -> - lager:debug("~p: balancing: ~B more invocations", [self(), N]), - {RemotesIn, N - 1}; - -1 -> - {RemotesIn, -1} - end, - #remote{tokname = TokNameIn, - servid = ServId, - modpath = ModPath, - modenv = ModEnv, - pid = PidIn} = SelectedRemote = hd(Remotes), - case PidIn of - undefined -> - {ok, Pid} = - p11p_remote:start_link(ServId, TokNameIn, ModPath, ModEnv), - Remote = SelectedRemote#remote{pid = Pid}, - Token = TokenIn#token{remotes = [Remote | tl(Remotes)], - balance_count = BalanceCount}, - {reply, Pid, S#state{tokens = Tokens#{TokNameIn := Token}}}; - _ -> - {reply, PidIn, S} - end; -handle_call(Call, _From, State) -> - lager:debug("Unhandled call: ~p~n", [Call]), - {reply, unhandled, State}. - -handle_cast({server_event, timeout, [TokNameIn, Server]}, - #state{tokens = Tokens} = S) -> - lager:debug("~p: ~s: timed out, stopping ~p", [self(), TokNameIn, Server]), - gen_server:stop(Server), % Hang up on p11 client. - %% TODO: do some code dedup with remote_for_token? - #{TokNameIn := TokenIn} = Tokens, - Remotes = TokenIn#token.remotes, - SelectedRemote = hd(Remotes), - Remote = SelectedRemote#remote{pid = undefined}, - Token = TokenIn#token{remotes = tl(Remotes) ++ [Remote]}, - lager:debug("~p: ~s: updated token: ~p", [self(), TokNameIn, Token]), - {noreply, S#state{tokens = Tokens#{TokNameIn := Token}}}; - -handle_cast({client_event, client_gone, [TokName, Pid]}, - #state{tokens = Tokens} = S) -> - lager:debug("~p: asking remote ~p to stop", [self(), Pid]), - p11p_remote:stop(Pid, normal), - #{TokName := TokenIn} = Tokens, - Remotes = lists:map(fun(E) -> - case E#remote.pid of - Pid -> E#remote{pid = undefined}; - _ -> E - end - end, TokenIn#token.remotes), - Token = TokenIn#token{remotes = Remotes}, - {noreply, S#state{tokens = Tokens#{TokName := Token}}}; - -handle_cast(Cast, State) -> - lager:debug("Unhandled cast: ~p~n", [Cast]), - {noreply, State}. - -handle_info({Port, {exit_status, Status}}, State) -> - %% FIXME: do we need to be trapping exits explicitly? - lager:info("~p: process ~p exited with ~p", [self(), Port, Status]), - {noreply, State}; -handle_info(Info, State) -> - lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVersion, State, _Extra) -> - {ok, State}. - -%% Private functions --spec init_tokens([p11p_config:token()]) -> #{string() => #token{}}. -init_tokens(ConfTokens) -> - init_tokens(ConfTokens, #{}). -init_tokens([], Acc)-> - lager:debug("~p: created tokens from config: ~p", [self(), Acc]), - Acc; -init_tokens([H|T], Acc)-> - init_tokens(T, Acc#{p11p_config:nameof(H) => new_token(H)}). - -new_token(Conf) -> - Name = p11p_config:nameof(Conf), - Mode = p11p_config:token_mode(Name), - Remotes = remotes(Name, - p11p_config:modules_for_token(Name), - Mode), - R0 = hd(Remotes), - #token{ - mode = p11p_config:token_mode(Name), - balance_count = R0#remote.balance, - remotes = Remotes - }. - -remotes(TokName, ConfModules, ConfMode) -> - remotes(TokName, ConfModules, ConfMode, []). -remotes(_, [], _, Acc) -> - Acc; -remotes(TokName, [H|T], ConfMode, Acc) -> - ModName = p11p_config:nameof(H), - ServName = "p11p_remote:" ++ TokName ++ ":" ++ ModName, - ModPath = p11p_config:module_path(H), - ModEnv = p11p_config:module_env(H), - remotes(TokName, T, ConfMode, [#remote{ - tokname = TokName, - servid = list_to_atom(ServName), - modpath = ModPath, - modenv = ModEnv, - balance = balance(ConfMode, length(T) + 1) - } - | Acc]). - --spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer(). -balance({balance, Ratios}, N) -> - lists:nth(N, Ratios); -balance(_, _) -> - -1. - -%% -spec balance_count(p11p_config:token_mode_t()) -> integer(). -%% balance_count(#token{mode = {balance, _}, balance_count = C}) -> -%% C - 1; -%% balance_count(_) -> -%% -1. - -rotate_remotes(L) -> - lists:reverse([hd(L) | lists:reverse(tl(L))]). diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index b3ffa5c..cbc00df 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -20,7 +20,7 @@ %% Records and types. -record(state, { tokname :: string(), - remote :: pid() | undefined, + client :: pid() | undefined, socket :: gen_tcp:socket(), msg :: p11rpc_msg() | undefined, recv_count = 0 :: non_neg_integer(), @@ -83,14 +83,14 @@ handle_cast(Cast, State) -> %% First packet from P11 client. handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) - when S#state.remote == undefined -> + when S#state.client == undefined -> %%lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(Data), Port]), <> = DataIn, case RPCVersion of ?RPC_VERSION -> {noreply, p11_client_data( - S#state{remote = p11p_remote_manager:remote_for_token(TokName)}, + S#state{client = p11p_manager:client_for_token(TokName)}, p11p_rpc:new(), Data)}; BadVersion -> @@ -112,9 +112,9 @@ handle_info(Info, S) -> lager:debug("~p: Unhandled info: ~p~n", [self(), Info]), {noreply, S}. -terminate(Reason, #state{socket = Sock, tokname = TokName, remote = Remote}) -> +terminate(Reason, #state{socket = Sock, tokname = TokName, client = Client}) -> gen_tcp:close(Sock), - p11p_remote_manager:client_event(client_gone, [TokName, Remote]), + p11p_manager:client_event(client_gone, [TokName, Client]), lager:debug("~p: terminated with reason ~p", [self(), Reason]), ignored. @@ -122,13 +122,13 @@ code_change(_OldVersion, State, _Extra) -> {ok, State}. %% Private functions. -p11_client_data(#state{remote = Remote, recv_count = Recv} = S, MsgIn, +p11_client_data(#state{client = Client, recv_count = Recv} = S, MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, DataIn) of {needmore, Msg} -> S#state{msg = Msg}; {done, Msg} -> - {ok, _BytesSent} = p11p_remote:request(Remote, Msg), + {ok, _BytesSent} = p11p_client:request(Client, Msg), S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), recv_count = Recv + 1} end. diff --git a/p11p-daemon/src/p11p_sup.erl b/p11p-daemon/src/p11p_sup.erl index 314b958..7f7025f 100644 --- a/p11p-daemon/src/p11p_sup.erl +++ b/p11p-daemon/src/p11p_sup.erl @@ -24,6 +24,6 @@ init([]) -> {ok, {{rest_for_one, 1, 5}, [ ?CHILD(p11p_config, worker), - ?CHILD(p11p_remote_manager, worker), + ?CHILD(p11p_manager, worker), ?CHILD(p11p_server_sup, supervisor) ]}}. -- cgit v1.1