diff options
Diffstat (limited to 'src/util.erl')
-rw-r--r-- | src/util.erl | 84 |
1 files changed, 83 insertions, 1 deletions
diff --git a/src/util.erl b/src/util.erl index c3b30db..af78b93 100644 --- a/src/util.erl +++ b/src/util.erl @@ -4,7 +4,7 @@ -module(util). -export([tempfilename/1, fsync/1, fsync/2, exit_with_error/3, check_error/3, write_tempfile_and_rename/3, - spawn_and_wait/1]). + spawn_and_wait/1, parallel_map/3]). -spec tempfilename(string()) -> string(). tempfilename(Base) -> @@ -73,3 +73,85 @@ spawn_and_wait(Fun) -> {result, ChildPid, Result} -> Result end. + + +parallel_map_worker_loop(ParentPid, Fun, N) -> + receive + {parallel_map_request, ParentPid, Input} -> + Result = Fun(Input), + ParentPid ! {parallel_map_result, self(), Result}, + parallel_map_worker_loop(ParentPid, Fun, N); + {parallel_map_stop, ParentPid} -> + ok + end. + +parallel_map_worker(ParentPid, Fun, N) -> + try + parallel_map_worker_loop(ParentPid, Fun, N) + catch + Type:What -> + [CrashFunction | Stack] = erlang:get_stacktrace(), + lager:error("Crashed process: ~p ~p~n ~p~n ~p~n", [Type, What, CrashFunction, Stack]), + ParentPid ! {parallel_map_crash, self()} + end. + +parallel_map_loop([], _FreeChildren, WorkingChildren, Acc) -> + case queue:out(WorkingChildren) of + {{value, FirstChild}, NewWorkingChildren} -> + receive + {parallel_map_result, FirstChild, Result} -> + parallel_map_loop([], [FirstChild], NewWorkingChildren, [Result | Acc]); + {parallel_map_crash, FirstChild} -> + crash + end; + {empty, _} -> + Acc + end; + +parallel_map_loop(Items, [], WorkingChildren, Acc) -> + {{value, FirstChild}, NewWorkingChildren} = queue:out(WorkingChildren), + receive + {parallel_map_result, FirstChild, Result} -> + parallel_map_loop(Items, [FirstChild], NewWorkingChildren, [Result | Acc]); + {parallel_map_crash, FirstChild} -> + crash + end; + +parallel_map_loop([Item|Rest], [FreeChild|FreeChildren], WorkingChildren, Acc) -> + FreeChild ! {parallel_map_request, self(), Item}, + parallel_map_loop(Rest, FreeChildren, queue:in(FreeChild, WorkingChildren), Acc). + + +parallel_map(Fun, List, Parallel) -> + ParentPid = self(), + ChildPids = lists:map(fun(N) -> + spawn_link(fun () -> + parallel_map_worker(ParentPid, Fun, N) + end) + end, lists:seq(1, Parallel)), + case parallel_map_loop(List, ChildPids, queue:new(), []) of + crash -> + exit(crash); + Result -> + lists:foreach(fun (Child) -> + Child ! {parallel_map_stop, self()} + end, ChildPids), + lists:reverse(Result) + end. + +%%%%%%%%%%%%%%%%%%%% + +-include_lib("eunit/include/eunit.hrl"). + +fact(N) -> + fact(N, 1). + +fact(1, Acc) -> + Acc; +fact(N, Acc) -> + fact(N - 1, Acc * N). + +parallel_map_test() -> + Result1 = lists:map(fun fact/1, lists:seq(1, 2000)), + Result2 = parallel_map(fun fact/1, lists:seq(1, 2000), 10), + ?assertEqual(Result1, Result2). |