From 653d3a1b047241ffda69cfc8601390591d63d295 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 20 Oct 2014 14:20:37 +0200 Subject: Make frontend send entries to storage nodes if storage_nodes configuration is set --- src/plop.erl | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 127 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/plop.erl b/src/plop.erl index 0b101be..07042aa 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -45,7 +45,10 @@ -record(state, {pubkey :: public_key:rsa_public_key(), privkey :: public_key:rsa_private_key(), - logid :: binary()}). + logid :: binary(), + http_requests, + own_requests + }). %%%%% moved from plop.hrl, maybe remove -define(PLOPVERSION, 0). @@ -70,6 +73,22 @@ start_link(Keyfile, Passphrase) -> stop() -> gen_server:call(?MODULE, stop). +add_http_request(Plop, RequestId, Data) -> + Plop#state{http_requests = dict:store(RequestId, Data, + Plop#state.http_requests)}. + +add_own_request(Plop, RequestId, Data) -> + Plop#state{own_requests = dict:store(RequestId, Data, + Plop#state.own_requests)}. + +remove_http_request(Plop, RequestId) -> + Plop#state{http_requests = dict:erase(RequestId, + Plop#state.http_requests)}. + +remove_own_request(Plop, RequestId) -> + Plop#state{own_requests = dict:erase(RequestId, + Plop#state.own_requests)}. + %%%%%%%%%%%%%%%%%%%% init([PrivKeyfile, PubKeyfile]) -> %% Read RSA keypair. @@ -81,11 +100,48 @@ init([PrivKeyfile, PubKeyfile]) -> _Tree = ht:reset_tree([db:size() - 1]), {ok, #state{pubkey = Public_key, privkey = Private_key, - logid = LogID}}. + logid = LogID, + http_requests = dict:new(), + own_requests = dict:new()}}. handle_cast(_Request, State) -> {noreply, State}. +handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, + StatusCode, Body) -> + {PropList} = (catch jiffy:decode(Body)), + Result = proplists:get_value(<<"result">>, PropList), + case dict:fetch(OwnRequestId, State#state.own_requests) of + undefined -> + {noreply, State}; + {storage_sendentry, {From, Completion, RepliesUntilQuorum}} + when Result == <<"ok">>, StatusCode == 200 -> + case RepliesUntilQuorum - 1 of + 0 -> + %% reached quorum + gen_server:reply(From, ok), + StateWithCompletion = Completion(State), + {noreply, remove_own_request(StateWithCompletion, + OwnRequestId)}; + NewRepliesUntilQuorum -> + {noreply, add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + NewRepliesUntilQuorum}})} + end + end. + +handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) -> + {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, + case dict:fetch(RequestId, Plop#state.http_requests) of + undefined -> + {noreply, Plop}; + ignore -> + {noreply, Plop}; + HttpRequest -> + handle_http_reply(remove_http_request(Plop, RequestId), + HttpRequest, StatusCode, Body) + end; handle_info(_Info, State) -> {noreply, State}. @@ -131,6 +187,64 @@ get_logid() -> gen_server:call(?MODULE, {get, logid}). testing_get_pubkey() -> gen_server:call(?MODULE, {test, pubkey}). + +storage_nodes() -> + {ok, Value} = application:get_env(plop, storage_nodes, {ok, []}), + Value. + +storage_nodes_quorum() -> + {ok, Value} = application:get_env(plop, storage_nodes_quorum), + Value. + +send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> + Request = jiffy:encode( + {[{plop_version, 1}, + {entry, base64:encode(LogEntry)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + httpc:request(post, {URLBase ++ "sendentry", [], + "text/json", Request}, + [], [{sync, false}]). + +send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> + Request = jiffy:encode( + {[{plop_version, 1}, + {entryhash, base64:encode(EntryHash)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + httpc:request(post, {URLBase ++ "entrycommitted", [], + "text/json", Request}, + [], [{sync, false}]). + +store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> + OwnRequestId = make_ref(), + + Completion = + fun(CompletionState) -> + RequestIds = [send_storage_entrycommitted(URLBase, EntryHash, + TreeLeafHash) + || URLBase <- Nodes], + lists:foldl(fun({ok, RequestId}, StateAcc) -> + add_http_request(StateAcc, RequestId, + ignore) + end, CompletionState, RequestIds) + end, + + PlopWithOwn = add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + storage_nodes_quorum()}}), + + RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) + || URLBase <- Nodes], + PlopWithRequests = + lists:foldl(fun({ok, RequestId}, PlopAcc) -> + add_http_request(PlopAcc, RequestId, + {storage_sendentry_http, + {OwnRequestId}}) + end, PlopWithOwn, RequestIds), + PlopWithRequests. + %%%%%%%%%%%%%%%%%%%% handle_call(stop, _From, Plop) -> {stop, normal, stopped, Plop}; @@ -145,10 +259,17 @@ handle_call({get, logid}, _From, Plop = #state{logid = LogID}) -> {reply, LogID, Plop}; -handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) -> - ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), - ok = ht:add(TreeLeafHash), - {reply, ok, Plop}; +handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> + case storage_nodes() of + [] -> + ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), + ok = ht:add(TreeLeafHash), + {reply, ok, Plop}; + Nodes -> + {noreply, + store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, + From, Plop)} + end; handle_call({sth, Data}, _From, Plop = #state{privkey = PrivKey}) -> -- cgit v1.1