summaryrefslogtreecommitdiff
path: root/src/plop.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/plop.erl')
-rw-r--r--src/plop.erl159
1 files changed, 142 insertions, 17 deletions
diff --git a/src/plop.erl b/src/plop.erl
index 5244144..d363582 100644
--- a/src/plop.erl
+++ b/src/plop.erl
@@ -35,6 +35,7 @@
-export([init/1, handle_call/3, terminate/2,
handle_cast/2, handle_info/2, code_change/3]).
+-import(stacktrace, [call/2]).
-include("plop.hrl").
%%-include("db.hrl").
-include_lib("public_key/include/public_key.hrl").
@@ -45,7 +46,10 @@
-record(state, {pubkey :: public_key:rsa_public_key(),
privkey :: public_key:rsa_private_key(),
- logid :: binary()}).
+ logid :: binary(),
+ http_requests,
+ own_requests
+ }).
%%%%% moved from plop.hrl, maybe remove
-define(PLOPVERSION, 0).
@@ -68,7 +72,23 @@ start_link(Keyfile, Passphrase) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Keyfile, Passphrase], []).
stop() ->
- gen_server:call(?MODULE, stop).
+ call(?MODULE, stop).
+
+add_http_request(Plop, RequestId, Data) ->
+ Plop#state{http_requests = dict:store(RequestId, Data,
+ Plop#state.http_requests)}.
+
+add_own_request(Plop, RequestId, Data) ->
+ Plop#state{own_requests = dict:store(RequestId, Data,
+ Plop#state.own_requests)}.
+
+remove_http_request(Plop, RequestId) ->
+ Plop#state{http_requests = dict:erase(RequestId,
+ Plop#state.http_requests)}.
+
+remove_own_request(Plop, RequestId) ->
+ Plop#state{own_requests = dict:erase(RequestId,
+ Plop#state.own_requests)}.
%%%%%%%%%%%%%%%%%%%%
init([PrivKeyfile, PubKeyfile]) ->
@@ -81,11 +101,49 @@ init([PrivKeyfile, PubKeyfile]) ->
_Tree = ht:reset_tree([db:size() - 1]),
{ok, #state{pubkey = Public_key,
privkey = Private_key,
- logid = LogID}}.
+ logid = LogID,
+ http_requests = dict:new(),
+ own_requests = dict:new()}}.
handle_cast(_Request, State) ->
{noreply, State}.
+handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}},
+ StatusCode, Body) ->
+ lager:debug("http_reply: ~p", [Body]),
+ {struct, PropList} = mochijson2:decode(Body),
+ Result = proplists:get_value(<<"result">>, PropList),
+ case dict:fetch(OwnRequestId, State#state.own_requests) of
+ undefined ->
+ {noreply, State};
+ {storage_sendentry, {From, Completion, RepliesUntilQuorum}}
+ when Result == <<"ok">>, StatusCode == 200 ->
+ case RepliesUntilQuorum - 1 of
+ 0 ->
+ %% reached quorum
+ gen_server:reply(From, ok),
+ StateWithCompletion = Completion(State),
+ {noreply, remove_own_request(StateWithCompletion,
+ OwnRequestId)};
+ NewRepliesUntilQuorum ->
+ {noreply, add_own_request(State, OwnRequestId,
+ {storage_sendentry,
+ {From, Completion,
+ NewRepliesUntilQuorum}})}
+ end
+ end.
+
+handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) ->
+ {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine,
+ case dict:fetch(RequestId, Plop#state.http_requests) of
+ undefined ->
+ {noreply, Plop};
+ ignore ->
+ {noreply, Plop};
+ HttpRequest ->
+ handle_http_reply(remove_http_request(Plop, RequestId),
+ HttpRequest, StatusCode, Body)
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -99,38 +157,97 @@ terminate(_Reason, _State) ->
%%%%%%%%%%%%%%%%%%%%
-spec add(binary(), binary(), binary()) -> ok.
add(LogEntry, TreeLeafHash, EntryHash) ->
- gen_server:call(?MODULE,
+ call(?MODULE,
{add, {LogEntry, TreeLeafHash, EntryHash}}).
sth() ->
- gen_server:call(?MODULE, {sth, []}).
+ call(?MODULE, {sth, []}).
-spec get(non_neg_integer(), non_neg_integer()) ->
[{non_neg_integer(), binary(), binary()}].
get(Start, End) ->
- gen_server:call(?MODULE, {get, {index, Start, End}}).
+ call(?MODULE, {get, {index, Start, End}}).
get(Hash) ->
- gen_server:call(?MODULE, {get, {hash, Hash}}).
+ call(?MODULE, {get, {hash, Hash}}).
spt(Data) ->
- gen_server:call(?MODULE, {spt, Data}).
+ call(?MODULE, {spt, Data}).
consistency(TreeSizeFirst, TreeSizeSecond) ->
- gen_server:call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}).
+ call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}).
-spec inclusion(binary(), non_neg_integer()) ->
{ok, {binary(), binary()}} | {notfound, string()}.
inclusion(Hash, TreeSize) ->
- gen_server:call(?MODULE, {inclusion, {Hash, TreeSize}}).
+ call(?MODULE, {inclusion, {Hash, TreeSize}}).
-spec inclusion_and_entry(non_neg_integer(), non_neg_integer()) ->
{ok, {binary(), binary()}} |
{notfound, string()}.
inclusion_and_entry(Index, TreeSize) ->
- gen_server:call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}).
+ call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}).
get_logid() ->
- gen_server:call(?MODULE, {get, logid}).
+ call(?MODULE, {get, logid}).
testing_get_pubkey() ->
- gen_server:call(?MODULE, {test, pubkey}).
+ call(?MODULE, {test, pubkey}).
+
+storage_nodes() ->
+ application:get_env(plop, storage_nodes, []).
+
+storage_nodes_quorum() ->
+ {ok, Value} = application:get_env(plop, storage_nodes_quorum),
+ Value.
+
+send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) ->
+ Request = mochijson2:encode(
+ {[{plop_version, 1},
+ {entry, base64:encode(LogEntry)},
+ {treeleafhash, base64:encode(TreeLeafHash)}
+ ]}),
+ lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]),
+ httpc:request(post, {URLBase ++ "sendentry", [],
+ "text/json", list_to_binary(Request)},
+ [], [{sync, false}]).
+
+send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) ->
+ Request = mochijson2:encode(
+ {[{plop_version, 1},
+ {entryhash, base64:encode(EntryHash)},
+ {treeleafhash, base64:encode(TreeLeafHash)}
+ ]}),
+ httpc:request(post, {URLBase ++ "entrycommitted", [],
+ "text/json", list_to_binary(Request)},
+ [], [{sync, false}]).
+
+store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
+ lager:debug("leafhash ~p", [TreeLeafHash]),
+ OwnRequestId = make_ref(),
+
+ Completion =
+ fun(CompletionState) ->
+ RequestIds = [send_storage_entrycommitted(URLBase, EntryHash,
+ TreeLeafHash)
+ || URLBase <- Nodes],
+ lists:foldl(fun({ok, RequestId}, StateAcc) ->
+ add_http_request(StateAcc, RequestId,
+ ignore)
+ end, CompletionState, RequestIds)
+ end,
+
+ PlopWithOwn = add_own_request(State, OwnRequestId,
+ {storage_sendentry,
+ {From, Completion,
+ storage_nodes_quorum()}}),
+
+ lager:debug("send requests to ~p", [Nodes]),
+ RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash)
+ || URLBase <- Nodes],
+ PlopWithRequests =
+ lists:foldl(fun({ok, RequestId}, PlopAcc) ->
+ add_http_request(PlopAcc, RequestId,
+ {storage_sendentry_http,
+ {OwnRequestId}})
+ end, PlopWithOwn, RequestIds),
+ PlopWithRequests.
fill_in_entry({_Index, LeafHash, notfetched}) ->
db:get_by_leaf_hash(LeafHash).
@@ -151,10 +268,18 @@ handle_call({get, logid}, _From,
Plop = #state{logid = LogID}) ->
{reply, LogID, Plop};
-handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) ->
- ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
- ok = ht:add(TreeLeafHash),
- {reply, ok, Plop};
+handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) ->
+ lager:debug("add leafhash ~p", [TreeLeafHash]),
+ case storage_nodes() of
+ [] ->
+ ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
+ ok = ht:add(TreeLeafHash),
+ {reply, ok, Plop};
+ Nodes ->
+ {noreply,
+ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash},
+ From, Plop)}
+ end;
handle_call({sth, Data}, _From,
Plop = #state{privkey = PrivKey}) ->