summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2016-11-22 14:35:36 +0100
committerMagnus Ahltorp <map@kth.se>2016-11-22 14:35:36 +0100
commit885c44e472843e1646687c071e9277da92d991e8 (patch)
treef48d443631f0a7fa57caa31bfbf46352814263c6
parent8826eb502c73df3a512a2d257f4264d68a10e1c8 (diff)
Make certain operations parallelexperimental-opt
-rw-r--r--src/db.erl10
-rw-r--r--src/frontend.erl30
-rw-r--r--src/perm.erl6
-rw-r--r--src/permdb.erl39
-rw-r--r--src/plop_httputil.erl3
-rwxr-xr-xtest/permdbtest.erl27
6 files changed, 89 insertions, 26 deletions
diff --git a/src/db.erl b/src/db.erl
index bf17e96..f130da9 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -7,7 +7,8 @@
%% API.
-export([start_link/0, stop/0]).
-export([create_size_table/0]).
--export([add_entry_sync/2, add_entry_nosync/2, sync_entry_db/0, add_entryhash/2, set_treesize/1, size/0]).
+-export([add_entry_sync/2, add_entry_nosync/2, sync_entry_db/0, add_entries_nosync/1,
+ add_entryhash/2, set_treesize/1, size/0]).
-export([add_index_nosync_noreverse/2]).
-export([verifiedsize/0, set_verifiedsize/1]).
-export([sendsth_verified/0, set_sendsth_verified/2]).
@@ -92,6 +93,13 @@ add_entry_nosync(LeafHash, Data) ->
lager:debug("leafhash ~s added", [mochihex:to_hex(LeafHash)]),
ok.
+-spec add_entries_nosync([{binary(), binary()}]) -> ok.
+add_entries_nosync(Entries) ->
+ lager:debug("add entries: ~s", [length(Entries)]),
+ ok = perm:addvalues(entry_db, Entries),
+ lager:debug("~s entries added", [length(Entries)]),
+ ok.
+
sync_entry_db() ->
lager:debug("committing entry db"),
perm:commit(entry_db),
diff --git a/src/frontend.erl b/src/frontend.erl
index a3ea885..80cd9d2 100644
--- a/src/frontend.erl
+++ b/src/frontend.erl
@@ -15,11 +15,11 @@ request(post, ?APPURL_PLOP_FRONTEND, "sendentry", Input) ->
{error, E} ->
html("sendentry: bad input:", E);
Entries when is_list(Entries) ->
- lists:map(fun ({struct, PropList}) ->
- LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
- TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
- ok = db:add_entry_nosync(TreeLeafHash, LogEntry)
- end, Entries),
+ ok = db:add_entries_nosync(lists:map(fun ({struct, PropList}) ->
+ LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
+ TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+ {TreeLeafHash, LogEntry}
+ end, Entries)),
ok = db:sync_entry_db(),
success({[{result, <<"ok">>}]});
{struct, PropList} ->
@@ -124,11 +124,11 @@ request(post, ?APPURL_PLOP_MERGE, "sendentry", Input) ->
{error, E} ->
html("sendentry: bad input:", E);
Entries when is_list(Entries) ->
- lists:map(fun ({struct, PropList}) ->
- LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
- TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
- ok = db:add_entry_nosync(TreeLeafHash, LogEntry)
- end, Entries),
+ ok = db:add_entries_nosync(lists:map(fun ({struct, PropList}) ->
+ LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
+ TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+ {TreeLeafHash, LogEntry}
+ end, Entries)),
ok = db:sync_entry_db(),
success({[{result, <<"ok">>}]});
{struct, PropList} ->
@@ -273,14 +273,18 @@ check_entries_onechunk(Start, End) ->
end.
check_entries_int(Entries, Start, End) ->
- lists:foldl(fun ({Hash, Index}, Acc) ->
- case check_entry(Hash, Index) of
+ ParallelTasks = application:get_env(plop, check_entries_parallel_tasks, 1),
+ Results = util:parallel_map(fun ({Hash, Index}) ->
+ check_entry(Hash, Index)
+ end, lists:zip(Entries, lists:seq(Start, End)), ParallelTasks),
+ lists:foldl(fun (Result, Acc) ->
+ case Result of
ok ->
Acc;
Error ->
[Error | Acc]
end
- end, [], lists:zip(Entries, lists:seq(Start, End))).
+ end, [], Results).
check_entries_noreverse(Entries, Start, End) ->
ParallelTasks = application:get_env(plop, check_entries_parallel_tasks, 1),
diff --git a/src/perm.erl b/src/perm.erl
index 2e12fdf..c238e8c 100644
--- a/src/perm.erl
+++ b/src/perm.erl
@@ -4,7 +4,7 @@
-module(perm).
-export([start_link/2, stop/1, init_module/0]).
--export([getvalue/2, addvalue/3, commit/1, commit/2]).
+-export([getvalue/2, addvalue/3, addvalues/2, commit/1, commit/2]).
start_link(Name, Filename) ->
Module = application:get_env(plop, db_backend, fsdb),
@@ -26,6 +26,10 @@ addvalue(Name, Key, Value) ->
Module = application:get_env(plop, db_backend, fsdb),
Module:addvalue(Name, Key, Value).
+addvalues(Name, KeyValues) ->
+ Module = application:get_env(plop, db_backend, fsdb),
+ Module:addvalues(Name, KeyValues).
+
commit(Name) ->
Module = application:get_env(plop, db_backend, fsdb),
Module:commit(Name).
diff --git a/src/permdb.erl b/src/permdb.erl
index 461b8b3..6cc0751 100644
--- a/src/permdb.erl
+++ b/src/permdb.erl
@@ -6,7 +6,7 @@
-behaviour(gen_server).
-export([start_link/2, stop/1, init_module/0]).
--export([getvalue/2, addvalue/3, commit/1, commit/2, keyexists/2]).
+-export([getvalue/2, addvalue/3, addvalues/2, commit/1, commit/2, keyexists/2]).
%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
@@ -35,6 +35,9 @@ keyexists(Name, Key) ->
addvalue(Name, Key, Value) ->
gen_server:call(Name, {addvalue, Key, Value}).
+addvalues(Name, KeyValues) ->
+ gen_server:call(Name, {addvalues, KeyValues}).
+
commit(Name) ->
gen_server:call(Name, {commit}).
commit(Name, Timeout) ->
@@ -69,31 +72,38 @@ handle_info({Port, {data, Data}}, State) when is_port(Port) ->
lager:debug("response: ~p", [Data]),
{{value, {From, Action}}, Requests} = queue:out(State#state.requests),
lager:debug("response ~p ~p: ~p", [State#state.name, State#state.requestcounter - queue:len(State#state.requests), Action]),
- gen_server:reply(From, case Action of
+ case Action of
getvalue ->
case Data of
<<>> ->
- noentry;
+ gen_server:reply(From, noentry);
_ ->
- Data
+ gen_server:reply(From, Data)
end;
addvalue ->
case Data of
<<>> ->
util:exit_with_error(addvalue, unknown, "Error in addvalue");
_ ->
- ok
+ gen_server:reply(From, ok)
+ end;
+ addvalue_partial ->
+ case Data of
+ <<>> ->
+ util:exit_with_error(addvalue, unknown, "Error in addvalue");
+ _ ->
+ none
end;
commit ->
- Data;
+ gen_server:reply(From, Data);
keyexists ->
case Data of
<<0>> ->
- false;
+ gen_server:reply(From, false);
<<1>> ->
- true
+ gen_server:reply(From, true)
end
- end),
+ end,
{noreply, State#state{requests = Requests}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -125,6 +135,17 @@ handle_call({addvalue, Key, Value}, From, State) ->
addvalue_port_command(State#state.port, Key, Value),
{noreply, add_request(State, From, addvalue)};
+handle_call({addvalues, KeyValues}, From, State) ->
+ lager:debug("addvalues ~p ~p: ~p", [State#state.name, State#state.requestcounter, length(KeyValues)]),
+ lists:foreach(fun ({Key, Value}) ->
+ addvalue_port_command(State#state.port, Key, Value)
+ end, KeyValues),
+ NewState = lists:foldl(fun (_, Acc) ->
+ add_request(Acc, none, addvalue_partial)
+ end, State, tl(KeyValues)),
+
+ {noreply, add_request(NewState, From, addvalue)};
+
handle_call({commit}, From, State) ->
lager:debug("commit ~p ~p", [State#state.name, State#state.requestcounter]),
commit_port_command(State#state.port),
diff --git a/src/plop_httputil.erl b/src/plop_httputil.erl
index 37e25c1..e43e18c 100644
--- a/src/plop_httputil.erl
+++ b/src/plop_httputil.erl
@@ -67,7 +67,8 @@ request(DebugTag, URL, Headers, RequestBody) ->
lager:debug("~s: sending http request to ~p",
[DebugTag, URL]),
case hackney:connect(ParsedURL,
- [{ssl_options, [{cacertfile, CACertFile},
+ [{pool, default},
+ {ssl_options, [{cacertfile, CACertFile},
{verify, verify_peer},
{verify_fun, {fun verify_fun/3,
[{check_hostname, Host}]}}
diff --git a/test/permdbtest.erl b/test/permdbtest.erl
index 1c43861..a36ae85 100755
--- a/test/permdbtest.erl
+++ b/test/permdbtest.erl
@@ -53,6 +53,19 @@ addvalue_loop([{K, VSeed}|Rest], Port, Datasize) ->
exit(mismatch)
end.
+addvalues(TestData, Port, Datasize) ->
+ KeyValues = lists:map(fun ({K, VSeed}) ->
+ V = constructdata(VSeed, Datasize),
+ {K, V}
+ end, TestData),
+ case permdb:addvalues(testdb, KeyValues) of
+ ok ->
+ none;
+ Other ->
+ io:format("expected: 0 or 1 got: ~p~n", [Other]),
+ exit(mismatch)
+ end.
+
testget(_Filename, TestData, Datasize) ->
getvalue_loop(TestData, none, Datasize),
ok.
@@ -67,6 +80,16 @@ testadd(_Filename, TestData, Datasize) ->
exit(mismatch)
end.
+testaddmulti(_Filename, TestData, Datasize) ->
+ addvalues(TestData, none, Datasize),
+ case permdb:commit(testdb) of
+ <<0>> ->
+ ok;
+ Other ->
+ io:format("commit expected: 0 got: ~p~n", [Other]),
+ exit(mismatch)
+ end.
+
stop() ->
teststop(),
receive
@@ -76,6 +99,8 @@ stop() ->
end.
main([]) ->
+ application:set_env(lager, handlers, [{lager_console_backend, debug}]),
+lager:start(),
{ok, Cwd} = file:get_cwd(),
code:add_path(Cwd ++ "/ebin"),
Size = 10,
@@ -133,7 +158,7 @@ main([]) ->
testadd(Filename, gentestdata(1), 99),
testadd(Filename, gentestdata(1+2), 99),
testadd(Filename, gentestdata(1+2+3), 99),
- testadd(Filename, gentestdata(1+2+3+4), 99),
+ testaddmulti(Filename, gentestdata(1+2+3+4), 99),
testget(Filename, gentestdata(1+2+3+4), 99),
stop(),