From f7a0018fb849bf0baefbea4af16ce8ce61ec69d0 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 1 Oct 2015 15:09:33 +0200 Subject: Added util:parallel_map and use it when checking entries. --- src/frontend.erl | 11 ++++---- src/util.erl | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- test/check.erl | 4 +-- 3 files changed, 91 insertions(+), 8 deletions(-) diff --git a/src/frontend.erl b/src/frontend.erl index b7fa4b1..1055e5f 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -241,16 +241,17 @@ check_entries_int(Entries, Start, End) -> end, [], lists:zip(Entries, lists:seq(Start, End))). check_entries_noreverse(Entries, Start, End) -> - lists:foldl(fun ({Hash, Index}, Acc) -> - lager:info("checking entry ~p", [Index]), - case check_entry_noreverse(Hash, Index) of + Results = util:parallel_map(fun ({Hash, Index}) -> + check_entry_noreverse(Hash, Index) + end, lists:zip(Entries, lists:seq(Start, End)), 2), + lists:foldl(fun (Result, Acc) -> + case Result of ok -> - lager:info("entry ~p is correct", [Index]), Acc; Error -> [Error | Acc] end - end, [], lists:zip(Entries, lists:seq(Start, End))). + end, [], Results). entryhash_from_entry(Entry) -> {ok, {Module, Function}} = application:get_env(plop, entryhash_from_entry), diff --git a/src/util.erl b/src/util.erl index c3b30db..af78b93 100644 --- a/src/util.erl +++ b/src/util.erl @@ -4,7 +4,7 @@ -module(util). -export([tempfilename/1, fsync/1, fsync/2, exit_with_error/3, check_error/3, write_tempfile_and_rename/3, - spawn_and_wait/1]). + spawn_and_wait/1, parallel_map/3]). -spec tempfilename(string()) -> string(). tempfilename(Base) -> @@ -73,3 +73,85 @@ spawn_and_wait(Fun) -> {result, ChildPid, Result} -> Result end. + + +parallel_map_worker_loop(ParentPid, Fun, N) -> + receive + {parallel_map_request, ParentPid, Input} -> + Result = Fun(Input), + ParentPid ! {parallel_map_result, self(), Result}, + parallel_map_worker_loop(ParentPid, Fun, N); + {parallel_map_stop, ParentPid} -> + ok + end. + +parallel_map_worker(ParentPid, Fun, N) -> + try + parallel_map_worker_loop(ParentPid, Fun, N) + catch + Type:What -> + [CrashFunction | Stack] = erlang:get_stacktrace(), + lager:error("Crashed process: ~p ~p~n ~p~n ~p~n", [Type, What, CrashFunction, Stack]), + ParentPid ! {parallel_map_crash, self()} + end. + +parallel_map_loop([], _FreeChildren, WorkingChildren, Acc) -> + case queue:out(WorkingChildren) of + {{value, FirstChild}, NewWorkingChildren} -> + receive + {parallel_map_result, FirstChild, Result} -> + parallel_map_loop([], [FirstChild], NewWorkingChildren, [Result | Acc]); + {parallel_map_crash, FirstChild} -> + crash + end; + {empty, _} -> + Acc + end; + +parallel_map_loop(Items, [], WorkingChildren, Acc) -> + {{value, FirstChild}, NewWorkingChildren} = queue:out(WorkingChildren), + receive + {parallel_map_result, FirstChild, Result} -> + parallel_map_loop(Items, [FirstChild], NewWorkingChildren, [Result | Acc]); + {parallel_map_crash, FirstChild} -> + crash + end; + +parallel_map_loop([Item|Rest], [FreeChild|FreeChildren], WorkingChildren, Acc) -> + FreeChild ! {parallel_map_request, self(), Item}, + parallel_map_loop(Rest, FreeChildren, queue:in(FreeChild, WorkingChildren), Acc). + + +parallel_map(Fun, List, Parallel) -> + ParentPid = self(), + ChildPids = lists:map(fun(N) -> + spawn_link(fun () -> + parallel_map_worker(ParentPid, Fun, N) + end) + end, lists:seq(1, Parallel)), + case parallel_map_loop(List, ChildPids, queue:new(), []) of + crash -> + exit(crash); + Result -> + lists:foreach(fun (Child) -> + Child ! {parallel_map_stop, self()} + end, ChildPids), + lists:reverse(Result) + end. + +%%%%%%%%%%%%%%%%%%%% + +-include_lib("eunit/include/eunit.hrl"). + +fact(N) -> + fact(N, 1). + +fact(1, Acc) -> + Acc; +fact(N, Acc) -> + fact(N - 1, Acc * N). + +parallel_map_test() -> + Result1 = lists:map(fun fact/1, lists:seq(1, 2000)), + Result2 = parallel_map(fun fact/1, lists:seq(1, 2000), 10), + ?assertEqual(Result1, Result2). diff --git a/test/check.erl b/test/check.erl index f4fdb8d..fd726e5 100755 --- a/test/check.erl +++ b/test/check.erl @@ -9,5 +9,5 @@ main(_) -> ok = ht:test(), ok = ts:test(), - ok = tlv:test(). - + ok = tlv:test(), + ok = util:test(). -- cgit v1.1