From 9f50fa4e8d7d82605116e07ea376da7ebedb8a57 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Tue, 11 Feb 2020 11:03:47 +0100 Subject: WIP track p11 state and shortcut responses when needed --- p11p-daemon/src/p11p_client.erl | 117 +++++++++++++++++++++++++++++++-------- p11p-daemon/src/p11p_manager.erl | 8 +-- p11p-daemon/src/p11p_rpc.erl | 14 +++++ p11p-daemon/src/p11p_rpc.hrl | 4 +- p11p-daemon/src/p11p_server.erl | 46 +++++++++++---- 5 files changed, 149 insertions(+), 40 deletions(-) diff --git a/p11p-daemon/src/p11p_client.erl b/p11p-daemon/src/p11p_client.erl index 7dc3457..87c2949 100644 --- a/p11p-daemon/src/p11p_client.erl +++ b/p11p-daemon/src/p11p_client.erl @@ -23,14 +23,18 @@ code_change/3]). %% Records and types. +-type token_state() :: started | initialized | session | loggedin | opact | finalized. + -record(state, { token :: string(), % Token name. timeout :: non_neg_integer(), port :: port(), replyto :: pid() | undefined, + + p11state = started :: token_state(), timer :: reference() | undefined, - msg :: p11rpc:msg() | undefined, + response :: p11rpc:msg() | undefined, recv_count = 0 :: non_neg_integer(), send_count = 0 :: non_neg_integer() }). @@ -43,7 +47,7 @@ start_link(ServName, TokName, Server, ModPath, ModEnv, Timeout) -> gen_server:start_link({local, ServName}, ?MODULE, [TokName, Server, ModPath, ModEnv, Timeout], []). --spec request(pid(), p11rpc_msg()) -> {ok, non_neg_integer()}. +-spec request(pid(), p11rpc_msg()) -> ack | nack | {ok, non_neg_integer()}. request(Client, Request) -> gen_server:call(Client, {request, Request}). @@ -70,18 +74,46 @@ init([TokName, Server, ModPath, ModEnv, Timeout]) -> lager:debug("~p: ~s: module: ~s, env: ~p", [self(), ProxyAppBinPath, ModPath, ModEnv]), {ok, #state{port = Port, token = TokName, replyto = Server, timeout = Timeout}}. -handle_call({request, Request}, {FromPid, _Tag}, - #state{port = Port, send_count = Sent} = S) -> - %%lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), - D = p11p_rpc:serialise(Request), - Buf = case Sent of - 0 -> <>; - _ -> D - end, - {ok, _} = do_send(Port, Buf), - {reply, {ok, sizeBuf}, S#state{replyto = FromPid, - timer = start_timer(S#state.timeout, Port), - send_count = Sent + 1}}; +handle_call({request, Request}, + {FromPid, _Tag}, + S = #state{port = Port, send_count = Sent}) -> + case + case S#state.p11state of + started -> + case p11p_rpc:req_id(Request) of + ?P11_RPC_CALL_C_Logout -> ack; + ?P11_RPC_CALL_C_CloseSession -> ack; + ?P11_RPC_CALL_C_Finalize -> ack; + + ?P11_RPC_CALL_C_Initialize -> pass; + ?P11_RPC_CALL_C_OpenSession -> pass; + ?P11_RPC_CALL_C_Login -> pass; + + _ -> nack + end; + _ -> + pass + end + of + ack -> + {reply, ack, S}; + nack -> + {reply, nack, S}; + pass -> + lager:debug("~p: sending request from ~p to prxoy app ~p", [self(), FromPid, Port]), + D = p11p_rpc:serialise(Request), + Buf = case Sent of + 0 -> <>; + _ -> D + end, + {ok, _} = do_send(Port, Buf), + + {reply, + {ok, size(Buf)}, + S#state{replyto = FromPid, + timer = start_timer(S#state.timeout, Port), + send_count = Sent + 1}} + end; handle_call(Call, _From, State) -> lager:debug("~p: Unhandled call: ~p~n", [self(), Call]), @@ -96,10 +128,10 @@ handle_cast(Cast, State) -> %% Receiving the very first response from proxy app since it was started. handle_info({Port, {data, Data}}, State) - when Port == State#state.port, State#state.msg == undefined -> + when Port == State#state.port, State#state.response == undefined -> case hd(Data) of % First octet is RPC protocol version. ?RPC_VERSION -> - {noreply, handle_token_data(State, p11p_rpc:new(), tl(Data))}; + {noreply, response_in(State, p11p_rpc:new(), tl(Data))}; BadVersion -> lager:info("~p: ~p: invalid RPC version: ~p", [self(), Port, BadVersion]), @@ -107,9 +139,9 @@ handle_info({Port, {data, Data}}, State) end; %% Receiving more data from proxy app. -handle_info({Port, {data, Data}}, #state{msg = Msg} = State) +handle_info({Port, {data, Data}}, #state{response = Msg} = State) when Port == State#state.port -> - {noreply, handle_token_data(State, Msg, Data)}; + {noreply, response_in(State, Msg, Data)}; %% Proxy app timed out. handle_info({timeout, Timer, Port}, S = #state{token = Tok}) @@ -144,17 +176,18 @@ do_send(Port, Buf) -> end, {ok, size(Buf)}. -handle_token_data(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, - MsgIn, DataIn) -> +response_in(#state{replyto = Pid, timer = Timer, recv_count = Recv} = S, + MsgIn, DataIn) -> case p11p_rpc:parse(MsgIn, list_to_binary(DataIn)) of {needmore, Msg} -> - S#state{msg = Msg}; + S#state{response = Msg}; {done, Msg} -> cancel_timer(Timer), lager:debug("~p: <- ~s", [self(), p11p_rpc:dump(Msg)]), {ok, _BytesSent} = p11p_server:reply(Pid, Msg), %% Saving potential data not consumed by parse/2 in new message. - S#state{msg = p11p_rpc:new(Msg#p11rpc_msg.buffer), + S#state{response = p11p_rpc:new(Msg#p11rpc_msg.buffer), + p11state = runstate(S#state.p11state, p11p_rpc:req_id(Msg)), recv_count = Recv + 1} end. @@ -165,3 +198,43 @@ start_timer(Timeout, Port) -> cancel_timer(Timer) -> %%lager:debug("~p: canceling timer", [self()]), erlang:cancel_timer(Timer, [{async, true}, {info, false}]). + +-spec runstate(token_state(), non_neg_integer()) -> token_state(). +runstate(started, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_Initialize -> + initialized; + _ -> + started + end; +runstate(initialized, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_OpenSession -> + session; + ?P11_RPC_CALL_C_Finalize -> + finalized; + _ -> + initialized + end; +runstate(session, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_Login -> + loggedin; + ?P11_RPC_CALL_C_CloseSession -> + initialized; + ?P11_RPC_CALL_C_Finalize -> + finalized; + _ -> + session + end; +runstate(loggedin, ReqId) -> + case ReqId of + ?P11_RPC_CALL_C_Logout -> + session; + ?P11_RPC_CALL_C_CloseSession -> + initialized; + ?P11_RPC_CALL_C_Finalize -> + finalized; + _ -> + loggedin + end. diff --git a/p11p-daemon/src/p11p_manager.erl b/p11p-daemon/src/p11p_manager.erl index 2dbdf6c..209d08e 100644 --- a/p11p-daemon/src/p11p_manager.erl +++ b/p11p-daemon/src/p11p_manager.erl @@ -126,9 +126,7 @@ handle_cast({server_event, server_gone, TokNameIn}, S = #state{vtokens = Tokens} Token = TokenIn#vtoken{clients = Clients, server = undefined}, {noreply, S#state{vtokens = Tokens#{TokNameIn := Token}}}; -%% Client reporting that a token has timed out -- mark current client -%% not running, inform server, rotate client list and start new -%% client. +%% Client reporting that a token has timed out. handle_cast({client_event, timeout, TokName}, State) -> #{TokName := VToken} = State#state.vtokens, client_timeout(TokName, VToken, State); @@ -225,6 +223,8 @@ next_client(VToken = #vtoken{clients = Clients}) -> gen_server:cast(self(), {start_client, OldC#client.tokname}), VToken#vtoken{clients = NewClients}. +%% Mark current client not running, inform its server, rotate client +%% list and start a new client. client_timeout(TokName, VToken = #vtoken{retries = Retries}, State = #state{vtokens = VTokens}) @@ -240,7 +240,7 @@ client_timeout(TokName, State) -> lager:debug("~p: ~s: token timed out, disconnecting app", [self(), TokName]), p11p_server:token_gone(VToken#vtoken.server, true), - {stop, State}. + {stop, normal, State}. start_client(Client, Server, Timeout) -> {ok, Pid} = p11p_client:start_link(Client#client.servid, diff --git a/p11p-daemon/src/p11p_rpc.erl b/p11p-daemon/src/p11p_rpc.erl index 03a476c..0e52bc5 100644 --- a/p11p-daemon/src/p11p_rpc.erl +++ b/p11p-daemon/src/p11p_rpc.erl @@ -9,7 +9,9 @@ dump/1, error/2, new/0, new/1, + ok/1, parse/2, + req_id/1, serialise/1 ]). @@ -36,6 +38,13 @@ error(CallCode, ErrorCode) -> data_len = size(DataBuf), data = DataBuf}. +ok(CallCode) -> + #p11rpc_msg{ + state = done, + call_code = CallCode, + opt_len = 0, + data_len = 0}. + parse(M) -> parse(M, <<>>). @@ -71,6 +80,11 @@ parse(#p11rpc_msg{buffer = MsgBuf} = M, DataIn) {done, Msg} end. +req_id(Msg) + when Msg#p11rpc_msg.data_len >= 4 -> + {ReqId, _} = parse_req_id(Msg#p11rpc_msg.data), + ReqId. + -spec serialise(p11rpc_msg()) -> binary(). serialise(M) when M#p11rpc_msg.state == done, M#p11rpc_msg.call_code > -1, diff --git a/p11p-daemon/src/p11p_rpc.hrl b/p11p-daemon/src/p11p_rpc.hrl index 0014f57..9d2b3f8 100644 --- a/p11p-daemon/src/p11p_rpc.hrl +++ b/p11p-daemon/src/p11p_rpc.hrl @@ -9,8 +9,8 @@ opt_len = -1 :: integer(), % Length is 4 data_len = -1 :: integer(), % Length is 4 - options = <<>> :: binary(), % Length is header.opt_len - data = <<>> :: binary(), % Length is header.buf_len + options = <<>> :: binary(), % Length is opt_len + data = <<>> :: binary(), % Length is data_len buffer = <<>> :: binary(), state = header :: header | opts | data | done diff --git a/p11p-daemon/src/p11p_server.erl b/p11p-daemon/src/p11p_server.erl index ef8877d..7b05da7 100644 --- a/p11p-daemon/src/p11p_server.erl +++ b/p11p-daemon/src/p11p_server.erl @@ -84,14 +84,17 @@ handle_cast({token_gone, Hangup}, State = #state{send_count = Sent}) -> Resp = p11p_rpc:error(State#state.req_out#p11rpc_msg.call_code, ?CKR_DEVICE_ERROR), {ok, _} = send_response(State#state.socket, p11p_rpc:serialise(Resp), Sent), - NewState = State#state{client = undefined, - req_out = undefined, + NewState = State#state{req_out = undefined, send_count = Sent + 1}, case Hangup of true -> - {close, NewState}; + lager:info("~p: Token reported gone, no more retries, closing.", [self()]), + {stop, normal, NewState}; %FIXME: no need to update state, i think false -> - {noreply, NewState} + lager:info("~p: Token reported gone, retrying with new token.", [self()]), + {noreply, + NewState#state{client = + p11p_manager:client_for_token(State#state.tokname)}} end; handle_cast(Cast, State) -> @@ -101,7 +104,7 @@ handle_cast(Cast, State) -> %% First packet from P11 client. handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) when S#state.client == undefined -> - %%lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(Data), Port]), + lager:debug("~p: received ~B octets from client on socket ~p, from new client", [self(), size(DataIn), Port]), <> = DataIn, case RPCVersion of ?RPC_VERSION -> @@ -117,8 +120,8 @@ handle_info({tcp, Port, DataIn}, #state{tokname = TokName} = S) end; %% Subsequent packages from P11 client. -handle_info({tcp, _Port, DataIn}, #state{req_in = Msg} = S) -> - %%lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(Data), Port, size(Msg#p11rpc_msg.buffer)]), +handle_info({tcp, Port, DataIn}, #state{req_in = Msg} = S) -> + lager:debug("~p: received ~B octets from client on socket ~p, with ~B octets already in buffer", [self(), size(DataIn), Port, size(Msg#p11rpc_msg.buffer)]), {noreply, p11_app_data(S, Msg, DataIn)}; handle_info({tcp_closed, Port}, S) -> @@ -151,10 +154,29 @@ p11_app_data(#state{client = Client, recv_count = Recv} = S, MsgIn, S#state{req_in = Msg}; {done, Msg} -> lager:debug("~p: -> ~s", [self(), p11p_rpc:dump(Msg)]), - {ok, _BytesSent} = p11p_client:request(Client, Msg), - S#state{req_out = Msg, - req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), - recv_count = Recv + 1} + case p11p_client:request(Client, Msg) of + ack -> + lager:debug("~p: acking request", [self()]), + Resp = p11p_rpc:ok(Msg#p11rpc_msg.call_code), + {ok, _} = send_response(S#state.socket, + p11p_rpc:serialise(Resp), + S#state.send_count), + S#state{req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), + send_count = S#state.send_count + 1}; + nack -> + lager:debug("~p: nacking request", [self()]), + Resp = p11p_rpc:error(Msg#p11rpc_msg.call_code, + ?CKR_DEVICE_ERROR), + {ok, _} = send_response(S#state.socket, + p11p_rpc:serialise(Resp), + S#state.send_count), + S#state{req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), + send_count = S#state.send_count + 1}; + {ok, _BytesSent} -> + S#state{req_out = Msg, + req_in = p11p_rpc:new(Msg#p11rpc_msg.buffer), + recv_count = Recv + 1} + end end. send_response(Sock, Inbuf, Sent) -> @@ -162,6 +184,6 @@ send_response(Sock, Inbuf, Sent) -> 0 -> <>; _ -> Inbuf end, - %%lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), + lager:debug("~p: sending ~B octets as response", [self(), size(Inbuf)]), ok = gen_tcp:send(Sock, Buf), {ok, size(Inbuf)}. -- cgit v1.1