%%% Copyright (c) 2019, Sunet.
%%% See LICENSE for licensing information.

%% A manager is a genserver for coordination of clients and vtokens.

%% Provide a lookup service for servers in need of a client to send
%% requests to, by keeping track of which module is current for a
%% given vtoken and spawn a p11p_client genserver "on demand".
%%
%% Provide a server event and a client event API for servers and
%% clients, respectively, where events like "token timed out" and
%% "p11 app hung up" can be reported.
%%
%% Keep track of successful p11 requests which might cause state
%% changes in a token, like logins. When switching token under the
%% feet of the p11 app, replay whatever is needed to the new
%% token.

%% Certain state changing p11 requests cannot be replayed, like
%% generation of a new key. Any such (successful) request invalidates
%% all other clients for the given vtoken.

-module(p11p_manager).
-behaviour(gen_server).

%% API.
-export([start_link/0]).
-export([client_for_token/1, server_event/2]). % For servers.
-export([client_event/2]).                     % For clients.

%% Genserver callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Records and types.

%% One client per module configured for a vtoken.
-record(client, {
          tokname :: string(),          % Name of the vtoken this client belongs to.
          servid :: atom(),             % Identifier handed to p11p_client:start_link.
          modpath :: string(),          % From p11p_config:module_path/1.
          %% From p11p_config:module_env/1. The type used to be `[]',
          %% which denotes the empty list only.
          modenv :: list(),
          balance :: integer(),         % Configured balance, -1 when not balancing.
          pid :: pid() | undefined      % Running p11p_client, if any.
         }).

-record(vtoken, {
          clients :: [#client{}],       % Current client in hd().

          %% Invocations left for current client or -1 for no
          %% balancing.
          balance_count = -1 :: integer(),

          timeout :: non_neg_integer(), % Passed on to each started client.
          retries :: non_neg_integer(), % Client switches left after timeouts.

          server :: pid() | undefined   % Active server, if any.
         }).

-record(state, {
          vtokens :: #{string() => #vtoken{}}   % Keyed on vtoken name.
         }).

%% API implementation.

%% Start the manager and register it locally under the module name.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Return the pid of the client to use for the vtoken named TokName.
%% The calling process is recorded as the server of that vtoken when
%% a new client has to be started. Synchronous call to the manager.
-spec client_for_token(string()) -> pid().
client_for_token(TokName) ->
    gen_server:call(?MODULE, {client_for_token, self(), TokName}).

%% Report an event from a server to the manager. Asynchronous.
-spec server_event(atom(), term()) -> ok.
server_event(Event, Args) ->
    gen_server:cast(?MODULE, {server_event, Event, Args}).

%% Report an event from a client to the manager. Asynchronous.
-spec client_event(atom(), term()) -> ok.
client_event(Event, Args) ->
    gen_server:cast(?MODULE, {client_event, Event, Args}).

%% Genserver callbacks.

%% Build the vtoken table from the configured tokens.
init([]) ->
    {ok, #state{vtokens = init_vtokens(p11p_config:tokens())}}.

%% Look up (and if needed start) the client for a vtoken.
%%
%% The balance count decides whether the client list is rotated
%% first: -1 means no balancing, 0 means the current client has used
%% up its share, and a positive count is decremented.
handle_call({client_for_token, Server, TokNameIn}, _From,
            S = #state{vtokens = VTokensIn}) ->
    #{TokNameIn := VTokenIn} = VTokensIn,
    ClientsIn = VTokenIn#vtoken.clients,
    lager:debug("all clients: ~p", [ClientsIn]),
    {Clients, BalanceCount} =
        case VTokenIn#vtoken.balance_count of
            -1 ->
                {ClientsIn, -1};
            0 ->
                lager:debug("~p: balancing: next client", [self()]),
                Rotated = rotate_clients(ClientsIn),
                First = hd(Rotated),
                {Rotated, First#client.balance - 1};
            N when N > 0 ->
                lager:debug("~p: balancing: ~B more invocations",
                            [self(), N]),
                {ClientsIn, N - 1}
        end,
    Current = hd(Clients),
    case Current#client.pid of
        undefined ->
            Client = start_client(Current, Server,
                                  VTokenIn#vtoken.timeout),
            VToken = VTokenIn#vtoken{clients = [Client | tl(Clients)],
                                     server = Server,
                                     balance_count = BalanceCount},
            {reply, Client#client.pid,
             S#state{vtokens = VTokensIn#{TokNameIn := VToken}}};
        Pid ->
            %% NOTE(review): the state is returned unchanged here, so
            %% neither the rotation nor the new balance count computed
            %% above is stored when the client is already running --
            %% confirm that this is intended.
            {reply, Pid, S}
    end;
handle_call(Call, _From, State) ->
    lager:debug("Unhandled call: ~p~n", [Call]),
    {reply, unhandled, State}.

%% Server done with client: stop the current client of the vtoken,
%% if one is running, and forget about the server.
handle_cast({server_event, server_gone, TokNameIn},
            S = #state{vtokens = Tokens}) ->
    #{TokNameIn := TokenIn} = Tokens,
    ClientsIn = TokenIn#vtoken.clients,
    CurClient = hd(ClientsIn),
    Clients =
        case CurClient#client.pid of
            undefined ->
                %% No client is running for this vtoken, so there is
                %% nothing to stop. Calling p11p_client:stop/2 with
                %% `undefined' is avoided.
                ClientsIn;
            ClientPid ->
                ok = p11p_client:stop(ClientPid, normal),
                %% Find and update.
                [case C#client.pid of
                     ClientPid -> C#client{pid = undefined};
                     _ -> C
                 end || C <- ClientsIn]
        end,
    Token = TokenIn#vtoken{clients = Clients, server = undefined},
    {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}};

%% Client reporting that a token has timed out.
handle_cast({client_event, timeout, TokName}, State) ->
    #{TokName := VToken} = State#state.vtokens,
    client_timeout(TokName, VToken, State);

%% Start the client at the head of the client list of a vtoken. The
%% manager sends this to itself from next_client/1.
handle_cast({start_client, VTokName}, State = #state{vtokens = VTokens}) ->
    #{VTokName := VToken} = VTokens,
    Client = start_client(hd(VToken#vtoken.clients),
                          VToken#vtoken.server,
                          VToken#vtoken.timeout),
    NewVToken = VToken#vtoken{clients = [Client | tl(VToken#vtoken.clients)]},
    lager:debug("~p: vtoken updated: ~p", [self(), NewVToken]),
    {noreply, State#state{vtokens = VTokens#{VTokName := NewVToken}}};

handle_cast(Cast, State) ->
    lager:debug("Unhandled cast: ~p~n", [Cast]),
    {noreply, State}.

%% Exit status messages are logged; anything else is logged as
%% unhandled. The state is never changed.
handle_info({Port, {exit_status, Status}}, State) ->
    %% FIXME: do we need to be trapping exits explicitly?
    lager:info("~p: process ~p exited with ~p", [self(), Port, Status]),
    {noreply, State};
handle_info(Info, State) ->
    lager:debug("~p: Unhandled info: ~p~n", [self(), Info]),
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVersion, State, _Extra) ->
    {ok, State}.

%% Private functions

%% Create one #vtoken{} per configured token, keyed on token name.
-spec init_vtokens([p11p_config:token()]) -> #{string() => #vtoken{}}.
init_vtokens(ConfTokens) ->
    init_vtokens(ConfTokens, #{}).

init_vtokens([], Acc) ->
    lager:debug("~p: created tokens from config: ~p", [self(), Acc]),
    Acc;
init_vtokens([H | T], Acc) ->
    init_vtokens(T, Acc#{p11p_config:nameof(H) => new_vtoken(H)}).

%% Build the #vtoken{} for one configured token. The initial balance
%% count is the balance of the first client in the list. Crashes in
%% hd/1 if the token has no modules.
new_vtoken(Conf) ->
    Name = p11p_config:nameof(Conf),
    Balances = p11p_config:token_balance(Name),
    Clients = clients(Name, p11p_config:modules_for_token(Name), Balances),
    CurrentClient = hd(Clients),
    #vtoken{
       clients = Clients,
       balance_count = CurrentClient#client.balance,
       timeout = p11p_config:token_timeout(Name),
       retries = p11p_config:token_retries(Name)
      }.

%% Build the client list for a vtoken from its modules and balances.
%% An empty balance list means no balancing: every client gets -1.
clients(_TokName, [], _Balances) ->
    %% Without this clause, an empty module list together with an
    %% empty balance list made the clause below call itself forever.
    [];
clients(TokName, Modules, []) ->
    clients(TokName, Modules, [-1 || _ <- Modules], []);
clients(TokName, ConfModules, ConfBalance) ->
    clients(TokName, ConfModules, ConfBalance, []).

%% Pair each module with its balance and build a #client{} for it.
%% Clients are prepended to the accumulator, so the result is in the
%% reverse order of the module list.
clients(_TokName, [], _Balances, Acc) ->
    Acc;
clients(TokName, [Mod | RestMods], [Bal | RestBals], Acc) ->
    ServId = list_to_atom("p11p_client:" ++ TokName ++ ":"
                          ++ p11p_config:nameof(Mod)),
    Client = #client{tokname = TokName,
                     servid = ServId,
                     modpath = p11p_config:module_path(Mod),
                     modenv = p11p_config:module_env(Mod),
                     balance = Bal},
    clients(TokName, RestMods, RestBals, [Client | Acc]).

%% -spec balance(p11p_config:token_mode_t(), non_neg_integer()) -> integer().
%% balance({balance, Ratios}, N) ->
%%     lists:nth(N, Ratios);
%% balance(_, _) ->
%%     -1.

%% -spec balance_count(p11p_config:token_mode_t()) -> integer().
%% balance_count(#vtoken{mode = {balance, _}, balance_count = C}) ->
%%     C - 1;
%% balance_count(_) ->
%%     -1.

%% Move the head of the list to its end.
rotate_clients(L) ->
    tl(L) ++ [hd(L)].

%% Drop the head of the list and put UpdatedCurr at its end.
rotate_clients(L, UpdatedCurr) ->
    tl(L) ++ [UpdatedCurr].

%% Mark the current client as not running, move it to the end of the
%% client list and ask the manager (ourselves) to start the client
%% that is now first.
next_client(VToken = #vtoken{clients = Clients}) ->
    Current = hd(Clients),
    Rotated = rotate_clients(Clients, Current#client{pid = undefined}),
    ok = gen_server:cast(self(), {start_client, Current#client.tokname}),
    VToken#vtoken{clients = Rotated}.

%% Handle a token timeout. With retries left: tell the server that
%% the token is gone, switch to the next client and use up one retry.
%% Without retries left: tell the server that the token is gone for
%% good and stop the manager.
client_timeout(TokName,
               VToken = #vtoken{retries = Retries},
               State = #state{vtokens = VTokens}) when Retries > 0 ->
    lager:debug("~p: ~s: token timed out, switching token",
                [self(), TokName]),
    Server = VToken#vtoken.server,
    p11p_server:token_gone(Server, false),
    Switched = next_client(VToken),
    Updated = Switched#vtoken{retries = Retries - 1},
    {noreply, State#state{vtokens = VTokens#{TokName := Updated}}};
client_timeout(TokName, VToken, State) ->
    lager:debug("~p: ~s: token timed out, disconnecting app",
                [self(), TokName]),
    Server = VToken#vtoken.server,
    p11p_server:token_gone(Server, true),
    {stop, normal, State}.

%% Start a p11p_client process for Client, linked to the caller, and
%% return Client with its pid filled in. Crashes with badmatch unless
%% p11p_client:start_link returns {ok, Pid}.
start_client(Client, Server, Timeout) ->
    ServId = Client#client.servid,
    TokName = Client#client.tokname,
    ModPath = Client#client.modpath,
    ModEnv = Client#client.modenv,
    {ok, Pid} = p11p_client:start_link(ServId, TokName, Server,
                                       ModPath, ModEnv, Timeout),
    Client#client{pid = Pid}.