diff options
author | Magnus Ahltorp <map@kth.se> | 2016-11-22 14:35:36 +0100 |
---|---|---|
committer | Magnus Ahltorp <map@kth.se> | 2016-11-22 14:35:36 +0100 |
commit | 885c44e472843e1646687c071e9277da92d991e8 (patch) | |
tree | f48d443631f0a7fa57caa31bfbf46352814263c6 | |
parent | 8826eb502c73df3a512a2d257f4264d68a10e1c8 (diff) |
Make certain operations parallelexperimental-opt
-rw-r--r-- | src/db.erl | 10 | ||||
-rw-r--r-- | src/frontend.erl | 30 | ||||
-rw-r--r-- | src/perm.erl | 6 | ||||
-rw-r--r-- | src/permdb.erl | 39 | ||||
-rw-r--r-- | src/plop_httputil.erl | 3 | ||||
-rwxr-xr-x | test/permdbtest.erl | 27 |
6 files changed, 89 insertions, 26 deletions
@@ -7,7 +7,8 @@ %% API. -export([start_link/0, stop/0]). -export([create_size_table/0]). --export([add_entry_sync/2, add_entry_nosync/2, sync_entry_db/0, add_entryhash/2, set_treesize/1, size/0]). +-export([add_entry_sync/2, add_entry_nosync/2, sync_entry_db/0, add_entries_nosync/1, + add_entryhash/2, set_treesize/1, size/0]). -export([add_index_nosync_noreverse/2]). -export([verifiedsize/0, set_verifiedsize/1]). -export([sendsth_verified/0, set_sendsth_verified/2]). @@ -92,6 +93,13 @@ add_entry_nosync(LeafHash, Data) -> lager:debug("leafhash ~s added", [mochihex:to_hex(LeafHash)]), ok. +-spec add_entries_nosync([{binary(), binary()}]) -> ok. +add_entries_nosync(Entries) -> + lager:debug("add entries: ~s", [length(Entries)]), + ok = perm:addvalues(entry_db, Entries), + lager:debug("~s entries added", [length(Entries)]), + ok. + sync_entry_db() -> lager:debug("committing entry db"), perm:commit(entry_db), diff --git a/src/frontend.erl b/src/frontend.erl index a3ea885..80cd9d2 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -15,11 +15,11 @@ request(post, ?APPURL_PLOP_FRONTEND, "sendentry", Input) -> {error, E} -> html("sendentry: bad input:", E); Entries when is_list(Entries) -> - lists:map(fun ({struct, PropList}) -> - LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), - TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), - ok = db:add_entry_nosync(TreeLeafHash, LogEntry) - end, Entries), + ok = db:add_entries_nosync(lists:map(fun ({struct, PropList}) -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + {TreeLeafHash, LogEntry} + end, Entries)), ok = db:sync_entry_db(), success({[{result, <<"ok">>}]}); {struct, PropList} -> @@ -124,11 +124,11 @@ request(post, ?APPURL_PLOP_MERGE, "sendentry", Input) -> {error, E} -> html("sendentry: bad input:", E); Entries when is_list(Entries) -> - lists:map(fun ({struct, PropList}) -> - LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), - TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), - ok = db:add_entry_nosync(TreeLeafHash, LogEntry) - end, Entries), + ok = db:add_entries_nosync(lists:map(fun ({struct, PropList}) -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + {TreeLeafHash, LogEntry} + end, Entries)), ok = db:sync_entry_db(), success({[{result, <<"ok">>}]}); {struct, PropList} -> @@ -273,14 +273,18 @@ check_entries_onechunk(Start, End) -> end. check_entries_int(Entries, Start, End) -> - lists:foldl(fun ({Hash, Index}, Acc) -> - case check_entry(Hash, Index) of + ParallelTasks = application:get_env(plop, check_entries_parallel_tasks, 1), + Results = util:parallel_map(fun ({Hash, Index}) -> + check_entry(Hash, Index) + end, lists:zip(Entries, lists:seq(Start, End)), ParallelTasks), + lists:foldl(fun (Result, Acc) -> + case Result of ok -> Acc; Error -> [Error | Acc] end - end, [], lists:zip(Entries, lists:seq(Start, End))). + end, [], Results). check_entries_noreverse(Entries, Start, End) -> ParallelTasks = application:get_env(plop, check_entries_parallel_tasks, 1), diff --git a/src/perm.erl b/src/perm.erl index 2e12fdf..c238e8c 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -4,7 +4,7 @@ -module(perm). -export([start_link/2, stop/1, init_module/0]). --export([getvalue/2, addvalue/3, commit/1, commit/2]). +-export([getvalue/2, addvalue/3, addvalues/2, commit/1, commit/2]). start_link(Name, Filename) -> Module = application:get_env(plop, db_backend, fsdb), @@ -26,6 +26,10 @@ addvalue(Name, Key, Value) -> Module = application:get_env(plop, db_backend, fsdb), Module:addvalue(Name, Key, Value). +addvalues(Name, KeyValues) -> + Module = application:get_env(plop, db_backend, fsdb), + Module:addvalues(Name, KeyValues). + commit(Name) -> Module = application:get_env(plop, db_backend, fsdb), Module:commit(Name). diff --git a/src/permdb.erl b/src/permdb.erl index 461b8b3..6cc0751 100644 --- a/src/permdb.erl +++ b/src/permdb.erl @@ -6,7 +6,7 @@ -behaviour(gen_server). -export([start_link/2, stop/1, init_module/0]). --export([getvalue/2, addvalue/3, commit/1, commit/2, keyexists/2]). +-export([getvalue/2, addvalue/3, addvalues/2, commit/1, commit/2, keyexists/2]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, @@ -35,6 +35,9 @@ keyexists(Name, Key) -> addvalue(Name, Key, Value) -> gen_server:call(Name, {addvalue, Key, Value}). +addvalues(Name, KeyValues) -> + gen_server:call(Name, {addvalues, KeyValues}). + commit(Name) -> gen_server:call(Name, {commit}). commit(Name, Timeout) -> @@ -69,31 +72,38 @@ handle_info({Port, {data, Data}}, State) when is_port(Port) -> lager:debug("response: ~p", [Data]), {{value, {From, Action}}, Requests} = queue:out(State#state.requests), lager:debug("response ~p ~p: ~p", [State#state.name, State#state.requestcounter - queue:len(State#state.requests), Action]), - gen_server:reply(From, case Action of + case Action of getvalue -> case Data of <<>> -> - noentry; + gen_server:reply(From, noentry); _ -> - Data + gen_server:reply(From, Data) end; addvalue -> case Data of <<>> -> util:exit_with_error(addvalue, unknown, "Error in addvalue"); _ -> - ok + gen_server:reply(From, ok) + end; + addvalue_partial -> + case Data of + <<>> -> + util:exit_with_error(addvalue, unknown, "Error in addvalue"); + _ -> + none end; commit -> - Data; + gen_server:reply(From, Data); keyexists -> case Data of <<0>> -> - false; + gen_server:reply(From, false); <<1>> -> - true + gen_server:reply(From, true) end - end), + end, {noreply, State#state{requests = Requests}}; handle_info(_Info, State) -> {noreply, State}. @@ -125,6 +135,17 @@ handle_call({addvalue, Key, Value}, From, State) -> addvalue_port_command(State#state.port, Key, Value), {noreply, add_request(State, From, addvalue)}; +handle_call({addvalues, KeyValues}, From, State) -> + lager:debug("addvalues ~p ~p: ~p", [State#state.name, State#state.requestcounter, length(KeyValues)]), + lists:foreach(fun ({Key, Value}) -> + addvalue_port_command(State#state.port, Key, Value) + end, KeyValues), + NewState = lists:foldl(fun (_, Acc) -> + add_request(Acc, none, addvalue_partial) + end, State, tl(KeyValues)), + + {noreply, add_request(NewState, From, addvalue)}; + handle_call({commit}, From, State) -> lager:debug("commit ~p ~p", [State#state.name, State#state.requestcounter]), commit_port_command(State#state.port), diff --git a/src/plop_httputil.erl b/src/plop_httputil.erl index 37e25c1..e43e18c 100644 --- a/src/plop_httputil.erl +++ b/src/plop_httputil.erl @@ -67,7 +67,8 @@ request(DebugTag, URL, Headers, RequestBody) -> lager:debug("~s: sending http request to ~p", [DebugTag, URL]), case hackney:connect(ParsedURL, - [{ssl_options, [{cacertfile, CACertFile}, + [{pool, default}, + {ssl_options, [{cacertfile, CACertFile}, {verify, verify_peer}, {verify_fun, {fun verify_fun/3, [{check_hostname, Host}]}} diff --git a/test/permdbtest.erl b/test/permdbtest.erl index 1c43861..a36ae85 100755 --- a/test/permdbtest.erl +++ b/test/permdbtest.erl @@ -53,6 +53,19 @@ addvalue_loop([{K, VSeed}|Rest], Port, Datasize) -> exit(mismatch) end. +addvalues(TestData, Port, Datasize) -> + KeyValues = lists:map(fun ({K, VSeed}) -> + V = constructdata(VSeed, Datasize), + {K, V} + end, TestData), + case permdb:addvalues(testdb, KeyValues) of + ok -> + none; + Other -> + io:format("expected: 0 or 1 got: ~p~n", [Other]), + exit(mismatch) + end. + testget(_Filename, TestData, Datasize) -> getvalue_loop(TestData, none, Datasize), ok. @@ -67,6 +80,16 @@ testadd(_Filename, TestData, Datasize) -> exit(mismatch) end. +testaddmulti(_Filename, TestData, Datasize) -> + addvalues(TestData, none, Datasize), + case permdb:commit(testdb) of + <<0>> -> + ok; + Other -> + io:format("commit expected: 0 got: ~p~n", [Other]), + exit(mismatch) + end. + stop() -> teststop(), receive @@ -76,6 +99,8 @@ stop() -> end. main([]) -> + application:set_env(lager, handlers, [{lager_console_backend, debug}]), +lager:start(), {ok, Cwd} = file:get_cwd(), code:add_path(Cwd ++ "/ebin"), Size = 10, @@ -133,7 +158,7 @@ main([]) -> testadd(Filename, gentestdata(1), 99), testadd(Filename, gentestdata(1+2), 99), testadd(Filename, gentestdata(1+2+3), 99), - testadd(Filename, gentestdata(1+2+3+4), 99), + testaddmulti(Filename, gentestdata(1+2+3+4), 99), testget(Filename, gentestdata(1+2+3+4), 99), stop(), |