From 4c4f904ee1a7fe2b61137532de3668ba5ad3a6d8 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Tue, 18 Nov 2014 00:17:28 +0100 Subject: Move plop:add out of gen_server --- src/plop.erl | 143 ++++++++++++++++++++++------------------------------------- 1 file changed, 52 insertions(+), 91 deletions(-) (limited to 'src/plop.erl') diff --git a/src/plop.erl b/src/plop.erl index 24c4b8d..6501105 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -40,10 +40,6 @@ -include_lib("public_key/include/public_key.hrl"). -include_lib("eunit/include/eunit.hrl"). --record(state, {http_requests, - own_requests - }). - %%%%% moved from plop.hrl, maybe remove -define(PLOPVERSION, 0). -type signature_type() :: certificate_timestamp | tree_hash | test. % uint8 @@ -67,69 +63,31 @@ start_link() -> stop() -> call(?MODULE, stop). -add_http_request(Plop, RequestId, Data) -> - Plop#state{http_requests = dict:store(RequestId, Data, - Plop#state.http_requests)}. - -add_own_request(Plop, RequestId, Data) -> - Plop#state{own_requests = dict:store(RequestId, Data, - Plop#state.own_requests)}. - -remove_http_request(Plop, RequestId) -> - Plop#state{http_requests = dict:erase(RequestId, - Plop#state.http_requests)}. - -remove_own_request(Plop, RequestId) -> - Plop#state{own_requests = dict:erase(RequestId, - Plop#state.own_requests)}. - %%%%%%%%%%%%%%%%%%%% init([]) -> _Tree = ht:reset_tree([db:size() - 1]), - {ok, #state{http_requests = dict:new(), - own_requests = dict:new()}}. + {ok, []}. handle_cast(_Request, State) -> {noreply, State}. -handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, +handle_http_reply(RepliesUntilQuorum, StatusCode, Body) -> lager:debug("http_reply: ~p", [Body]), {struct, PropList} = mochijson2:decode(Body), Result = proplists:get_value(<<"result">>, PropList), - case dict:fetch(OwnRequestId, State#state.own_requests) of - undefined -> - {noreply, State}; - {storage_sendentry, {From, Completion, RepliesUntilQuorum}} - when Result == <<"ok">>, StatusCode == 200 -> + if Result == <<"ok">>, StatusCode == 200 -> case RepliesUntilQuorum - 1 of 0 -> %% reached quorum lager:debug("reached quorum"), - gen_server:reply(From, ok), - StateWithCompletion = Completion(State), - {noreply, remove_own_request(StateWithCompletion, - OwnRequestId)}; + {ok}; NewRepliesUntilQuorum -> lager:debug("replies until quorum: ~p", [NewRepliesUntilQuorum]), - {noreply, add_own_request(State, OwnRequestId, - {storage_sendentry, - {From, Completion, - NewRepliesUntilQuorum}})} + {continue, NewRepliesUntilQuorum} end end. -handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) -> - {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, - case dict:fetch(RequestId, Plop#state.http_requests) of - undefined -> - {noreply, Plop}; - ignore -> - {noreply, Plop}; - HttpRequest -> - handle_http_reply(remove_http_request(Plop, RequestId), - HttpRequest, StatusCode, Body) - end; handle_info(_Info, State) -> {noreply, State}. @@ -143,8 +101,15 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% -spec add(binary(), binary(), binary()) -> ok. add(LogEntry, TreeLeafHash, EntryHash) -> - call(?MODULE, - {add, {LogEntry, TreeLeafHash, EntryHash}}). + lager:debug("add leafhash ~p", [TreeLeafHash]), + case storage_nodes() of + [] -> + exit(internal_merge_not_supported); + Nodes -> + util:spawn_and_wait(fun () -> + store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}) + end) + end. sth() -> sth([]). @@ -193,9 +158,10 @@ send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> {treeleafhash, base64:encode(TreeLeafHash)} ]}), lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]), - httpc:request(post, {URLBase ++ "sendentry", [], - "text/json", list_to_binary(Request)}, - [], [{sync, false}]). + {ok, RequestId} = httpc:request(post, {URLBase ++ "sendentry", [], + "text/json", list_to_binary(Request)}, + [], [{sync, false}]), + RequestId. send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> Request = mochijson2:encode( @@ -207,36 +173,41 @@ send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> "text/json", list_to_binary(Request)}, [], [{sync, false}]). -store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> - lager:debug("leafhash ~p", [TreeLeafHash]), - OwnRequestId = make_ref(), - - Completion = - fun(CompletionState) -> - RequestIds = [send_storage_entrycommitted(URLBase, EntryHash, - TreeLeafHash) - || URLBase <- Nodes], - lists:foldl(fun({ok, RequestId}, StateAcc) -> - add_http_request(StateAcc, RequestId, - ignore) - end, CompletionState, RequestIds) - end, +store_loop(Requests, RepliesUntilQuorum) -> + receive + {http, {RequestId, {StatusLine, _Headers, Body}}} -> + {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, + case sets:is_element(RequestId, Requests) of + false -> + lager:info("stray storage reply: ~p", [{StatusLine, Body}]), + store_loop(Requests, RepliesUntilQuorum); + true -> + case handle_http_reply(RepliesUntilQuorum, StatusCode, Body) of + {ok} -> + ok; + {continue, NewRepliesUntilQuorum} -> + store_loop(Requests, NewRepliesUntilQuorum) + end + end + after + 2000 -> + error + end. + - PlopWithOwn = add_own_request(State, OwnRequestId, - {storage_sendentry, - {From, Completion, - storage_nodes_quorum()}}), - - lager:debug("send requests to ~p", [Nodes]), - RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) - || URLBase <- Nodes], - PlopWithRequests = - lists:foldl(fun({ok, RequestId}, PlopAcc) -> - add_http_request(PlopAcc, RequestId, - {storage_sendentry_http, - {OwnRequestId}}) - end, PlopWithOwn, RequestIds), - PlopWithRequests. +store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}) -> + lager:debug("leafhash ~p: send requests to ~p", [TreeLeafHash, Nodes]), + Requests = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) || URLBase <- Nodes], + case store_loop(sets:from_list(Requests), storage_nodes_quorum()) of + ok -> + lists:foreach(fun (URLBase) -> + send_storage_entrycommitted(URLBase, EntryHash, + TreeLeafHash) + end, Nodes), + ok; + Any -> + Any + end. fill_in_entry({_Index, LeafHash, notfetched}) -> db:get_by_leaf_hash(LeafHash). @@ -253,16 +224,6 @@ handle_call({get, {index, Start, End}}, _From, Plop) -> handle_call({get, {hash, EntryHash}}, _From, Plop) -> {reply, db:get_by_entry_hash(EntryHash), Plop}; -handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> - lager:debug("add leafhash ~p", [TreeLeafHash]), - case storage_nodes() of - [] -> - exit(internal_merge_not_supported); - Nodes -> - {noreply, - store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, - From, Plop)} - end; handle_call({consistency, {First, Second}}, _From, Plop) -> {reply, ht:consistency(First - 1, Second - 1), Plop}; -- cgit v1.1