summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2014-10-20 14:20:37 +0200
committerMagnus Ahltorp <map@kth.se>2014-10-24 15:36:35 +0200
commit653d3a1b047241ffda69cfc8601390591d63d295 (patch)
tree2f5cfab50235a84acb8e2d8b1c0a9cb80b277698
parent088e4de4f1e2499f6cc0e332ae8cb34b935a6425 (diff)
Make frontend send entries to storage nodes if storage_nodes configuration is set
-rw-r--r--src/plop.erl133
1 files changed, 127 insertions, 6 deletions
diff --git a/src/plop.erl b/src/plop.erl
index 0b101be..07042aa 100644
--- a/src/plop.erl
+++ b/src/plop.erl
@@ -45,7 +45,10 @@
-record(state, {pubkey :: public_key:rsa_public_key(),
privkey :: public_key:rsa_private_key(),
- logid :: binary()}).
+ logid :: binary(),
+ http_requests,
+ own_requests
+ }).
%%%%% moved from plop.hrl, maybe remove
-define(PLOPVERSION, 0).
@@ -70,6 +73,22 @@ start_link(Keyfile, Passphrase) ->
stop() ->
gen_server:call(?MODULE, stop).
+add_http_request(Plop, RequestId, Data) ->
+ Plop#state{http_requests = dict:store(RequestId, Data,
+ Plop#state.http_requests)}.
+
+add_own_request(Plop, RequestId, Data) ->
+ Plop#state{own_requests = dict:store(RequestId, Data,
+ Plop#state.own_requests)}.
+
+remove_http_request(Plop, RequestId) ->
+ Plop#state{http_requests = dict:erase(RequestId,
+ Plop#state.http_requests)}.
+
+remove_own_request(Plop, RequestId) ->
+ Plop#state{own_requests = dict:erase(RequestId,
+ Plop#state.own_requests)}.
+
%%%%%%%%%%%%%%%%%%%%
init([PrivKeyfile, PubKeyfile]) ->
%% Read RSA keypair.
@@ -81,11 +100,48 @@ init([PrivKeyfile, PubKeyfile]) ->
_Tree = ht:reset_tree([db:size() - 1]),
{ok, #state{pubkey = Public_key,
privkey = Private_key,
- logid = LogID}}.
+ logid = LogID,
+ http_requests = dict:new(),
+ own_requests = dict:new()}}.
handle_cast(_Request, State) ->
{noreply, State}.
+handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}},
+ StatusCode, Body) ->
+ {PropList} = (catch jiffy:decode(Body)),
+ Result = proplists:get_value(<<"result">>, PropList),
+ case dict:fetch(OwnRequestId, State#state.own_requests) of
+ undefined ->
+ {noreply, State};
+ {storage_sendentry, {From, Completion, RepliesUntilQuorum}}
+ when Result == <<"ok">>, StatusCode == 200 ->
+ case RepliesUntilQuorum - 1 of
+ 0 ->
+ %% reached quorum
+ gen_server:reply(From, ok),
+ StateWithCompletion = Completion(State),
+ {noreply, remove_own_request(StateWithCompletion,
+ OwnRequestId)};
+ NewRepliesUntilQuorum ->
+ {noreply, add_own_request(State, OwnRequestId,
+ {storage_sendentry,
+ {From, Completion,
+ NewRepliesUntilQuorum}})}
+ end
+ end.
+
+handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) ->
+ {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine,
+ case dict:fetch(RequestId, Plop#state.http_requests) of
+ undefined ->
+ {noreply, Plop};
+ ignore ->
+ {noreply, Plop};
+ HttpRequest ->
+ handle_http_reply(remove_http_request(Plop, RequestId),
+ HttpRequest, StatusCode, Body)
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -131,6 +187,64 @@ get_logid() ->
gen_server:call(?MODULE, {get, logid}).
testing_get_pubkey() ->
gen_server:call(?MODULE, {test, pubkey}).
+
+storage_nodes() ->
+ {ok, Value} = application:get_env(plop, storage_nodes, {ok, []}),
+ Value.
+
+storage_nodes_quorum() ->
+ {ok, Value} = application:get_env(plop, storage_nodes_quorum),
+ Value.
+
+send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) ->
+ Request = jiffy:encode(
+ {[{plop_version, 1},
+ {entry, base64:encode(LogEntry)},
+ {treeleafhash, base64:encode(TreeLeafHash)}
+ ]}),
+ httpc:request(post, {URLBase ++ "sendentry", [],
+ "text/json", Request},
+ [], [{sync, false}]).
+
+send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) ->
+ Request = jiffy:encode(
+ {[{plop_version, 1},
+ {entryhash, base64:encode(EntryHash)},
+ {treeleafhash, base64:encode(TreeLeafHash)}
+ ]}),
+ httpc:request(post, {URLBase ++ "entrycommitted", [],
+ "text/json", Request},
+ [], [{sync, false}]).
+
+store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
+ OwnRequestId = make_ref(),
+
+ Completion =
+ fun(CompletionState) ->
+ RequestIds = [send_storage_entrycommitted(URLBase, EntryHash,
+ TreeLeafHash)
+ || URLBase <- Nodes],
+ lists:foldl(fun({ok, RequestId}, StateAcc) ->
+ add_http_request(StateAcc, RequestId,
+ ignore)
+ end, CompletionState, RequestIds)
+ end,
+
+ PlopWithOwn = add_own_request(State, OwnRequestId,
+ {storage_sendentry,
+ {From, Completion,
+ storage_nodes_quorum()}}),
+
+ RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash)
+ || URLBase <- Nodes],
+ PlopWithRequests =
+ lists:foldl(fun({ok, RequestId}, PlopAcc) ->
+ add_http_request(PlopAcc, RequestId,
+ {storage_sendentry_http,
+ {OwnRequestId}})
+ end, PlopWithOwn, RequestIds),
+ PlopWithRequests.
+
%%%%%%%%%%%%%%%%%%%%
handle_call(stop, _From, Plop) ->
{stop, normal, stopped, Plop};
@@ -145,10 +259,17 @@ handle_call({get, logid}, _From,
Plop = #state{logid = LogID}) ->
{reply, LogID, Plop};
-handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) ->
- ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
- ok = ht:add(TreeLeafHash),
- {reply, ok, Plop};
+handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) ->
+ case storage_nodes() of
+ [] ->
+ ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
+ ok = ht:add(TreeLeafHash),
+ {reply, ok, Plop};
+ Nodes ->
+ {noreply,
+ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash},
+ From, Plop)}
+ end;
handle_call({sth, Data}, _From,
Plop = #state{privkey = PrivKey}) ->