From 4034234ac618f92f22789dd922c777347881fb90 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Fri, 28 Jun 2019 23:33:26 +0200 Subject: parse rpc replies and timeout if they're not on time Not that it works though, demonstrated thanks to our attempts at reusing a remote for a new client which is sending that version byte before the rpc message. At least I think that's why. Seems like send (to remote) is blocking and therefore the timeout can't fire (same process). First things is that the timeout should probably be in the server instead, in case the remote is blocking like in this case. Second is that we'd at least seen something if the server was calling the remote genserver instead of casting, du to the default gen_server:call timeout of 5s. Why are we casting in the first place? Well, we had to back when we didn't collect the full rpc message before passing it on. That could change now. --- p11p-daemon/src/p11p_remote.erl | 26 ++++++++++++++++++++------ p11p-daemon/src/p11p_server.erl | 34 ++++++++++++++++++++++------------ 2 files changed, 42 insertions(+), 18 deletions(-) (limited to 'p11p-daemon') diff --git a/p11p-daemon/src/p11p_remote.erl b/p11p-daemon/src/p11p_remote.erl index c91f47c..3790971 100644 --- a/p11p-daemon/src/p11p_remote.erl +++ b/p11p-daemon/src/p11p_remote.erl @@ -28,7 +28,8 @@ replyto :: pid() | undefined, timer :: reference() | undefined, token :: string(), % Name - outbuf = <<>> :: binary() + outbuf = <<>> :: binary(), + msg :: p11rpc:msg() | undefined }). %% FIXME: move to config @@ -71,11 +72,14 @@ handle_cast(Request, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Request]), {noreply, State}. -handle_info({Port, {data, Data}}, #state{replyto = Pid, timer = Timer} = State) when Port == State#state.port -> - erlang:cancel_timer(Timer, [{async, true}, {info, false}]), - p11p_server:reply(Pid, Data), - {noreply, State}; -handle_info({timeout, Timer, Port}, #state{token = TokName} = State) when Port == State#state.port andalso Timer == State#state.timer -> +%% TODO: dedup code w/ p11p_server +handle_info({Port, {data, Data}}, #state{replyto = Pid} = State) when Port == State#state.port, State#state.msg == undefined -> + Version = hd(Data), % First octet is version. + p11p_server:add_to_clientbuf(Pid, <>), + {noreply, handle_remote_data(State, p11p_rpc:new(), tl(Data))}; +handle_info({Port, {data, Data}}, #state{msg = Msg} = State) when Port == State#state.port -> + {noreply, handle_remote_data(State, Msg, Data)}; +handle_info({timeout, Timer, Port}, #state{token = TokName} = State) when Port == State#state.port, Timer == State#state.timer -> p11p_remote_manager:timeout(TokName), NewState = State#state{timer = undefined}, {noreply, NewState}; @@ -99,3 +103,13 @@ do_send(#state{port = Port, outbuf = Buf} = State) -> lager:debug("~p: sending ~B octets to remote", [self(), size(Buf)]), port_command(Port, Buf), State#state{outbuf = <<>>}. + +handle_remote_data(#state{replyto = Pid, timer = Timer} = State, Msg, Data) -> + case p11p_rpc:parse(Msg, list_to_binary(Data)) of + {done, NewMsg} -> + erlang:cancel_timer(Timer, [{async, true}, {info, false}]), + ok = p11p_server:reply(Pid, NewMsg), + State#state{msg = p11p_rpc:new(NewMsg#p11rpc_msg.buffer)}; + {needmore, NewMsg} -> + State#state{msg = NewMsg} + end. diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index accc86a..684c6e1 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -8,7 +8,7 @@ %% API. -export([start_link/1]). --export([reply/2]). +-export([add_to_clientbuf/2, reply/2]). %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -19,7 +19,8 @@ tokname :: string(), sockpath :: string(), % FIXME: filename(3erl) socket :: gen_tcp:socket(), - msg :: p11rpc_msg() | undefined + msg :: p11rpc_msg() | undefined, + clientbuf = <<>> :: binary() }). %% API. @@ -27,8 +28,13 @@ start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -reply(Pid, Data) -> - gen_server:cast(Pid, {response, Data}). +-spec add_to_clientbuf(pid(), binary()) -> binary(). +add_to_clientbuf(Pid, Data) -> + gen_server:call(Pid, {add_to_clientbuf, Data}). + +-spec reply(pid(), p11rpc_msg()) -> ok. +reply(Pid, Response) -> + gen_server:cast(Pid, {response, Response}). %% Genserver callbacks. init([Token, SocketPath, Socket]) -> @@ -37,9 +43,11 @@ init([Token, SocketPath, Socket]) -> gen_server:cast(self(), accept), % Perform accept in gen-server loop. {ok, #state{tokname = Token, sockpath = SocketPath, socket = Socket}}. -handle_call(Request, _From, State) -> - - lager:debug("~p: Unhandled call: ~p~n", [self(), Request]), +handle_call({add_to_clientbuf, Data}, _From, #state{clientbuf = Buf} = State) -> + NewBuf = <>, + {reply, NewBuf, State#state{clientbuf = NewBuf}}; +handle_call(Call, _From, State) -> + lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), {reply, unhandled, State}. handle_cast(accept, State = #state{tokname = TokName, sockpath = SocketPath, socket = ListenSocket}) -> @@ -57,10 +65,12 @@ handle_cast(accept, State = #state{tokname = TokName, sockpath = SocketPath, soc lager:debug("~p: listening socket closed", [self()]), {stop, normal, State} end; -handle_cast({response, Data}, #state{socket = ClientPort} = State) -> - lager:debug("~p: received ~B octets from remote", [self(), length(Data)]), - ok = gen_tcp:send(ClientPort, Data), - {noreply, State}; +handle_cast({response, Response}, #state{socket = ClientPort, clientbuf = Buf} = State) -> + %%lager:debug("~p: received ~B octets from remote", [self(), length(Data)]), + Data = p11p_rpc:serialise(Response), + NewBuf = <>, + ok = gen_tcp:send(ClientPort, NewBuf), % TODO: what about short writes? + {noreply, State#state{clientbuf = <<>>}}; handle_cast(Cast, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. @@ -69,7 +79,7 @@ handle_info({tcp, Port, Data}, #state{tokname = TokName, msg = Msg} = State) whe lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(Data), Port]), <> = Data, - p11p_remote:add_to_outbuf(p11p_remote_manager:remote_for_token(TokName), <>), + p11p_remote:add_to_outbuf(p11p_remote_manager:remote_for_token(TokName), <>), % FIXME: token reference needs to be cached, for consistancy at least across this and the next function head NewState = handle_client_data(State, p11p_rpc:new(), NewData), {noreply, NewState}; handle_info({tcp, Port, Data}, #state{msg = Msg} = State) -> -- cgit v1.1