summaryrefslogtreecommitdiff
path: root/p11p-daemon/src
diff options
context:
space:
mode:
authorLinus Nordberg <linus@sunet.se>2019-06-28 23:33:26 +0200
committerLinus Nordberg <linus@sunet.se>2019-06-28 23:33:26 +0200
commit4034234ac618f92f22789dd922c777347881fb90 (patch)
treeecc1db41f359b2637a781d1c21f2d963fc655e9f /p11p-daemon/src
parentdb9fa57d1c8858661e85e3ce40cf94de4931e4f9 (diff)
parse rpc replies and timeout if they're not on time
Not that it works though, demonstrated thanks to our attempts at reusing a remote for a new client which is sending that version byte before the rpc message. At least I think that's why. Seems like send (to remote) is blocking and therefore the timeout can't fire (same process). First things is that the timeout should probably be in the server instead, in case the remote is blocking like in this case. Second is that we'd at least seen something if the server was calling the remote genserver instead of casting, du to the default gen_server:call timeout of 5s. Why are we casting in the first place? Well, we had to back when we didn't collect the full rpc message before passing it on. That could change now.
Diffstat (limited to 'p11p-daemon/src')
-rw-r--r--p11p-daemon/src/p11p_remote.erl26
-rw-r--r--p11p-daemon/src/p11p_server.erl34
2 files changed, 42 insertions, 18 deletions
diff --git a/p11p-daemon/src/p11p_remote.erl b/p11p-daemon/src/p11p_remote.erl
index c91f47c..3790971 100644
--- a/p11p-daemon/src/p11p_remote.erl
+++ b/p11p-daemon/src/p11p_remote.erl
@@ -28,7 +28,8 @@
replyto :: pid() | undefined,
timer :: reference() | undefined,
token :: string(), % Name
- outbuf = <<>> :: binary()
+ outbuf = <<>> :: binary(),
+ msg :: p11rpc:msg() | undefined
}).
%% FIXME: move to config
@@ -71,11 +72,14 @@ handle_cast(Request, State) ->
lager:debug("~p: Unhandled cast: ~p~n", [self(), Request]),
{noreply, State}.
-handle_info({Port, {data, Data}}, #state{replyto = Pid, timer = Timer} = State) when Port == State#state.port ->
- erlang:cancel_timer(Timer, [{async, true}, {info, false}]),
- p11p_server:reply(Pid, Data),
- {noreply, State};
-handle_info({timeout, Timer, Port}, #state{token = TokName} = State) when Port == State#state.port andalso Timer == State#state.timer ->
+%% TODO: dedup code w/ p11p_server
+handle_info({Port, {data, Data}}, #state{replyto = Pid} = State) when Port == State#state.port, State#state.msg == undefined ->
+ Version = hd(Data), % First octet is version.
+ p11p_server:add_to_clientbuf(Pid, <<Version>>),
+ {noreply, handle_remote_data(State, p11p_rpc:new(), tl(Data))};
+handle_info({Port, {data, Data}}, #state{msg = Msg} = State) when Port == State#state.port ->
+ {noreply, handle_remote_data(State, Msg, Data)};
+handle_info({timeout, Timer, Port}, #state{token = TokName} = State) when Port == State#state.port, Timer == State#state.timer ->
p11p_remote_manager:timeout(TokName),
NewState = State#state{timer = undefined},
{noreply, NewState};
@@ -99,3 +103,13 @@ do_send(#state{port = Port, outbuf = Buf} = State) ->
lager:debug("~p: sending ~B octets to remote", [self(), size(Buf)]),
port_command(Port, Buf),
State#state{outbuf = <<>>}.
+
+handle_remote_data(#state{replyto = Pid, timer = Timer} = State, Msg, Data) ->
+ case p11p_rpc:parse(Msg, list_to_binary(Data)) of
+ {done, NewMsg} ->
+ erlang:cancel_timer(Timer, [{async, true}, {info, false}]),
+ ok = p11p_server:reply(Pid, NewMsg),
+ State#state{msg = p11p_rpc:new(NewMsg#p11rpc_msg.buffer)};
+ {needmore, NewMsg} ->
+ State#state{msg = NewMsg}
+ end.
diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl
index accc86a..684c6e1 100644
--- a/p11p-daemon/src/p11p_server.erl
+++ b/p11p-daemon/src/p11p_server.erl
@@ -8,7 +8,7 @@
%% API.
-export([start_link/1]).
--export([reply/2]).
+-export([add_to_clientbuf/2, reply/2]).
%% Genserver callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -19,7 +19,8 @@
tokname :: string(),
sockpath :: string(), % FIXME: filename(3erl)
socket :: gen_tcp:socket(),
- msg :: p11rpc_msg() | undefined
+ msg :: p11rpc_msg() | undefined,
+ clientbuf = <<>> :: binary()
}).
%% API.
@@ -27,8 +28,13 @@
start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
-reply(Pid, Data) ->
- gen_server:cast(Pid, {response, Data}).
+-spec add_to_clientbuf(pid(), binary()) -> binary().
+add_to_clientbuf(Pid, Data) ->
+ gen_server:call(Pid, {add_to_clientbuf, Data}).
+
+-spec reply(pid(), p11rpc_msg()) -> ok.
+reply(Pid, Response) ->
+ gen_server:cast(Pid, {response, Response}).
%% Genserver callbacks.
init([Token, SocketPath, Socket]) ->
@@ -37,9 +43,11 @@ init([Token, SocketPath, Socket]) ->
gen_server:cast(self(), accept), % Perform accept in gen-server loop.
{ok, #state{tokname = Token, sockpath = SocketPath, socket = Socket}}.
-handle_call(Request, _From, State) ->
-
- lager:debug("~p: Unhandled call: ~p~n", [self(), Request]),
+handle_call({add_to_clientbuf, Data}, _From, #state{clientbuf = Buf} = State) ->
+ NewBuf = <<Buf/binary, Data/binary>>,
+ {reply, NewBuf, State#state{clientbuf = NewBuf}};
+handle_call(Call, _From, State) ->
+ lager:debug("~p: Unhandled call: ~p~n", [self(), Call]),
{reply, unhandled, State}.
handle_cast(accept, State = #state{tokname = TokName, sockpath = SocketPath, socket = ListenSocket}) ->
@@ -57,10 +65,12 @@ handle_cast(accept, State = #state{tokname = TokName, sockpath = SocketPath, soc
lager:debug("~p: listening socket closed", [self()]),
{stop, normal, State}
end;
-handle_cast({response, Data}, #state{socket = ClientPort} = State) ->
- lager:debug("~p: received ~B octets from remote", [self(), length(Data)]),
- ok = gen_tcp:send(ClientPort, Data),
- {noreply, State};
+handle_cast({response, Response}, #state{socket = ClientPort, clientbuf = Buf} = State) ->
+ %%lager:debug("~p: received ~B octets from remote", [self(), length(Data)]),
+ Data = p11p_rpc:serialise(Response),
+ NewBuf = <<Buf/binary, Data/binary>>,
+ ok = gen_tcp:send(ClientPort, NewBuf), % TODO: what about short writes?
+ {noreply, State#state{clientbuf = <<>>}};
handle_cast(Cast, State) ->
lager:debug("~p: Unhandled cast: ~p~n", [self(), Cast]),
{noreply, State}.
@@ -69,7 +79,7 @@ handle_info({tcp, Port, Data}, #state{tokname = TokName, msg = Msg} = State) whe
lager:debug("~p: received ~B octets from client on socket ~p, from new client",
[self(), size(Data), Port]),
<<Version:8, NewData/binary>> = Data,
- p11p_remote:add_to_outbuf(p11p_remote_manager:remote_for_token(TokName), <<Version>>),
+ p11p_remote:add_to_outbuf(p11p_remote_manager:remote_for_token(TokName), <<Version>>), % FIXME: token reference needs to be cached, for consistancy at least across this and the next function head
NewState = handle_client_data(State, p11p_rpc:new(), NewData),
{noreply, NewState};
handle_info({tcp, Port, Data}, #state{msg = Msg} = State) ->