diff options
| author | Magnus Ahltorp <map@kth.se> | 2014-11-14 16:21:08 +0100 |
|---|---|---|
| committer | Magnus Ahltorp <map@kth.se> | 2014-11-14 16:21:08 +0100 |
| commit | 66bbe95336fe2781ffe6a56e03ef9f7b1ff0399b (patch) | |
| tree | 65f3efe7c9bf74dcb5300f2b75db09598b80d346 /src | |
| parent | 124b79a8fed213c31db0360dd75c53fc415b5ab2 (diff) | |
Reimplement parallel fsync
Diffstat (limited to 'src')
| -rw-r--r-- | src/fsyncport.erl | 106 |
1 files changed, 79 insertions, 27 deletions
diff --git a/src/fsyncport.erl b/src/fsyncport.erl index 945eb97..b688f9c 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -10,18 +10,19 @@ code_change/3]). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [code:priv_dir(plop) ++ "/fsynchelper"], [{debug, [trace]}]). + gen_server:start_link({local, ?MODULE}, ?MODULE, + [code:priv_dir(plop) ++ "/fsynchelper"], []). stop() -> gen_server:call(?MODULE, stop). fsync(Path) -> - gen_server:call(?MODULE, {fsync, Path}). + gen_server:call(?MODULE, {fsync, [Path]}). fsyncall(Paths) -> - lists:foreach(fun (Path) -> fsync(Path) end, Paths). + gen_server:call(?MODULE, {fsync, Paths}). --record(state, {idleports, busyports, waiting}). +-record(state, {idleports, busyports, waiting, requests}). init(ExtPrg) -> lager:debug("starting fsync service"), @@ -30,7 +31,8 @@ init(ExtPrg) -> [{packet, 2}]) end, lists:seq(1, 32)), lager:debug("fsync service started", []), - {ok, #state{idleports = Ports, busyports = dict:new(), waiting = queue:new()}}. + {ok, #state{idleports = Ports, busyports = dict:new(), + waiting = queue:new(), requests = dict:new()}}. handle_call(stop, _From, State) -> lager:debug("fsync stop request received"), @@ -48,35 +50,85 @@ handle_call(stop, _From, State) -> end, {stop, normal, stopped, State}; -handle_call({fsync, Path}, From, State) -> - lager:debug("fsync incoming request: ~p", [Path]), +handle_call({fsync, Paths}, From, State) -> + lager:debug("fsync incoming request: ~p", [Paths]), + AddedState = + lists:foldl( + fun(Path, StateAcc) -> + add_subrequest(StateAcc, From, Path) + end, State, Paths), + NewState = dequeue(AddedState), {noreply, - case State#state.idleports of - [] -> - State#state{waiting = queue:in({From, Path}, State#state.waiting)}; - [Port | Rest] -> - lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), - Port ! {self(), {command, Path}}, - State#state{idleports = Rest, - busyports = dict:store(Port, {From, os:timestamp()}, State#state.busyports)} - end}. + NewState}. + +check_overload(State) -> + case queue:is_empty(State#state.waiting) of + true -> + none; + false -> + lager:info("~p fsync requests waiting", + [queue:len(State#state.waiting)]) + end. + +add_subrequest(State, From, Path) -> + State#state{ + waiting = queue:in({From, Path}, State#state.waiting), + requests = dict:update_counter(From, 1, State#state.requests) + }. + +dequeue(State) -> + case try_dequeue(State) of + {last, NewState} -> + check_overload(NewState), + NewState; + {continue, NewState} -> + dequeue(NewState) + end. + +try_dequeue(State) -> + case State#state.idleports of + [] -> + {last, State}; + [Port | Rest] -> + case queue:out(State#state.waiting) of + {empty, _} -> + {last, State}; + {{value, {Caller, Path}}, Waiting} -> + lager:debug("fsync port ~p assigned to request ~p", + [Port, Path]), + Port ! {self(), {command, Path}}, + {continue, + State#state{ + idleports = Rest, + busyports = dict:store(Port, {Caller, os:timestamp()}, + State#state.busyports), + waiting = Waiting + }} + end + end. + +subrequest_finished(State, Caller, Data) -> + NewRequests = case dict:fetch(Caller, State#state.requests) of + 1 -> + gen_server:reply(Caller, list_to_atom(Data)), + dict:erase(Caller, State#state.requests); + _ -> + dict:update_counter(Caller, -1, State#state.requests) + end, + State#state{requests = NewRequests}. handle_info({Port, {data, Data}}, State) when is_port(Port) -> lager:debug("fsync request finished: ~p", [Port]), {Caller, Starttime} = dict:fetch(Port, State#state.busyports), Stoptime = os:timestamp(), statreport({fsync, Stoptime, Starttime}), - gen_server:reply(Caller, list_to_atom(Data)), - {noreply, - case queue:out(State#state.waiting) of - {empty, _} -> - State#state{busyports = dict:erase(Port, State#state.busyports), - idleports = [Port | State#state.idleports]}; - {{value, {NewCaller, NewPath}}, NewWaiting} -> - [] = State#state.idleports, - Port ! {self(), {command, NewPath}}, - State#state{busyports = dict:store(Port, {NewCaller, os:timestamp()}, State#state.busyports), waiting = NewWaiting} - end}; + State2 = subrequest_finished(State, Caller, Data), + State3 = State2#state{ + busyports = dict:erase(Port, State2#state.busyports), + idleports = [Port | State2#state.idleports] + }, + State4 = dequeue(State3), + {noreply, State4}; handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) -> lager:debug("fsync port ~p exited, exiting", [Port]), |
