diff options
Diffstat (limited to 'src/plop.erl')
-rw-r--r-- | src/plop.erl | 159 |
1 files changed, 142 insertions, 17 deletions
diff --git a/src/plop.erl b/src/plop.erl index 5244144..d363582 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -35,6 +35,7 @@ -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). +-import(stacktrace, [call/2]). -include("plop.hrl"). %%-include("db.hrl"). -include_lib("public_key/include/public_key.hrl"). @@ -45,7 +46,10 @@ -record(state, {pubkey :: public_key:rsa_public_key(), privkey :: public_key:rsa_private_key(), - logid :: binary()}). + logid :: binary(), + http_requests, + own_requests + }). %%%%% moved from plop.hrl, maybe remove -define(PLOPVERSION, 0). @@ -68,7 +72,23 @@ start_link(Keyfile, Passphrase) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Keyfile, Passphrase], []). stop() -> - gen_server:call(?MODULE, stop). + call(?MODULE, stop). + +add_http_request(Plop, RequestId, Data) -> + Plop#state{http_requests = dict:store(RequestId, Data, + Plop#state.http_requests)}. + +add_own_request(Plop, RequestId, Data) -> + Plop#state{own_requests = dict:store(RequestId, Data, + Plop#state.own_requests)}. + +remove_http_request(Plop, RequestId) -> + Plop#state{http_requests = dict:erase(RequestId, + Plop#state.http_requests)}. + +remove_own_request(Plop, RequestId) -> + Plop#state{own_requests = dict:erase(RequestId, + Plop#state.own_requests)}. %%%%%%%%%%%%%%%%%%%% init([PrivKeyfile, PubKeyfile]) -> @@ -81,11 +101,49 @@ init([PrivKeyfile, PubKeyfile]) -> _Tree = ht:reset_tree([db:size() - 1]), {ok, #state{pubkey = Public_key, privkey = Private_key, - logid = LogID}}. + logid = LogID, + http_requests = dict:new(), + own_requests = dict:new()}}. handle_cast(_Request, State) -> {noreply, State}. +handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, + StatusCode, Body) -> + lager:debug("http_reply: ~p", [Body]), + {struct, PropList} = mochijson2:decode(Body), + Result = proplists:get_value(<<"result">>, PropList), + case dict:fetch(OwnRequestId, State#state.own_requests) of + undefined -> + {noreply, State}; + {storage_sendentry, {From, Completion, RepliesUntilQuorum}} + when Result == <<"ok">>, StatusCode == 200 -> + case RepliesUntilQuorum - 1 of + 0 -> + %% reached quorum + gen_server:reply(From, ok), + StateWithCompletion = Completion(State), + {noreply, remove_own_request(StateWithCompletion, + OwnRequestId)}; + NewRepliesUntilQuorum -> + {noreply, add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + NewRepliesUntilQuorum}})} + end + end. + +handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) -> + {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, + case dict:fetch(RequestId, Plop#state.http_requests) of + undefined -> + {noreply, Plop}; + ignore -> + {noreply, Plop}; + HttpRequest -> + handle_http_reply(remove_http_request(Plop, RequestId), + HttpRequest, StatusCode, Body) + end; handle_info(_Info, State) -> {noreply, State}. @@ -99,38 +157,97 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% -spec add(binary(), binary(), binary()) -> ok. add(LogEntry, TreeLeafHash, EntryHash) -> - gen_server:call(?MODULE, + call(?MODULE, {add, {LogEntry, TreeLeafHash, EntryHash}}). sth() -> - gen_server:call(?MODULE, {sth, []}). + call(?MODULE, {sth, []}). -spec get(non_neg_integer(), non_neg_integer()) -> [{non_neg_integer(), binary(), binary()}]. get(Start, End) -> - gen_server:call(?MODULE, {get, {index, Start, End}}). + call(?MODULE, {get, {index, Start, End}}). get(Hash) -> - gen_server:call(?MODULE, {get, {hash, Hash}}). + call(?MODULE, {get, {hash, Hash}}). spt(Data) -> - gen_server:call(?MODULE, {spt, Data}). + call(?MODULE, {spt, Data}). consistency(TreeSizeFirst, TreeSizeSecond) -> - gen_server:call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}). + call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}). -spec inclusion(binary(), non_neg_integer()) -> {ok, {binary(), binary()}} | {notfound, string()}. inclusion(Hash, TreeSize) -> - gen_server:call(?MODULE, {inclusion, {Hash, TreeSize}}). + call(?MODULE, {inclusion, {Hash, TreeSize}}). -spec inclusion_and_entry(non_neg_integer(), non_neg_integer()) -> {ok, {binary(), binary()}} | {notfound, string()}. inclusion_and_entry(Index, TreeSize) -> - gen_server:call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}). + call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}). get_logid() -> - gen_server:call(?MODULE, {get, logid}). + call(?MODULE, {get, logid}). testing_get_pubkey() -> - gen_server:call(?MODULE, {test, pubkey}). + call(?MODULE, {test, pubkey}). + +storage_nodes() -> + application:get_env(plop, storage_nodes, []). + +storage_nodes_quorum() -> + {ok, Value} = application:get_env(plop, storage_nodes_quorum), + Value. + +send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> + Request = mochijson2:encode( + {[{plop_version, 1}, + {entry, base64:encode(LogEntry)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]), + httpc:request(post, {URLBase ++ "sendentry", [], + "text/json", list_to_binary(Request)}, + [], [{sync, false}]). + +send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> + Request = mochijson2:encode( + {[{plop_version, 1}, + {entryhash, base64:encode(EntryHash)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + httpc:request(post, {URLBase ++ "entrycommitted", [], + "text/json", list_to_binary(Request)}, + [], [{sync, false}]). + +store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> + lager:debug("leafhash ~p", [TreeLeafHash]), + OwnRequestId = make_ref(), + + Completion = + fun(CompletionState) -> + RequestIds = [send_storage_entrycommitted(URLBase, EntryHash, + TreeLeafHash) + || URLBase <- Nodes], + lists:foldl(fun({ok, RequestId}, StateAcc) -> + add_http_request(StateAcc, RequestId, + ignore) + end, CompletionState, RequestIds) + end, + + PlopWithOwn = add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + storage_nodes_quorum()}}), + + lager:debug("send requests to ~p", [Nodes]), + RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) + || URLBase <- Nodes], + PlopWithRequests = + lists:foldl(fun({ok, RequestId}, PlopAcc) -> + add_http_request(PlopAcc, RequestId, + {storage_sendentry_http, + {OwnRequestId}}) + end, PlopWithOwn, RequestIds), + PlopWithRequests. fill_in_entry({_Index, LeafHash, notfetched}) -> db:get_by_leaf_hash(LeafHash). @@ -151,10 +268,18 @@ handle_call({get, logid}, _From, Plop = #state{logid = LogID}) -> {reply, LogID, Plop}; -handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) -> - ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), - ok = ht:add(TreeLeafHash), - {reply, ok, Plop}; +handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> + lager:debug("add leafhash ~p", [TreeLeafHash]), + case storage_nodes() of + [] -> + ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), + ok = ht:add(TreeLeafHash), + {reply, ok, Plop}; + Nodes -> + {noreply, + store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, + From, Plop)} + end; handle_call({sth, Data}, _From, Plop = #state{privkey = PrivKey}) -> |