%%% Copyright (c) 2014, NORDUnet A/S. %%% See LICENSE for licensing information. -module(fsyncport). -export([start_link/0, stop/0, init/1]). -export([fsync/1]). start_link() -> Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]), {ok, Pid}. stop() -> fsyncport ! stop. fsync(Path) -> call_port({fsync, Path}). call_port(Msg) -> fsyncport ! {call, self(), Msg}, receive {fsyncport, Result} -> Result end. init(ExtPrg) -> lager:debug("starting fsync service"), register(fsyncport, self()), process_flag(trap_exit, true), Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, [{packet, 2}]) end, lists:seq(1, 32)), lager:debug("fsync service started", []), loop(Ports). loop(Ports) -> loop(Ports, dict:new(), queue:new()). loop(IdlePorts, BusyPorts, Waiting) -> receive {call, Caller, {fsync, Path}} -> lager:debug("fsync incoming request: ~p", [Path]), case IdlePorts of [] -> loop(IdlePorts, BusyPorts, queue:in({Caller, Path}, Waiting)); [Port | Rest] -> lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), Port ! {self(), {command, Path}}, loop(Rest, dict:store(Port, {Caller, os:timestamp()}, BusyPorts), Waiting) end; {Port, {data, Data}} when is_port(Port) -> lager:debug("fsync request finished: ~p", [Port]), {Caller, Starttime} = dict:fetch(Port, BusyPorts), Stoptime = os:timestamp(), statreport({fsync, Stoptime, Starttime}), Caller ! {fsyncport, list_to_atom(Data)}, case queue:out(Waiting) of {empty, _} -> loop([Port | IdlePorts], dict:erase(Port, BusyPorts), Waiting); {{value, {NewCaller, NewPath}}, NewWaiting} -> IdlePorts = [], Port ! {self(), {command, NewPath}}, loop(IdlePorts, dict:store(Port, {NewCaller, os:timestamp()}, BusyPorts), NewWaiting) end; stop -> lager:debug("fsync stop request received"), lists:foreach(fun (Port) -> Port ! {self(), close} end, IdlePorts), lists:foreach(fun ({Port, {_Caller, _Starttime}}) -> Port ! {self(), close} end, dict:to_list(BusyPorts)), receive {Port, closed} when is_port(Port) -> exit(normal) %% XXX exits when first port is closed end; {'EXIT', Port, _Reason} when is_port(Port) -> lager:debug("fsync port ~p exited, exiting", [Port]), %% XXX supervisor doesn't restart fsyncport, why? exit(port_terminated) end. statreport(_Entry) -> none.