summaryrefslogtreecommitdiff
path: root/src/plop.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/plop.erl')
-rw-r--r--src/plop.erl143
1 files changed, 52 insertions, 91 deletions
diff --git a/src/plop.erl b/src/plop.erl
index 24c4b8d..6501105 100644
--- a/src/plop.erl
+++ b/src/plop.erl
@@ -40,10 +40,6 @@
-include_lib("public_key/include/public_key.hrl").
-include_lib("eunit/include/eunit.hrl").
--record(state, {http_requests,
- own_requests
- }).
-
%%%%% moved from plop.hrl, maybe remove
-define(PLOPVERSION, 0).
-type signature_type() :: certificate_timestamp | tree_hash | test. % uint8
@@ -67,69 +63,31 @@ start_link() ->
stop() ->
call(?MODULE, stop).
-add_http_request(Plop, RequestId, Data) ->
- Plop#state{http_requests = dict:store(RequestId, Data,
- Plop#state.http_requests)}.
-
-add_own_request(Plop, RequestId, Data) ->
- Plop#state{own_requests = dict:store(RequestId, Data,
- Plop#state.own_requests)}.
-
-remove_http_request(Plop, RequestId) ->
- Plop#state{http_requests = dict:erase(RequestId,
- Plop#state.http_requests)}.
-
-remove_own_request(Plop, RequestId) ->
- Plop#state{own_requests = dict:erase(RequestId,
- Plop#state.own_requests)}.
-
%%%%%%%%%%%%%%%%%%%%
init([]) ->
_Tree = ht:reset_tree([db:size() - 1]),
- {ok, #state{http_requests = dict:new(),
- own_requests = dict:new()}}.
+ {ok, []}.
handle_cast(_Request, State) ->
{noreply, State}.
-handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}},
+handle_http_reply(RepliesUntilQuorum,
StatusCode, Body) ->
lager:debug("http_reply: ~p", [Body]),
{struct, PropList} = mochijson2:decode(Body),
Result = proplists:get_value(<<"result">>, PropList),
- case dict:fetch(OwnRequestId, State#state.own_requests) of
- undefined ->
- {noreply, State};
- {storage_sendentry, {From, Completion, RepliesUntilQuorum}}
- when Result == <<"ok">>, StatusCode == 200 ->
+ if Result == <<"ok">>, StatusCode == 200 ->
case RepliesUntilQuorum - 1 of
0 ->
%% reached quorum
lager:debug("reached quorum"),
- gen_server:reply(From, ok),
- StateWithCompletion = Completion(State),
- {noreply, remove_own_request(StateWithCompletion,
- OwnRequestId)};
+ {ok};
NewRepliesUntilQuorum ->
lager:debug("replies until quorum: ~p", [NewRepliesUntilQuorum]),
- {noreply, add_own_request(State, OwnRequestId,
- {storage_sendentry,
- {From, Completion,
- NewRepliesUntilQuorum}})}
+ {continue, NewRepliesUntilQuorum}
end
end.
-handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) ->
- {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine,
- case dict:fetch(RequestId, Plop#state.http_requests) of
- undefined ->
- {noreply, Plop};
- ignore ->
- {noreply, Plop};
- HttpRequest ->
- handle_http_reply(remove_http_request(Plop, RequestId),
- HttpRequest, StatusCode, Body)
- end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -143,8 +101,15 @@ terminate(_Reason, _State) ->
%%%%%%%%%%%%%%%%%%%%
-spec add(binary(), binary(), binary()) -> ok.
add(LogEntry, TreeLeafHash, EntryHash) ->
- call(?MODULE,
- {add, {LogEntry, TreeLeafHash, EntryHash}}).
+ lager:debug("add leafhash ~p", [TreeLeafHash]),
+ case storage_nodes() of
+ [] ->
+ exit(internal_merge_not_supported);
+ Nodes ->
+ util:spawn_and_wait(fun () ->
+ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash})
+ end)
+ end.
sth() ->
sth([]).
@@ -193,9 +158,10 @@ send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) ->
{treeleafhash, base64:encode(TreeLeafHash)}
]}),
lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]),
- httpc:request(post, {URLBase ++ "sendentry", [],
- "text/json", list_to_binary(Request)},
- [], [{sync, false}]).
+ {ok, RequestId} = httpc:request(post, {URLBase ++ "sendentry", [],
+ "text/json", list_to_binary(Request)},
+ [], [{sync, false}]),
+ RequestId.
send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) ->
Request = mochijson2:encode(
@@ -207,36 +173,41 @@ send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) ->
"text/json", list_to_binary(Request)},
[], [{sync, false}]).
-store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
- lager:debug("leafhash ~p", [TreeLeafHash]),
- OwnRequestId = make_ref(),
-
- Completion =
- fun(CompletionState) ->
- RequestIds = [send_storage_entrycommitted(URLBase, EntryHash,
- TreeLeafHash)
- || URLBase <- Nodes],
- lists:foldl(fun({ok, RequestId}, StateAcc) ->
- add_http_request(StateAcc, RequestId,
- ignore)
- end, CompletionState, RequestIds)
- end,
+store_loop(Requests, RepliesUntilQuorum) ->
+ receive
+ {http, {RequestId, {StatusLine, _Headers, Body}}} ->
+ {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine,
+ case sets:is_element(RequestId, Requests) of
+ false ->
+ lager:info("stray storage reply: ~p", [{StatusLine, Body}]),
+ store_loop(Requests, RepliesUntilQuorum);
+ true ->
+ case handle_http_reply(RepliesUntilQuorum, StatusCode, Body) of
+ {ok} ->
+ ok;
+ {continue, NewRepliesUntilQuorum} ->
+ store_loop(Requests, NewRepliesUntilQuorum)
+ end
+ end
+ after
+ 2000 ->
+ error
+ end.
+
- PlopWithOwn = add_own_request(State, OwnRequestId,
- {storage_sendentry,
- {From, Completion,
- storage_nodes_quorum()}}),
-
- lager:debug("send requests to ~p", [Nodes]),
- RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash)
- || URLBase <- Nodes],
- PlopWithRequests =
- lists:foldl(fun({ok, RequestId}, PlopAcc) ->
- add_http_request(PlopAcc, RequestId,
- {storage_sendentry_http,
- {OwnRequestId}})
- end, PlopWithOwn, RequestIds),
- PlopWithRequests.
+store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}) ->
+ lager:debug("leafhash ~p: send requests to ~p", [TreeLeafHash, Nodes]),
+ Requests = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) || URLBase <- Nodes],
+ case store_loop(sets:from_list(Requests), storage_nodes_quorum()) of
+ ok ->
+ lists:foreach(fun (URLBase) ->
+ send_storage_entrycommitted(URLBase, EntryHash,
+ TreeLeafHash)
+ end, Nodes),
+ ok;
+ Any ->
+ Any
+ end.
fill_in_entry({_Index, LeafHash, notfetched}) ->
db:get_by_leaf_hash(LeafHash).
@@ -253,16 +224,6 @@ handle_call({get, {index, Start, End}}, _From, Plop) ->
handle_call({get, {hash, EntryHash}}, _From, Plop) ->
{reply, db:get_by_entry_hash(EntryHash), Plop};
-handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) ->
- lager:debug("add leafhash ~p", [TreeLeafHash]),
- case storage_nodes() of
- [] ->
- exit(internal_merge_not_supported);
- Nodes ->
- {noreply,
- store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash},
- From, Plop)}
- end;
handle_call({consistency, {First, Second}}, _From, Plop) ->
{reply, ht:consistency(First - 1, Second - 1), Plop};