summaryrefslogtreecommitdiff
path: root/merge/src
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2017-02-07 14:51:24 +0100
committerLinus Nordberg <linus@nordu.net>2017-02-07 14:51:24 +0100
commit1c348ba24f38bed924a45d945c052fb1ccc29780 (patch)
treedffd2095bdc8f7f016235d4a0a4d92eb86d9cca0 /merge/src
parent0c50a66694fb55ca39397c6bfa2218597d856c66 (diff)
Parallelised merge, sth phase.
Diffstat (limited to 'merge/src')
-rw-r--r--merge/src/merge_backup.erl30
-rw-r--r--merge/src/merge_sup.erl4
-rw-r--r--merge/src/merge_util.erl25
3 files changed, 31 insertions, 28 deletions
diff --git a/merge/src/merge_backup.erl b/merge/src/merge_backup.erl
index bd75608..5f4f3d5 100644
--- a/merge/src/merge_backup.erl
+++ b/merge/src/merge_backup.erl
@@ -28,7 +28,7 @@ handle_cast(_Request, State) ->
{noreply, State}.
handle_info({timeout, _Timer, backup}, State) ->
- backup(fetched(), State).
+ backup(merge_util:readfile(fetched_path), State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -51,10 +51,10 @@ backup(-1, _, State) ->
{noreply, State#state{timer = erlang:start_timer(1000, self(), backup)}};
backup(Index, Hash,
#state{node_name = NodeName, node_address = NodeAddress} = State) ->
- ok = verify_logorder_and_fetched_consistency(Index, Hash),
+ ok = merge_util:verify_logorder_and_fetched_consistency(Index, Hash),
Size = index:indexsize(logorder),
lager:debug("~p: logorder size ~B", [NodeName, Size]),
- ht:load_tree(Size - 1),
+ ht:load_tree(Size - 1), % TODO: Make sure this is OK to do from multiple processes and that it's not "moving backwards".
try
{ok, VerifiedSize} = verified_size(NodeAddress),
lager:debug("~p: verifiedsize ~B", [NodeName, VerifiedSize]),
@@ -141,27 +141,3 @@ setverifiedsize(NodeAddress, Size) ->
Err ->
throw({request_error, result, DebugTag, Err})
end.
-
-fetched() ->
- case application:get_env(plop, fetched_path) of
- {ok, FetchedFile} ->
- case atomic:readfile(FetchedFile) of
- noentry ->
- noentry;
- Contents ->
- mochijson2:decode(Contents)
- end;
- undefined ->
- noentry
- end.
-
-verify_logorder_and_fetched_consistency(Index, Hash) ->
- HashString = binary_to_list(Hash),
- case hex:bin_to_hexstr(index:get(logorder, Index)) of
- HashString ->
- ok;
- Mismatch ->
- lager:error("fetched file hash=~p doesn't match logorder[~B]=~p",
- [HashString, Index, Mismatch]),
- fetched_mismatch
- end.
diff --git a/merge/src/merge_sup.erl b/merge/src/merge_sup.erl
index d20abf9..72512ba 100644
--- a/merge/src/merge_sup.erl
+++ b/merge/src/merge_sup.erl
@@ -19,5 +19,7 @@ init([]) ->
{merge_backup_sup, {merge_backup_sup, start_link, [[]]},
transient, infinity, supervisor, [merge_backup_sup]},
{merge_dist_sup, {merge_dist_sup, start_link, [[]]},
- transient, infinity, supervisor, [merge_dist_sup]}
+ transient, infinity, supervisor, [merge_dist_sup]},
+ {merge_sth, {merge_sth, start_link, [[]]},
+ permanent, 10000, worker, [merge_sth]}
]}}.
diff --git a/merge/src/merge_util.erl b/merge/src/merge_util.erl
index a6b3ac9..4a4340d 100644
--- a/merge/src/merge_util.erl
+++ b/merge/src/merge_util.erl
@@ -4,6 +4,7 @@
-module(merge_util).
-export([sendlog/4, sendentries/3]).
-export([request/2, request/4]).
+-export([readfile/1, verify_logorder_and_fetched_consistency/2]).
request(DebugTag, URL) ->
request(DebugTag, URL, [], <<>>).
@@ -72,3 +73,27 @@ sendentries_request(NodeAddress, HashesAndEntries) ->
Err ->
throw({request_error, result, DebugTag, Err})
end.
+
+readfile(FileInConfig) ->
+ case application:get_env(plop, FileInConfig) of
+ {ok, File} ->
+ case atomic:readfile(File) of
+ noentry ->
+ noentry;
+ Contents ->
+ mochijson2:decode(Contents)
+ end;
+ undefined ->
+ noentry
+ end.
+
+verify_logorder_and_fetched_consistency(Index, Hash) ->
+ HashString = binary_to_list(Hash),
+ case hex:bin_to_hexstr(index:get(logorder, Index)) of
+ HashString ->
+ ok;
+ Mismatch ->
+ lager:error("fetched file hash=~p doesn't match logorder[~B]=~p",
+ [HashString, Index, Mismatch]),
+ fetched_mismatch
+ end.