diff options
author | Magnus Ahltorp <map@kth.se> | 2015-10-01 15:09:33 +0200 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2015-11-11 13:32:37 +0100 |
commit | f7a0018fb849bf0baefbea4af16ce8ce61ec69d0 (patch) | |
tree | 3afb7bffe904a7dc065d1a1e44e98c89c76f2515 /src | |
parent | 0a3e6aafee314eaf9e5343c4cad89a9e2ae1d913 (diff) |
Added util:parallel_map and use it when checking entries.
Diffstat (limited to 'src')
-rw-r--r-- | src/frontend.erl | 11 | ||||
-rw-r--r-- | src/util.erl | 84 |
2 files changed, 89 insertions, 6 deletions
diff --git a/src/frontend.erl b/src/frontend.erl index b7fa4b1..1055e5f 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -241,16 +241,17 @@ check_entries_int(Entries, Start, End) -> end, [], lists:zip(Entries, lists:seq(Start, End))). check_entries_noreverse(Entries, Start, End) -> - lists:foldl(fun ({Hash, Index}, Acc) -> - lager:info("checking entry ~p", [Index]), - case check_entry_noreverse(Hash, Index) of + Results = util:parallel_map(fun ({Hash, Index}) -> + check_entry_noreverse(Hash, Index) + end, lists:zip(Entries, lists:seq(Start, End)), 2), + lists:foldl(fun (Result, Acc) -> + case Result of ok -> - lager:info("entry ~p is correct", [Index]), Acc; Error -> [Error | Acc] end - end, [], lists:zip(Entries, lists:seq(Start, End))). + end, [], Results). entryhash_from_entry(Entry) -> {ok, {Module, Function}} = application:get_env(plop, entryhash_from_entry), diff --git a/src/util.erl b/src/util.erl index c3b30db..af78b93 100644 --- a/src/util.erl +++ b/src/util.erl @@ -4,7 +4,7 @@ -module(util). -export([tempfilename/1, fsync/1, fsync/2, exit_with_error/3, check_error/3, write_tempfile_and_rename/3, - spawn_and_wait/1]). + spawn_and_wait/1, parallel_map/3]). -spec tempfilename(string()) -> string(). tempfilename(Base) -> @@ -73,3 +73,85 @@ spawn_and_wait(Fun) -> {result, ChildPid, Result} -> Result end. + + +parallel_map_worker_loop(ParentPid, Fun, N) -> + receive + {parallel_map_request, ParentPid, Input} -> + Result = Fun(Input), + ParentPid ! {parallel_map_result, self(), Result}, + parallel_map_worker_loop(ParentPid, Fun, N); + {parallel_map_stop, ParentPid} -> + ok + end. + +parallel_map_worker(ParentPid, Fun, N) -> + try + parallel_map_worker_loop(ParentPid, Fun, N) + catch + Type:What -> + [CrashFunction | Stack] = erlang:get_stacktrace(), + lager:error("Crashed process: ~p ~p~n ~p~n ~p~n", [Type, What, CrashFunction, Stack]), + ParentPid ! {parallel_map_crash, self()} + end. + +parallel_map_loop([], _FreeChildren, WorkingChildren, Acc) -> + case queue:out(WorkingChildren) of + {{value, FirstChild}, NewWorkingChildren} -> + receive + {parallel_map_result, FirstChild, Result} -> + parallel_map_loop([], [FirstChild], NewWorkingChildren, [Result | Acc]); + {parallel_map_crash, FirstChild} -> + crash + end; + {empty, _} -> + Acc + end; + +parallel_map_loop(Items, [], WorkingChildren, Acc) -> + {{value, FirstChild}, NewWorkingChildren} = queue:out(WorkingChildren), + receive + {parallel_map_result, FirstChild, Result} -> + parallel_map_loop(Items, [FirstChild], NewWorkingChildren, [Result | Acc]); + {parallel_map_crash, FirstChild} -> + crash + end; + +parallel_map_loop([Item|Rest], [FreeChild|FreeChildren], WorkingChildren, Acc) -> + FreeChild ! {parallel_map_request, self(), Item}, + parallel_map_loop(Rest, FreeChildren, queue:in(FreeChild, WorkingChildren), Acc). + + +parallel_map(Fun, List, Parallel) -> + ParentPid = self(), + ChildPids = lists:map(fun(N) -> + spawn_link(fun () -> + parallel_map_worker(ParentPid, Fun, N) + end) + end, lists:seq(1, Parallel)), + case parallel_map_loop(List, ChildPids, queue:new(), []) of + crash -> + exit(crash); + Result -> + lists:foreach(fun (Child) -> + Child ! {parallel_map_stop, self()} + end, ChildPids), + lists:reverse(Result) + end. + +%%%%%%%%%%%%%%%%%%%% + +-include_lib("eunit/include/eunit.hrl"). + +fact(N) -> + fact(N, 1). + +fact(1, Acc) -> + Acc; +fact(N, Acc) -> + fact(N - 1, Acc * N). + +parallel_map_test() -> + Result1 = lists:map(fun fact/1, lists:seq(1, 2000)), + Result2 = parallel_map(fun fact/1, lists:seq(1, 2000), 10), + ?assertEqual(Result1, Result2). |