summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2014-11-14 16:21:08 +0100
committerMagnus Ahltorp <map@kth.se>2014-11-14 16:21:08 +0100
commit66bbe95336fe2781ffe6a56e03ef9f7b1ff0399b (patch)
tree65f3efe7c9bf74dcb5300f2b75db09598b80d346 /src
parent124b79a8fed213c31db0360dd75c53fc415b5ab2 (diff)
Reimplement parallel fsync
Diffstat (limited to 'src')
-rw-r--r--src/fsyncport.erl106
1 files changed, 79 insertions, 27 deletions
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
index 945eb97..b688f9c 100644
--- a/src/fsyncport.erl
+++ b/src/fsyncport.erl
@@ -10,18 +10,19 @@
code_change/3]).
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [code:priv_dir(plop) ++ "/fsynchelper"], [{debug, [trace]}]).
+ gen_server:start_link({local, ?MODULE}, ?MODULE,
+ [code:priv_dir(plop) ++ "/fsynchelper"], []).
stop() ->
gen_server:call(?MODULE, stop).
fsync(Path) ->
- gen_server:call(?MODULE, {fsync, Path}).
+ gen_server:call(?MODULE, {fsync, [Path]}).
fsyncall(Paths) ->
- lists:foreach(fun (Path) -> fsync(Path) end, Paths).
+ gen_server:call(?MODULE, {fsync, Paths}).
--record(state, {idleports, busyports, waiting}).
+-record(state, {idleports, busyports, waiting, requests}).
init(ExtPrg) ->
lager:debug("starting fsync service"),
@@ -30,7 +31,8 @@ init(ExtPrg) ->
[{packet, 2}]) end,
lists:seq(1, 32)),
lager:debug("fsync service started", []),
- {ok, #state{idleports = Ports, busyports = dict:new(), waiting = queue:new()}}.
+ {ok, #state{idleports = Ports, busyports = dict:new(),
+ waiting = queue:new(), requests = dict:new()}}.
handle_call(stop, _From, State) ->
lager:debug("fsync stop request received"),
@@ -48,35 +50,85 @@ handle_call(stop, _From, State) ->
end,
{stop, normal, stopped, State};
-handle_call({fsync, Path}, From, State) ->
- lager:debug("fsync incoming request: ~p", [Path]),
+handle_call({fsync, Paths}, From, State) ->
+ lager:debug("fsync incoming request: ~p", [Paths]),
+ AddedState =
+ lists:foldl(
+ fun(Path, StateAcc) ->
+ add_subrequest(StateAcc, From, Path)
+ end, State, Paths),
+ NewState = dequeue(AddedState),
{noreply,
- case State#state.idleports of
- [] ->
- State#state{waiting = queue:in({From, Path}, State#state.waiting)};
- [Port | Rest] ->
- lager:debug("fsync port ~p assigned to request ~p", [Port, Path]),
- Port ! {self(), {command, Path}},
- State#state{idleports = Rest,
- busyports = dict:store(Port, {From, os:timestamp()}, State#state.busyports)}
- end}.
+ NewState}.
+
+check_overload(State) ->
+ case queue:is_empty(State#state.waiting) of
+ true ->
+ none;
+ false ->
+ lager:info("~p fsync requests waiting",
+ [queue:len(State#state.waiting)])
+ end.
+
+add_subrequest(State, From, Path) ->
+ State#state{
+ waiting = queue:in({From, Path}, State#state.waiting),
+ requests = dict:update_counter(From, 1, State#state.requests)
+ }.
+
+dequeue(State) ->
+ case try_dequeue(State) of
+ {last, NewState} ->
+ check_overload(NewState),
+ NewState;
+ {continue, NewState} ->
+ dequeue(NewState)
+ end.
+
+try_dequeue(State) ->
+ case State#state.idleports of
+ [] ->
+ {last, State};
+ [Port | Rest] ->
+ case queue:out(State#state.waiting) of
+ {empty, _} ->
+ {last, State};
+ {{value, {Caller, Path}}, Waiting} ->
+ lager:debug("fsync port ~p assigned to request ~p",
+ [Port, Path]),
+ Port ! {self(), {command, Path}},
+ {continue,
+ State#state{
+ idleports = Rest,
+ busyports = dict:store(Port, {Caller, os:timestamp()},
+ State#state.busyports),
+ waiting = Waiting
+ }}
+ end
+ end.
+
+subrequest_finished(State, Caller, Data) ->
+ NewRequests = case dict:fetch(Caller, State#state.requests) of
+ 1 ->
+ gen_server:reply(Caller, list_to_atom(Data)),
+ dict:erase(Caller, State#state.requests);
+ _ ->
+ dict:update_counter(Caller, -1, State#state.requests)
+ end,
+ State#state{requests = NewRequests}.
handle_info({Port, {data, Data}}, State) when is_port(Port) ->
lager:debug("fsync request finished: ~p", [Port]),
{Caller, Starttime} = dict:fetch(Port, State#state.busyports),
Stoptime = os:timestamp(),
statreport({fsync, Stoptime, Starttime}),
- gen_server:reply(Caller, list_to_atom(Data)),
- {noreply,
- case queue:out(State#state.waiting) of
- {empty, _} ->
- State#state{busyports = dict:erase(Port, State#state.busyports),
- idleports = [Port | State#state.idleports]};
- {{value, {NewCaller, NewPath}}, NewWaiting} ->
- [] = State#state.idleports,
- Port ! {self(), {command, NewPath}},
- State#state{busyports = dict:store(Port, {NewCaller, os:timestamp()}, State#state.busyports), waiting = NewWaiting}
- end};
+ State2 = subrequest_finished(State, Caller, Data),
+ State3 = State2#state{
+ busyports = dict:erase(Port, State2#state.busyports),
+ idleports = [Port | State2#state.idleports]
+ },
+ State4 = dequeue(State3),
+ {noreply, State4};
handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) ->
lager:debug("fsync port ~p exited, exiting", [Port]),