%% A remote spawns an Erlang port running the 'remote' program from
%% p11-kit.

%% Receive p11 requests from p11p_server, forward them to the remote,
%% wait for a reply. If a reply is received within a timeout period,
%% forward the reply to the requesting p11p_server. If the request
%% times out, inform the remote manager (our parent).

%% TODO: "remote" is not a great name and we shouldn't just inherit it
%% from p11-kit

-module(p11p_remote).
-behaviour(gen_server).

%% API.
-export([start_link/3]).
-export([request/2, add_to_outbuf/2, stop/2]).

-include("p11p_rpc.hrl").

%% Genserver callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% Records and types.
-record(state, {
    port :: port(),
    %% Pid of the caller of the most recent request/2; replies from the
    %% port are forwarded to it.
    replyto :: pid() | undefined,
    %% Reference of the timer started for the most recent request.
    timer :: reference() | undefined,
    token :: string(),                  % Name
    %% Octets queued for the port but not yet sent.
    outbuf = <<>> :: binary(),
    %% Reply currently being parsed; undefined until the first data
    %% arrives from the port.
    msg :: p11rpc_msg() | undefined
}).

%% FIXME: move to config
-define(P11KITREMOTE_PATH, "/home/linus/usr/libexec/p11-kit/p11-kit-remote").

%% API.

%% Start a remote registered locally as ServName, serving the token
%% TokName through the module found at ModPath.
-spec start_link(atom(), string(), string()) -> {ok, pid()} | {error, term()}.
start_link(ServName, TokName, ModPath) ->
    lager:info("~p: p11p_remote starting for ~s", [ServName, ModPath]),
    gen_server:start_link({local, ServName}, ?MODULE, [TokName, ModPath], []).

%% Serialise Request, send it to the port together with anything already
%% queued in the outbuf, and start the reply timer. The calling process
%% is recorded as the receiver of the reply.
-spec request(pid(), p11rpc_msg()) -> ok.
request(Remote, Request) ->
    gen_server:call(Remote, {request, Request}).

%% Queue Data for the port without sending it; it is flushed by the next
%% request/2.
-spec add_to_outbuf(pid(), binary()) -> ok.
add_to_outbuf(Remote, Data) ->
    gen_server:call(Remote, {add_to_outbuf, Data}).

%% Use stop/2 instead of gen_server:stop/1 if you're uncertain whether
%% Pid is alive or not. An example of when that can happen is when the
%% manager receives a server_event about a lost client. If the server
%% process terminated on request from a remote which has timed out on
%% an rpc call, chances are that the remote has already terminated by
%% the time the manager is to act on the lost client.
-spec stop(pid(), term()) -> ok.
stop(Pid, Reason) ->
    gen_server:cast(Pid, {stop, Reason}).

