summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/db.erl10
-rw-r--r--src/frontend.erl30
-rw-r--r--src/perm.erl6
-rw-r--r--src/permdb.erl39
-rw-r--r--src/plop_httputil.erl3
5 files changed, 63 insertions, 25 deletions
diff --git a/src/db.erl b/src/db.erl
index bf17e96..f130da9 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -7,7 +7,8 @@
%% API.
-export([start_link/0, stop/0]).
-export([create_size_table/0]).
--export([add_entry_sync/2, add_entry_nosync/2, sync_entry_db/0, add_entryhash/2, set_treesize/1, size/0]).
+-export([add_entry_sync/2, add_entry_nosync/2, sync_entry_db/0, add_entries_nosync/1,
+ add_entryhash/2, set_treesize/1, size/0]).
-export([add_index_nosync_noreverse/2]).
-export([verifiedsize/0, set_verifiedsize/1]).
-export([sendsth_verified/0, set_sendsth_verified/2]).
@@ -92,6 +93,13 @@ add_entry_nosync(LeafHash, Data) ->
lager:debug("leafhash ~s added", [mochihex:to_hex(LeafHash)]),
ok.
+-spec add_entries_nosync([{binary(), binary()}]) -> ok.
+add_entries_nosync(Entries) ->
+ lager:debug("add entries: ~s", [length(Entries)]),
+ ok = perm:addvalues(entry_db, Entries),
+ lager:debug("~s entries added", [length(Entries)]),
+ ok.
+
sync_entry_db() ->
lager:debug("committing entry db"),
perm:commit(entry_db),
diff --git a/src/frontend.erl b/src/frontend.erl
index a3ea885..80cd9d2 100644
--- a/src/frontend.erl
+++ b/src/frontend.erl
@@ -15,11 +15,11 @@ request(post, ?APPURL_PLOP_FRONTEND, "sendentry", Input) ->
{error, E} ->
html("sendentry: bad input:", E);
Entries when is_list(Entries) ->
- lists:map(fun ({struct, PropList}) ->
- LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
- TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
- ok = db:add_entry_nosync(TreeLeafHash, LogEntry)
- end, Entries),
+ ok = db:add_entries_nosync(lists:map(fun ({struct, PropList}) ->
+ LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
+ TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+ {TreeLeafHash, LogEntry}
+ end, Entries)),
ok = db:sync_entry_db(),
success({[{result, <<"ok">>}]});
{struct, PropList} ->
@@ -124,11 +124,11 @@ request(post, ?APPURL_PLOP_MERGE, "sendentry", Input) ->
{error, E} ->
html("sendentry: bad input:", E);
Entries when is_list(Entries) ->
- lists:map(fun ({struct, PropList}) ->
- LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
- TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
- ok = db:add_entry_nosync(TreeLeafHash, LogEntry)
- end, Entries),
+ ok = db:add_entries_nosync(lists:map(fun ({struct, PropList}) ->
+ LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
+ TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+ {TreeLeafHash, LogEntry}
+ end, Entries)),
ok = db:sync_entry_db(),
success({[{result, <<"ok">>}]});
{struct, PropList} ->
@@ -273,14 +273,18 @@ check_entries_onechunk(Start, End) ->
end.
check_entries_int(Entries, Start, End) ->
- lists:foldl(fun ({Hash, Index}, Acc) ->
- case check_entry(Hash, Index) of
+ ParallelTasks = application:get_env(plop, check_entries_parallel_tasks, 1),
+ Results = util:parallel_map(fun ({Hash, Index}) ->
+ check_entry(Hash, Index)
+ end, lists:zip(Entries, lists:seq(Start, End)), ParallelTasks),
+ lists:foldl(fun (Result, Acc) ->
+ case Result of
ok ->
Acc;
Error ->
[Error | Acc]
end
- end, [], lists:zip(Entries, lists:seq(Start, End))).
+ end, [], Results).
check_entries_noreverse(Entries, Start, End) ->
ParallelTasks = application:get_env(plop, check_entries_parallel_tasks, 1),
diff --git a/src/perm.erl b/src/perm.erl
index 2e12fdf..c238e8c 100644
--- a/src/perm.erl
+++ b/src/perm.erl
@@ -4,7 +4,7 @@
-module(perm).
-export([start_link/2, stop/1, init_module/0]).
--export([getvalue/2, addvalue/3, commit/1, commit/2]).
+-export([getvalue/2, addvalue/3, addvalues/2, commit/1, commit/2]).
start_link(Name, Filename) ->
Module = application:get_env(plop, db_backend, fsdb),
@@ -26,6 +26,10 @@ addvalue(Name, Key, Value) ->
Module = application:get_env(plop, db_backend, fsdb),
Module:addvalue(Name, Key, Value).
+addvalues(Name, KeyValues) ->
+ Module = application:get_env(plop, db_backend, fsdb),
+ Module:addvalues(Name, KeyValues).
+
commit(Name) ->
Module = application:get_env(plop, db_backend, fsdb),
Module:commit(Name).
diff --git a/src/permdb.erl b/src/permdb.erl
index 461b8b3..6cc0751 100644
--- a/src/permdb.erl
+++ b/src/permdb.erl
@@ -6,7 +6,7 @@
-behaviour(gen_server).
-export([start_link/2, stop/1, init_module/0]).
--export([getvalue/2, addvalue/3, commit/1, commit/2, keyexists/2]).
+-export([getvalue/2, addvalue/3, addvalues/2, commit/1, commit/2, keyexists/2]).
%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
@@ -35,6 +35,9 @@ keyexists(Name, Key) ->
addvalue(Name, Key, Value) ->
gen_server:call(Name, {addvalue, Key, Value}).
+addvalues(Name, KeyValues) ->
+ gen_server:call(Name, {addvalues, KeyValues}).
+
commit(Name) ->
gen_server:call(Name, {commit}).
commit(Name, Timeout) ->
@@ -69,31 +72,38 @@ handle_info({Port, {data, Data}}, State) when is_port(Port) ->
lager:debug("response: ~p", [Data]),
{{value, {From, Action}}, Requests} = queue:out(State#state.requests),
lager:debug("response ~p ~p: ~p", [State#state.name, State#state.requestcounter - queue:len(State#state.requests), Action]),
- gen_server:reply(From, case Action of
+ case Action of
getvalue ->
case Data of
<<>> ->
- noentry;
+ gen_server:reply(From, noentry);
_ ->
- Data
+ gen_server:reply(From, Data)
end;
addvalue ->
case Data of
<<>> ->
util:exit_with_error(addvalue, unknown, "Error in addvalue");
_ ->
- ok
+ gen_server:reply(From, ok)
+ end;
+ addvalue_partial ->
+ case Data of
+ <<>> ->
+ util:exit_with_error(addvalue, unknown, "Error in addvalue");
+ _ ->
+ none
end;
commit ->
- Data;
+ gen_server:reply(From, Data);
keyexists ->
case Data of
<<0>> ->
- false;
+ gen_server:reply(From, false);
<<1>> ->
- true
+ gen_server:reply(From, true)
end
- end),
+ end,
{noreply, State#state{requests = Requests}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -125,6 +135,17 @@ handle_call({addvalue, Key, Value}, From, State) ->
addvalue_port_command(State#state.port, Key, Value),
{noreply, add_request(State, From, addvalue)};
+handle_call({addvalues, KeyValues}, From, State) ->
+ lager:debug("addvalues ~p ~p: ~p", [State#state.name, State#state.requestcounter, length(KeyValues)]),
+ lists:foreach(fun ({Key, Value}) ->
+ addvalue_port_command(State#state.port, Key, Value)
+ end, KeyValues),
+ NewState = lists:foldl(fun (_, Acc) ->
+ add_request(Acc, none, addvalue_partial)
+ end, State, tl(KeyValues)),
+
+ {noreply, add_request(NewState, From, addvalue)};
+
handle_call({commit}, From, State) ->
lager:debug("commit ~p ~p", [State#state.name, State#state.requestcounter]),
commit_port_command(State#state.port),
diff --git a/src/plop_httputil.erl b/src/plop_httputil.erl
index 37e25c1..e43e18c 100644
--- a/src/plop_httputil.erl
+++ b/src/plop_httputil.erl
@@ -67,7 +67,8 @@ request(DebugTag, URL, Headers, RequestBody) ->
lager:debug("~s: sending http request to ~p",
[DebugTag, URL]),
case hackney:connect(ParsedURL,
- [{ssl_options, [{cacertfile, CACertFile},
+ [{pool, default},
+ {ssl_options, [{cacertfile, CACertFile},
{verify, verify_peer},
{verify_fun, {fun verify_fun/3,
[{check_hostname, Host}]}}