diff options
Diffstat (limited to 'src/plop.erl')
-rw-r--r-- | src/plop.erl | 52 |
1 files changed, 37 insertions, 15 deletions
diff --git a/src/plop.erl b/src/plop.erl index 38d7047..821fe99 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -71,19 +71,23 @@ init([]) -> handle_cast(_Request, State) -> {noreply, State}. -handle_http_reply(RepliesUntilQuorum, +handle_http_reply(TreeLeafHash, RepliesUntilQuorum, StatusCode, Body) -> - lager:debug("http_reply: ~p", [Body]), + lager:debug("leafhash ~s: http_reply: ~p", + [mochihex:to_hex(TreeLeafHash), Body]), {struct, PropList} = mochijson2:decode(Body), Result = proplists:get_value(<<"result">>, PropList), if Result == <<"ok">>, StatusCode == 200 -> case RepliesUntilQuorum - 1 of 0 -> %% reached quorum - lager:debug("reached quorum"), + lager:debug("leafhash ~s: reached quorum", + [mochihex:to_hex(TreeLeafHash)]), {ok}; NewRepliesUntilQuorum -> - lager:debug("replies until quorum: ~p", [NewRepliesUntilQuorum]), + lager:debug("leafhash ~s: replies until quorum: ~p", + [mochihex:to_hex(TreeLeafHash), + NewRepliesUntilQuorum]), {continue, NewRepliesUntilQuorum} end end. @@ -101,7 +105,7 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% -spec add(binary(), binary(), binary()) -> ok. add(LogEntry, TreeLeafHash, EntryHash) -> - lager:debug("add leafhash ~p", [TreeLeafHash]), + lager:debug("add leafhash ~s", [mochihex:to_hex(TreeLeafHash)]), case storage_nodes() of [] -> exit(internal_merge_not_supported); @@ -195,11 +199,12 @@ send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> {entry, base64:encode(LogEntry)}, {treeleafhash, base64:encode(TreeLeafHash)} ]}), - lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]), + lager:debug("leafhash ~s: send sendentry to storage node ~p", + [mochihex:to_hex(TreeLeafHash), URLBase]), {ok, RequestId} = httpc:request(post, {URLBase ++ "sendentry", [], "text/json", list_to_binary(Request)}, [], [{sync, false}]), - RequestId. + {RequestId, URLBase}. send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> Request = mochijson2:encode( @@ -211,39 +216,56 @@ send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> "text/json", list_to_binary(Request)}, [], [{sync, false}]). -store_loop(Requests, RepliesUntilQuorum) -> +store_loop(TreeLeafHash, Requests, RepliesUntilQuorum) -> receive {http, {RequestId, {StatusLine, _Headers, Body}}} -> {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, - case sets:is_element(RequestId, Requests) of + case dict:is_key(RequestId, Requests) of false -> - lager:info("stray storage reply: ~p", [{StatusLine, Body}]), - store_loop(Requests, RepliesUntilQuorum); + lager:info("leafhash ~s: stray storage reply: ~p", + [mochihex:to_hex(TreeLeafHash), + {StatusLine, Body}]), + store_loop(TreeLeafHash, Requests, RepliesUntilQuorum); true -> - case handle_http_reply(RepliesUntilQuorum, StatusCode, Body) of + NewRequests = dict:erase(RequestId, Requests), + case handle_http_reply(TreeLeafHash, RepliesUntilQuorum, + StatusCode, Body) of {ok} -> ok; {continue, NewRepliesUntilQuorum} -> - store_loop(Requests, NewRepliesUntilQuorum) + store_loop(TreeLeafHash, NewRequests, + NewRepliesUntilQuorum) end end after 2000 -> + lager:error("leafhash ~s: storage failed: " ++ + "~p replies until quorum, nodes left: ~p", + [mochihex:to_hex(TreeLeafHash), RepliesUntilQuorum, + lists:map(fun({_Key, Value}) -> + Value + end, dict:to_list(Requests))]), error end. store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}) -> - lager:debug("leafhash ~p: send requests to ~p", [TreeLeafHash, Nodes]), + lager:debug("leafhash ~s: send requests to ~p", + [mochihex:to_hex(TreeLeafHash), Nodes]), Requests = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) || URLBase <- Nodes], - case store_loop(sets:from_list(Requests), storage_nodes_quorum()) of + case store_loop(TreeLeafHash, dict:from_list(Requests), + storage_nodes_quorum()) of ok -> + lager:debug("leafhash ~s: all requests answered", + [mochihex:to_hex(TreeLeafHash)]), lists:foreach(fun (URLBase) -> send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) end, Nodes), ok; Any -> + lager:debug("leafhash ~s: error: ~p", + [mochihex:to_hex(TreeLeafHash), Any]), Any end. |