From ed641d942ce265bd12913713794cd2221b312733 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 13 Nov 2014 23:26:04 +0100 Subject: Convert fsyncport to gen_server --- src/fsyncport.erl | 207 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 121 insertions(+), 86 deletions(-) diff --git a/src/fsyncport.erl b/src/fsyncport.erl index c9be44d..b688f9c 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -2,112 +2,147 @@ %%% See LICENSE for licensing information. -module(fsyncport). --export([start_link/0, stop/0, init/1]). +-behaviour(gen_server). +-export([start_link/0, stop/0]). -export([fsync/1, fsyncall/1]). +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). start_link() -> - Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]), - {ok, Pid}. + gen_server:start_link({local, ?MODULE}, ?MODULE, + [code:priv_dir(plop) ++ "/fsynchelper"], []). + stop() -> - fsyncport ! stop. + gen_server:call(?MODULE, stop). fsync(Path) -> - call_port({fsync, Path}). + gen_server:call(?MODULE, {fsync, [Path]}). fsyncall(Paths) -> - call_port_multi([{fsync, Path} || Path <- Paths]). + gen_server:call(?MODULE, {fsync, Paths}). -call_port(Msg) -> - fsyncport ! {call, self(), Msg}, - receive - {fsyncport, Result} -> - Result - end. - -call_port_multi(Msgs) -> - lists:foreach(fun (Msg) -> - fsyncport ! {call, self(), Msg} - end, Msgs), - lists:foldl(fun (_Msg, Acc) -> - R = receive - {fsyncport, Result} -> - Result - end, - case R of - ok -> - Acc; - Error -> - Error - end - end, ok, Msgs). +-record(state, {idleports, busyports, waiting, requests}). init(ExtPrg) -> lager:debug("starting fsync service"), - register(fsyncport, self()), process_flag(trap_exit, true), Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, [{packet, 2}]) end, lists:seq(1, 32)), lager:debug("fsync service started", []), - loop(Ports). + {ok, #state{idleports = Ports, busyports = dict:new(), + waiting = queue:new(), requests = dict:new()}}. -loop(Ports) -> - loop(Ports, dict:new(), queue:new()). -loop(IdlePorts, BusyPorts, Waiting) -> - receive - {call, Caller, {fsync, Path}} -> - lager:debug("fsync incoming request: ~p", [Path]), - case IdlePorts of - [] -> - loop(IdlePorts, - BusyPorts, - queue:in({Caller, Path}, Waiting)); - [Port | Rest] -> - lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), - Port ! {self(), {command, Path}}, - loop(Rest, - dict:store(Port, {Caller, os:timestamp()}, BusyPorts), - Waiting) - end; - - {Port, {data, Data}} when is_port(Port) -> - lager:debug("fsync request finished: ~p", [Port]), - {Caller, Starttime} = dict:fetch(Port, BusyPorts), - Stoptime = os:timestamp(), - statreport({fsync, Stoptime, Starttime}), - Caller ! {fsyncport, list_to_atom(Data)}, - case queue:out(Waiting) of - {empty, _} -> - loop([Port | IdlePorts], - dict:erase(Port, BusyPorts), - Waiting); - {{value, {NewCaller, NewPath}}, NewWaiting} -> - IdlePorts = [], - Port ! {self(), {command, NewPath}}, - loop(IdlePorts, - dict:store(Port, {NewCaller, os:timestamp()}, - BusyPorts), - NewWaiting) - end; - stop -> - lager:debug("fsync stop request received"), - lists:foreach(fun (Port) -> +handle_call(stop, _From, State) -> + lager:debug("fsync stop request received"), + lists:foreach(fun (Port) -> Port ! {self(), close} - end, - IdlePorts), - lists:foreach(fun ({Port, {_Caller, _Starttime}}) -> - Port ! {self(), close} - end, - dict:to_list(BusyPorts)), - receive - {Port, closed} when is_port(Port) -> - exit(normal) %% XXX exits when first port is closed - end; - {'EXIT', Port, _Reason} when is_port(Port) -> - lager:debug("fsync port ~p exited, exiting", [Port]), - %% XXX supervisor doesn't restart fsyncport, why? - exit(port_terminated) + end, + State#state.idleports), + lists:foreach(fun ({Port, {_Caller, _Starttime}}) -> + Port ! {self(), close} + end, + dict:to_list(State#state.busyports)), + receive + {Port, closed} when is_port(Port) -> + exit(normal) %% XXX exits when first port is closed + end, + {stop, normal, stopped, State}; + +handle_call({fsync, Paths}, From, State) -> + lager:debug("fsync incoming request: ~p", [Paths]), + AddedState = + lists:foldl( + fun(Path, StateAcc) -> + add_subrequest(StateAcc, From, Path) + end, State, Paths), + NewState = dequeue(AddedState), + {noreply, + NewState}. + +check_overload(State) -> + case queue:is_empty(State#state.waiting) of + true -> + none; + false -> + lager:info("~p fsync requests waiting", + [queue:len(State#state.waiting)]) + end. + +add_subrequest(State, From, Path) -> + State#state{ + waiting = queue:in({From, Path}, State#state.waiting), + requests = dict:update_counter(From, 1, State#state.requests) + }. + +dequeue(State) -> + case try_dequeue(State) of + {last, NewState} -> + check_overload(NewState), + NewState; + {continue, NewState} -> + dequeue(NewState) end. +try_dequeue(State) -> + case State#state.idleports of + [] -> + {last, State}; + [Port | Rest] -> + case queue:out(State#state.waiting) of + {empty, _} -> + {last, State}; + {{value, {Caller, Path}}, Waiting} -> + lager:debug("fsync port ~p assigned to request ~p", + [Port, Path]), + Port ! {self(), {command, Path}}, + {continue, + State#state{ + idleports = Rest, + busyports = dict:store(Port, {Caller, os:timestamp()}, + State#state.busyports), + waiting = Waiting + }} + end + end. + +subrequest_finished(State, Caller, Data) -> + NewRequests = case dict:fetch(Caller, State#state.requests) of + 1 -> + gen_server:reply(Caller, list_to_atom(Data)), + dict:erase(Caller, State#state.requests); + _ -> + dict:update_counter(Caller, -1, State#state.requests) + end, + State#state{requests = NewRequests}. + +handle_info({Port, {data, Data}}, State) when is_port(Port) -> + lager:debug("fsync request finished: ~p", [Port]), + {Caller, Starttime} = dict:fetch(Port, State#state.busyports), + Stoptime = os:timestamp(), + statreport({fsync, Stoptime, Starttime}), + State2 = subrequest_finished(State, Caller, Data), + State3 = State2#state{ + busyports = dict:erase(Port, State2#state.busyports), + idleports = [Port | State2#state.idleports] + }, + State4 = dequeue(State3), + {noreply, State4}; + +handle_info({'EXIT', Port, _Reason}, State) when is_port(Port) -> + lager:debug("fsync port ~p exited, exiting", [Port]), + {stop, portexit, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +terminate(Reason, _State) -> + lager:info("fsyncport terminating: ~p", [Reason]), + ok. + statreport(_Entry) -> none. -- cgit v1.1