summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2014-11-13 23:26:04 +0100
committerMagnus Ahltorp <map@kth.se>2014-11-13 23:26:04 +0100
commit89a6ea31f0646f2a3ccbbca521b038d7ef811da3 (patch)
tree37c87465c8177aca29047239f013d90bcc18f762 /src
parentfc248ddd84cd6a2317b896cad63c105ea6d47ca7 (diff)
Convert fsyncport to gen_server, losing parallel fsync.
Diffstat (limited to 'src')
-rw-r--r--src/fsyncport.erl155
1 files changed, 69 insertions, 86 deletions
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
index c9be44d..945eb97 100644
--- a/src/fsyncport.erl
+++ b/src/fsyncport.erl
@@ -2,112 +2,95 @@
%%% See LICENSE for licensing information.
-module(fsyncport).
--export([start_link/0, stop/0, init/1]).
+-behaviour(gen_server).
+-export([start_link/0, stop/0]).
-export([fsync/1, fsyncall/1]).
+%% gen_server callbacks.
+-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
+ code_change/3]).
start_link() ->
- Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
- {ok, Pid}.
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [code:priv_dir(plop) ++ "/fsynchelper"], [{debug, [trace]}]).
+
stop() ->
- fsyncport ! stop.
+ gen_server:call(?MODULE, stop).
fsync(Path) ->
- call_port({fsync, Path}).
+ gen_server:call(?MODULE, {fsync, Path}).
fsyncall(Paths) ->
- call_port_multi([{fsync, Path} || Path <- Paths]).
-
-call_port(Msg) ->
- fsyncport ! {call, self(), Msg},
- receive
- {fsyncport, Result} ->
- Result
- end.
+ lists:foreach(fun (Path) -> fsync(Path) end, Paths).
-call_port_multi(Msgs) ->
- lists:foreach(fun (Msg) ->
- fsyncport ! {call, self(), Msg}
- end, Msgs),
- lists:foldl(fun (_Msg, Acc) ->
- R = receive
- {fsyncport, Result} ->
- Result
- end,
- case R of
- ok ->
- Acc;
- Error ->
- Error
- end
- end, ok, Msgs).
+-record(state, {idleports, busyports, waiting}).
init(ExtPrg) ->
lager:debug("starting fsync service"),
- register(fsyncport, self()),
process_flag(trap_exit, true),
Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg},
[{packet, 2}]) end,
lists:seq(1, 32)),
lager:debug("fsync service started", []),
- loop(Ports).
+ {ok, #state{idleports = Ports, busyports = dict:new(), waiting = queue:new()}}.
-loop(Ports) ->
- loop(Ports, dict:new(), queue:new()).
-loop(IdlePorts, BusyPorts, Waiting) ->
+handle_call(stop, _From, State) ->
+ lager:debug("fsync stop request received"),
+ lists:foreach(fun (Port) ->
+ Port ! {self(), close}
+ end,
+ State#state.idleports),
+ lists:foreach(fun ({Port, {_Caller, _Starttime}}) ->
+ Port ! {self(), close}
+ end,
+ dict:to_list(State#state.busyports)),
receive
- {call, Caller, {fsync, Path}} ->
- lager:debug("fsync incoming request: ~p", [Path]),
- case IdlePorts of
- [] ->
- loop(IdlePorts,
- BusyPorts,
- queue:in({Caller, Path}, Waiting));
- [Port | Rest] ->
- lager:debug("fsync port ~p assigned to request ~p", [Port, Path]),
- Port ! {self(), {command, Path}},
- loop(Rest,
- dict:store(Port, {Caller, os:timestamp()}, BusyPorts),
- Waiting)
- end;
+ {Port, closed} when is_port(Port) ->
+ exit(normal) %% XXX exits when first port is closed
+ end,
+ {stop, normal, stopped, State};
- {Port, {data, Data}} when is_port(Port) ->
- lager:debug("fsync request finished: ~p", [Port]),
- {Caller, Starttime} = dict:fetch(Port, BusyPorts),
- Stoptime = os:timestamp(),
- statreport({fsync, Stoptime, Starttime}),
- Caller ! {fsyncport, list_to_atom(Data)},
- case queue:out(Waiting) of
- {empty, _} ->
- loop([Port | IdlePorts],
- dict:erase(Port, BusyPorts),
- Waiting);
- {{value, {NewCaller, NewPath}}, NewWaiting} ->
- IdlePorts = [],
- Port ! {self(), {command, NewPath}},
- loop(IdlePorts,
- dict:store(Port, {NewCaller, os:timestamp()},
- BusyPorts),
- NewWaiting)
- end;
- stop ->
- lager:debug("fsync stop request received"),
- lists:foreach(fun (Port) ->
- Port ! {self(), close}
- end,
- IdlePorts),
- lists:foreach(fun ({Port, {_Caller, _Starttime}}) ->
- Port ! {self(), close}
- end,
- dict:to_list(BusyPorts)),
- receive
- {Port, closed} when is_port(Port) ->
- exit(normal) %% XXX exits when first port is closed
- end;
- {'EXIT', Port, _Reason} when is_port(Port) ->
- lager:debug("fsync port ~p exited, exiting", [Port]),
- %% XXX supervisor doesn't restart fsyncport, why?
- exit(port_terminated)
- end.
+handle_call({fsync, Path}, From, State) ->
+ lager:debug("fsync incoming request: ~p", [Path]),
+ {noreply,
+ case State#state.idleports of
+ [] ->
+ State#state{waiting = queue:in({From, Path}, State#state.waiting)};
+ [Port | Rest] ->
+ lager:debug("fsync port ~p assigned to request ~p", [Port, Path]),
+ Port ! {self(), {command, Path}},
+ State#state{idleports = Rest,
+ busyports = dict:store(Port, {From, os:timestamp()}, State#state.busyports)}
+ end}.
+
+handle_info({Port, {data, Data}}, State) when is_port(Port) ->
+ lager:debug("fsync request finished: ~p", [Port]),
+ {Caller, Starttime} = dict:fetch(Port, State#state.busyports),
+ Stoptime = os:timestamp(),
+ statreport({fsync, Stoptime, Starttime}),
+ gen_server:reply(Caller, list_to_atom(Data)),
+ {noreply,
+ case queue:out(State#state.waiting) of
+ {empty, _} ->
+ State#state{busyports = dict:erase(Port, State#state.busyports),
+ idleports = [Port | State#state.idleports]};
+ {{value, {NewCaller, NewPath}}, NewWaiting} ->
+ [] = State#state.idleports,
+ Port ! {self(), {command, NewPath}},
+ State#state{busyports = dict:store(Port, {NewCaller, os:timestamp()}, State#state.busyports), waiting = NewWaiting}
+ end};
+
+handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) ->
+ lager:debug("fsync port ~p exited, exiting", [Port]),
+ {stop, portexit, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+terminate(Reason, _State) ->
+ lager:info("fsyncport terminating: ~p", [Reason]),
+ ok.
statreport(_Entry) ->
none.