diff options
author | Linus Nordberg <linus@sunet.se> | 2019-06-28 23:33:26 +0200 |
---|---|---|
committer | Linus Nordberg <linus@sunet.se> | 2019-06-28 23:33:26 +0200 |
commit | 4034234ac618f92f22789dd922c777347881fb90 (patch) | |
tree | ecc1db41f359b2637a781d1c21f2d963fc655e9f /p11p-daemon | |
parent | db9fa57d1c8858661e85e3ce40cf94de4931e4f9 (diff) |
parse rpc replies and timeout if they're not on time
Not that it works though, demonstrated thanks to our attempts at
reusing a remote for a new client which is sending that version byte
before the rpc message. At least I think that's why.
Seems like send (to remote) is blocking and therefore the timeout
can't fire (same process). First things is that the timeout should
probably be in the server instead, in case the remote is blocking like
in this case. Second is that we'd at least seen something if the
server was calling the remote genserver instead of casting, du to
the default gen_server:call timeout of 5s.
Why are we casting in the first place? Well, we had to back when we
didn't collect the full rpc message before passing it on. That could
change now.
Diffstat (limited to 'p11p-daemon')
-rw-r--r-- | p11p-daemon/src/p11p_remote.erl | 26 | ||||
-rw-r--r-- | p11p-daemon/src/p11p_server.erl | 34 |
2 files changed, 42 insertions, 18 deletions
diff --git a/p11p-daemon/src/p11p_remote.erl b/p11p-daemon/src/p11p_remote.erl index c91f47c..3790971 100644 --- a/p11p-daemon/src/p11p_remote.erl +++ b/p11p-daemon/src/p11p_remote.erl @@ -28,7 +28,8 @@ replyto :: pid() | undefined, timer :: reference() | undefined, token :: string(), % Name - outbuf = <<>> :: binary() + outbuf = <<>> :: binary(), + msg :: p11rpc:msg() | undefined }). %% FIXME: move to config @@ -71,11 +72,14 @@ handle_cast(Request, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Request]), {noreply, State}. -handle_info({Port, {data, Data}}, #state{replyto = Pid, timer = Timer} = State) when Port == State#state.port -> - erlang:cancel_timer(Timer, [{async, true}, {info, false}]), - p11p_server:reply(Pid, Data), - {noreply, State}; -handle_info({timeout, Timer, Port}, #state{token = TokName} = State) when Port == State#state.port andalso Timer == State#state.timer -> +%% TODO: dedup code w/ p11p_server +handle_info({Port, {data, Data}}, #state{replyto = Pid} = State) when Port == State#state.port, State#state.msg == undefined -> + Version = hd(Data), % First octet is version. + p11p_server:add_to_clientbuf(Pid, <<Version>>), + {noreply, handle_remote_data(State, p11p_rpc:new(), tl(Data))}; +handle_info({Port, {data, Data}}, #state{msg = Msg} = State) when Port == State#state.port -> + {noreply, handle_remote_data(State, Msg, Data)}; +handle_info({timeout, Timer, Port}, #state{token = TokName} = State) when Port == State#state.port, Timer == State#state.timer -> p11p_remote_manager:timeout(TokName), NewState = State#state{timer = undefined}, {noreply, NewState}; @@ -99,3 +103,13 @@ do_send(#state{port = Port, outbuf = Buf} = State) -> lager:debug("~p: sending ~B octets to remote", [self(), size(Buf)]), port_command(Port, Buf), State#state{outbuf = <<>>}. + +handle_remote_data(#state{replyto = Pid, timer = Timer} = State, Msg, Data) -> + case p11p_rpc:parse(Msg, list_to_binary(Data)) of + {done, NewMsg} -> + erlang:cancel_timer(Timer, [{async, true}, {info, false}]), + ok = p11p_server:reply(Pid, NewMsg), + State#state{msg = p11p_rpc:new(NewMsg#p11rpc_msg.buffer)}; + {needmore, NewMsg} -> + State#state{msg = NewMsg} + end. diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index accc86a..684c6e1 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -8,7 +8,7 @@ %% API. -export([start_link/1]). --export([reply/2]). +-export([add_to_clientbuf/2, reply/2]). %% Genserver callbacks. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -19,7 +19,8 @@ tokname :: string(), sockpath :: string(), % FIXME: filename(3erl) socket :: gen_tcp:socket(), - msg :: p11rpc_msg() | undefined + msg :: p11rpc_msg() | undefined, + clientbuf = <<>> :: binary() }). %% API. @@ -27,8 +28,13 @@ start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -reply(Pid, Data) -> - gen_server:cast(Pid, {response, Data}). +-spec add_to_clientbuf(pid(), binary()) -> binary(). +add_to_clientbuf(Pid, Data) -> + gen_server:call(Pid, {add_to_clientbuf, Data}). + +-spec reply(pid(), p11rpc_msg()) -> ok. +reply(Pid, Response) -> + gen_server:cast(Pid, {response, Response}). %% Genserver callbacks. init([Token, SocketPath, Socket]) -> @@ -37,9 +43,11 @@ init([Token, SocketPath, Socket]) -> gen_server:cast(self(), accept), % Perform accept in gen-server loop. {ok, #state{tokname = Token, sockpath = SocketPath, socket = Socket}}. -handle_call(Request, _From, State) -> - - lager:debug("~p: Unhandled call: ~p~n", [self(), Request]), +handle_call({add_to_clientbuf, Data}, _From, #state{clientbuf = Buf} = State) -> + NewBuf = <<Buf/binary, Data/binary>>, + {reply, NewBuf, State#state{clientbuf = NewBuf}}; +handle_call(Call, _From, State) -> + lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), {reply, unhandled, State}. handle_cast(accept, State = #state{tokname = TokName, sockpath = SocketPath, socket = ListenSocket}) -> @@ -57,10 +65,12 @@ handle_cast(accept, State = #state{tokname = TokName, sockpath = SocketPath, soc lager:debug("~p: listening socket closed", [self()]), {stop, normal, State} end; -handle_cast({response, Data}, #state{socket = ClientPort} = State) -> - lager:debug("~p: received ~B octets from remote", [self(), length(Data)]), - ok = gen_tcp:send(ClientPort, Data), - {noreply, State}; +handle_cast({response, Response}, #state{socket = ClientPort, clientbuf = Buf} = State) -> + %%lager:debug("~p: received ~B octets from remote", [self(), length(Data)]), + Data = p11p_rpc:serialise(Response), + NewBuf = <<Buf/binary, Data/binary>>, + ok = gen_tcp:send(ClientPort, NewBuf), % TODO: what about short writes? + {noreply, State#state{clientbuf = <<>>}}; handle_cast(Cast, State) -> lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]), {noreply, State}. @@ -69,7 +79,7 @@ handle_info({tcp, Port, Data}, #state{tokname = TokName, msg = Msg} = State) whe lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(Data), Port]), <<Version:8, NewData/binary>> = Data, - p11p_remote:add_to_outbuf(p11p_remote_manager:remote_for_token(TokName), <<Version>>), + p11p_remote:add_to_outbuf(p11p_remote_manager:remote_for_token(TokName), <<Version>>), % FIXME: token reference needs to be cached, for consistancy at least across this and the next function head NewState = handle_client_data(State, p11p_rpc:new(), NewData), {noreply, NewState}; handle_info({tcp, Port, Data}, #state{msg = Msg} = State) -> |