%%% Copyright (c) 2019, Sunet.
%%% See LICENSE for licensing information.

%% A remote spawns an Erlang port running the 'remote' program from
%% p11-kit.

%% Receive p11 requests from p11p_server, forward them to the remote,
%% wait for a reply. If a reply is received within a timeout period,
%% forward the reply to the requesting p11p_server. If the request
%% times out, inform the remote manager (our parent).

%% TODO: "remote" is not a great name and we shouldn't just inherit it
%% from p11-kit. Let's use "client" or "proxy_client".

-module(p11p_remote).
-behaviour(gen_server).

%% API.
-export([start_link/4]).
-export([request/2, stop/2]).

-include("p11p_rpc.hrl").

%% Genserver callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Records and types.
-record(state, {
          port :: port(),                       % Port running the remote binary.
          replyto :: pid() | undefined,         % Requester awaiting a response.
          timer :: reference() | undefined,     % Timer for the outstanding request.
          token :: string(),                    % Token name.
          %% Uses the same local type as the request/2 spec rather than
          %% a remote type in a differently named module.
          msg :: p11rpc_msg() | undefined,      % Response being assembled.
          recv_count = 0 :: non_neg_integer(),  % Complete responses received.
          send_count = 0 :: non_neg_integer()   % Requests sent.
         }).

%% API.

%% Start a remote, registered locally as ServName, serving the token
%% TokName. ModPath and ModEnv are passed on to init/1, which uses
%% them when opening the port.
-spec start_link(atom(), string(), string(), list()) ->
                        {ok, pid()} | {error, term()}.
start_link(ServName, TokName, ModPath, ModEnv) ->
    lager:info("~p: p11p_remote starting for ~s", [ServName, ModPath]),
    gen_server:start_link({local, ServName}, ?MODULE,
                          [TokName, ModPath, ModEnv], []).

%% Synchronously hand Request to the remote process. The call returns
%% as soon as the request has been written to the port; the response
%% from the remote is delivered to the calling process separately.
-spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}.
request(Remote, Request) ->
    gen_server:call(Remote, {request, Request}).

%% Use stop/2 instead of gen_server:stop/1 if you're uncertain whether
%% Pid is alive or not. An example of when that can happen is when
%% the manager receives a server_event about a lost client. If the
%% server process terminated on request from a remote which has timed
%% out on an rpc call, chances are that the remote has already
%% terminated by the time the manager is to act on the lost client.
%% Being a cast, this returns ok whether or not Pid is alive.
-spec stop(pid() | atom(), term()) -> ok.
stop(Pid, Reason) ->
    gen_server:cast(Pid, {stop, Reason}).

%% Genserver callbacks.
%% Open a port running the configured remote binary, with ModPath as
%% its first argument and ModEnv as its environment, and remember the
%% port together with the token name.
init([TokName, ModPath, ModEnv]) ->
    RemoteBinPath = p11p_config:remotebin_path(),
    Port = open_port({spawn_executable, RemoteBinPath},
                     [stream,
                      exit_status,
                      {env, ModEnv},
                      {args, [ModPath, "-v"]} % FIXME: Remove -v
                     ]),
    lager:debug("~p: ~s: new remote port: ~p", [self(), RemoteBinPath, Port]),
    lager:debug("~p: ~s: module: ~s, env: ~p",
                [self(), RemoteBinPath, ModPath, ModEnv]),
    {ok, #state{port = Port, token = TokName}}.

%% Serialise Request, write it to the port and arm the response timer.
%% Replies {ok, Size} where Size is the number of octets written.
handle_call({request, Request}, {FromPid, _Tag},
            #state{port = Port, send_count = Sent} = S) ->
    D = p11p_rpc:serialise(Request),
    %% The very first request is preceded by the RPC protocol version
    %% octet, mirroring the version octet expected at the start of the
    %% first response. Built as iodata, so D is not copied.
    Buf = case Sent of
              0 -> [?RPC_VERSION, D];
              _ -> D
          end,
    ok = do_send(Port, Buf),
    {reply, {ok, iolist_size(Buf)},
     S#state{replyto = FromPid,
             timer = start_timer(Port),
             send_count = Sent + 1}};
handle_call(Call, _From, State) ->
    lager:debug("~p: Unhandled call: ~p~n", [self(), Call]),
    {reply, unhandled, State}.

%% Stop on request, see p11p_remote:stop/2.
handle_cast({stop, Reason}, State) ->
    {stop, Reason, State};
handle_cast(Cast, State) ->
    lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]),
    {noreply, State}.

