summaryrefslogtreecommitdiff
path: root/src/fsyncport.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fsyncport.erl')
-rw-r--r--src/fsyncport.erl29
1 files changed, 28 insertions, 1 deletions
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
index 7e2bf11..c9be44d 100644
--- a/src/fsyncport.erl
+++ b/src/fsyncport.erl
@@ -3,7 +3,7 @@
-module(fsyncport).
-export([start_link/0, stop/0, init/1]).
--export([fsync/1]).
+-export([fsync/1, fsyncall/1]).
start_link() ->
Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
@@ -14,6 +14,9 @@ stop() ->
fsync(Path) ->
call_port({fsync, Path}).
+fsyncall(Paths) ->
+ call_port_multi([{fsync, Path} || Path <- Paths]).
+
call_port(Msg) ->
fsyncport ! {call, self(), Msg},
receive
@@ -21,12 +24,31 @@ call_port(Msg) ->
Result
end.
+call_port_multi(Msgs) ->
+ lists:foreach(fun (Msg) ->
+ fsyncport ! {call, self(), Msg}
+ end, Msgs),
+ lists:foldl(fun (_Msg, Acc) ->
+ R = receive
+ {fsyncport, Result} ->
+ Result
+ end,
+ case R of
+ ok ->
+ Acc;
+ Error ->
+ Error
+ end
+ end, ok, Msgs).
+
init(ExtPrg) ->
+ lager:debug("starting fsync service"),
register(fsyncport, self()),
process_flag(trap_exit, true),
Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg},
[{packet, 2}]) end,
lists:seq(1, 32)),
+ lager:debug("fsync service started", []),
loop(Ports).
loop(Ports) ->
@@ -34,12 +56,14 @@ loop(Ports) ->
loop(IdlePorts, BusyPorts, Waiting) ->
receive
{call, Caller, {fsync, Path}} ->
+ lager:debug("fsync incoming request: ~p", [Path]),
case IdlePorts of
[] ->
loop(IdlePorts,
BusyPorts,
queue:in({Caller, Path}, Waiting));
[Port | Rest] ->
+ lager:debug("fsync port ~p assigned to request ~p", [Port, Path]),
Port ! {self(), {command, Path}},
loop(Rest,
dict:store(Port, {Caller, os:timestamp()}, BusyPorts),
@@ -47,6 +71,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
end;
{Port, {data, Data}} when is_port(Port) ->
+ lager:debug("fsync request finished: ~p", [Port]),
{Caller, Starttime} = dict:fetch(Port, BusyPorts),
Stoptime = os:timestamp(),
statreport({fsync, Stoptime, Starttime}),
@@ -65,6 +90,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
NewWaiting)
end;
stop ->
+ lager:debug("fsync stop request received"),
lists:foreach(fun (Port) ->
Port ! {self(), close}
end,
@@ -78,6 +104,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
exit(normal) %% XXX exits when first port is closed
end;
{'EXIT', Port, _Reason} when is_port(Port) ->
+ lager:debug("fsync port ~p exited, exiting", [Port]),
%% XXX supervisor doesn't restart fsyncport, why?
exit(port_terminated)
end.