%% Genserver callbacks.
%% Spawn the p11-kit remote executable as a port, handing it the module
%% path as its first argument, and remember the token name.
init([TokName, ModPath]) ->
    Port = open_port({spawn_executable, ?P11KITREMOTE_PATH},
                     [stream, exit_status, {args, [ModPath, "-v"]}]),
    lager:debug("~p: ~s: new remote port: ~p", [self(), ?P11KITREMOTE_PATH, Port]),
    {ok, #state{port = Port, token = TokName}}.

%% add_to_outbuf: queue Data without sending.
%% request: serialise the request, flush the outbuf to the port, record
%% the caller as reply receiver and start the reply timer.
handle_call({add_to_outbuf, Data}, _From, State) ->
    {reply, ok, do_add_to_outbuf(Data, State)};
handle_call({request, Request}, {FromPid, _Tag}, #state{port = Port} = State) ->
    %%lager:debug("~p: sending request from ~p to remote ~p", [self(), FromPid, Port]),
    NewState = do_send(do_add_to_outbuf(p11p_rpc:serialise(Request), State)),
    {reply, ok, NewState#state{replyto = FromPid, timer = start_timer(Port)}};
handle_call(Request, _From, State) ->
    lager:debug("~p: Unhandled call: ~p~n", [self(), Request]),
    {reply, unhandled, State}.

handle_cast({stop, Reason}, State) ->
    {stop, Reason, State};
handle_cast(Cast, State) ->
    lager:debug("~p: unhandled cast: ~p~n", [self(), Cast]),
    {noreply, State}.

%% TODO: dedup code w/ p11p_server
%%
%% NOTE(review): the port is opened with exit_status, but
%% {Port, {exit_status, _}} has no clause of its own and ends up in the
%% catch-all below, which only logs it.

%% First data from the port (no message is being parsed yet): the port
%% is not opened in binary mode, so Data is a list of octets. The first
%% octet is the version; it is passed on to the server's client buffer
%% and the rest is parsed as the start of a new message.
handle_info({Port, {data, Data}}, #state{replyto = Pid} = State)
  when Port == State#state.port, State#state.msg == undefined ->
    [Version | Rest] = Data,            % First octet is version.
    {ok, _BytesAdded} = p11p_server:add_to_clientbuf(Pid, <<Version>>),
    {noreply, handle_remote_data(State, p11p_rpc:new(), Rest)};
%% Subsequent data: continue parsing the message in progress.
handle_info({Port, {data, Data}}, #state{msg = Msg} = State)
  when Port == State#state.port ->
    {noreply, handle_remote_data(State, Msg, Data)};
%% The reply timer for the current request fired: tell the manager and
%% terminate.
handle_info({timeout, Timer, Port}, #state{token = TokName, replyto = Server} = State)
  when Port == State#state.port, Timer == State#state.timer ->
    lager:info("~p: rpc request timed out, exiting", [self()]),
    p11p_remote_manager:server_event(timeout, [TokName, Server]),
    NewState = State#state{timer = undefined},
    {stop, normal, NewState};
handle_info(Info, State) ->
    lager:debug("~p: Unhandled info: ~p~n", [self(), Info]),
    {noreply, State}.
%% Close the port on shutdown.
terminate(Reason, #state{port = Port}) ->
    lager:debug("~p: remote terminating with reason ~p", [self(), Reason]),
    close_port(Port),
    ok.

code_change(_OldVersion, State, _Extra) ->
    {ok, State}.

%% Private

%% Close Port, tolerating a port that is no longer open: port_close/1
%% raises badarg in that case, which would otherwise replace the real
%% termination reason with a crash inside terminate/2.
close_port(Port) ->
    try port_close(Port) of
        true -> ok
    catch
        error:badarg -> ok
    end.

%% Append Data to the octets waiting to be sent to the port.
do_add_to_outbuf(Data, #state{outbuf = Buf} = State) ->
    %%lager:debug("~p: adding ~B octets to outbuf", [self(), size(Data)]),
    NewBuf = <<Buf/binary, Data/binary>>,
    State#state{outbuf = NewBuf}.

%% Send everything in the outbuf to the port and empty the outbuf.
do_send(#state{port = Port, outbuf = Buf} = State) ->
    %%lager:debug("~p: sending ~B octets to remote", [self(), size(Buf)]),
    port_command(Port, Buf),
    State#state{outbuf = <<>>}.

%% Feed Data (a list of octets from the port) to the parser. When a
%% complete message has been parsed, cancel the reply timer, forward the
%% message to the requesting server and start a fresh message seeded
%% with the parsed message's buffer field. Otherwise keep the partial
%% message for the next chunk.
handle_remote_data(#state{replyto = Pid, timer = Timer} = State, Msg, Data) ->
    case p11p_rpc:parse(Msg, list_to_binary(Data)) of
        {done, NewMsg} ->
            cancel_timer(Timer),
            {ok, _BytesSent} = p11p_server:reply(Pid, NewMsg),
            %% Forget the timer reference: cancellation is asynchronous,
            %% so a timeout message may already be in our mailbox, and
            %% it must not match the timeout clause of handle_info/2
            %% now that the reply has been delivered.
            State#state{timer = undefined,
                        msg = p11p_rpc:new(NewMsg#p11rpc_msg.buffer)};
        {needmore, NewMsg} ->
            State#state{msg = NewMsg}
    end.

%% Start the reply timer; on expiry this process receives
%% {timeout, TimerRef, Port}.
start_timer(Port) ->
    %%lager:debug("~p: starting timer", [self()]),
    erlang:start_timer(3000, self(), Port).

%% Cancel the reply timer, if there is one, without waiting for the
%% result of the cancellation.
cancel_timer(undefined) ->
    ok;
cancel_timer(Timer) ->
    %%lager:debug("~p: canceling timer", [self()]),
    erlang:cancel_timer(Timer, [{async, true}, {info, false}]).