%% Receiving the very first response from remote since it was started.
%% Port data arrives as a list of octets, since the port is opened
%% without the 'binary' option.
handle_info({Port, {data, Data}}, #state{port = Port, msg = undefined} = State) ->
    case Data of
        [?RPC_VERSION | Rest] -> % First octet is RPC protocol version.
            {noreply, handle_remote_data(State, p11p_rpc:new(), Rest)};
        [BadVersion | _] ->
            lager:info("~p: ~p: invalid RPC version: ~p",
                       [self(), Port, BadVersion]),
            {noreply, State}
    end;

%% Receiving more data from remote.
handle_info({Port, {data, Data}}, #state{port = Port, msg = Msg} = State) ->
    {noreply, handle_remote_data(State, Msg, Data)};

%% Remote timed out. Only the timer of the outstanding request
%% matches; timeouts of superseded or already answered requests fall
%% through to the catch-all clause below.
handle_info({timeout, Timer, Port},
            #state{port = Port, timer = Timer, token = Tok, replyto = Server} = S) ->
    lager:info("~p: rpc request timed out, exiting", [self()]),
    p11p_remote_manager:server_event(timeout, [Tok, Server]),
    {stop, normal, S#state{timer = undefined}};

handle_info(Info, State) ->
    lager:debug("~p: Unhandled info: ~p~n", [self(), Info]),
    {noreply, State}.

%% Close the port when terminating.
terminate(Reason, #state{port = Port}) ->
    lager:debug("~p: remote terminating with reason ~p", [self(), Reason]),
    %% port_close/1 raises badarg if the port is already closed, e.g.
    %% because the remote program has exited. There is nothing left to
    %% clean up in that case.
    try port_close(Port) of
        true -> ok
    catch
        error:badarg -> ok
    end.

code_change(_OldVersion, State, _Extra) ->
    {ok, State}.

%% Private

%% Write Buf (iodata) to the port. Crashes unless the write succeeds.
do_send(Port, Buf) ->
    true = port_command(Port, Buf),
    ok.

%% Feed data received from the port to the parser. When a complete
%% message has been parsed, cancel the response timer and pass the
%% message on to the requester. Returns the updated state.
handle_remote_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S,
                   MsgIn, DataIn) ->
    case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of
        {needmore, Msg} ->
            S#state{msg = Msg};
        {done, Msg} ->
            ok = cancel_timer(Timer),
            {ok, _BytesSent} = p11p_server:reply(Pid, Msg),
            %% Saving potential data not consumed by parse/2 in new
            %% message. The timer reference is cleared because the
            %% cancellation is asynchronous: a timeout message that is
            %% already in the mailbox must not stop us after a
            %% successful reply.
            S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer),
                    timer = undefined,
                    recv_count = Recv + 1}
    end.

%% Start the response timer. On expiry this process receives
%% {timeout, TimerRef, Port} after 3000 ms.
start_timer(Port) ->
    erlang:start_timer(3000, self(), Port).

%% Cancel the response timer, if there is one, without waiting for
%% the result of the cancellation.
cancel_timer(undefined) ->
    ok;
cancel_timer(Timer) ->
    erlang:cancel_timer(Timer, [{async, true}, {info, false}]),
    ok.