summaryrefslogtreecommitdiff
path: root/src/fsyncport.erl
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2014-09-25 01:35:33 +0200
committerMagnus Ahltorp <map@kth.se>2014-09-25 01:35:33 +0200
commitae9f673d35ec0140a7276297cee002bfd3a15852 (patch)
treeea33f6ab2fdc35fe359f0067763db756885e7ac9 /src/fsyncport.erl
parent0b253574667acde453a50a2b61cf46d6f7a6c86e (diff)
Permanent storage implementation
Diffstat (limited to 'src/fsyncport.erl')
-rw-r--r--src/fsyncport.erl88
1 files changed, 88 insertions, 0 deletions
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
new file mode 100644
index 0000000..8bc8c60
--- /dev/null
+++ b/src/fsyncport.erl
@@ -0,0 +1,88 @@
+%%
+%% Copyright (c) 2014 Kungliga Tekniska Högskolan
+%% (KTH Royal Institute of Technology, Stockholm, Sweden).
+%%
+
+-module(fsyncport).
+-export([start_link/0, stop/0, init/1]).
+-export([fsync/1]).
+
+start_link() ->
+ Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
+ {ok, Pid}.
+stop() ->
+ fsyncport ! stop.
+
+fsync(Path) ->
+ call_port({fsync, Path}).
+
+call_port(Msg) ->
+ fsyncport ! {call, self(), Msg},
+ receive
+ {fsyncport, Result} ->
+ Result
+ end.
+
+init(ExtPrg) ->
+ register(fsyncport, self()),
+ process_flag(trap_exit, true),
+ Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg},
+ [{packet, 2}]) end,
+ lists:seq(1, 32)),
+ loop(Ports).
+
+loop(Ports) ->
+ loop(Ports, dict:new(), queue:new()).
+loop(IdlePorts, BusyPorts, Waiting) ->
+ receive
+ {call, Caller, {fsync, Path}} ->
+ case IdlePorts of
+ [] ->
+ loop(IdlePorts,
+ BusyPorts,
+ queue:in({Caller, Path}, Waiting));
+ [Port | Rest] ->
+ Port ! {self(), {command, Path}},
+ loop(Rest,
+ dict:store(Port, {Caller, os:timestamp()}, BusyPorts),
+ Waiting)
+ end;
+
+ {Port, {data, Data}} when is_port(Port) ->
+ {Caller, Starttime} = dict:fetch(Port, BusyPorts),
+ Stoptime = os:timestamp(),
+ statreport({fsync, Stoptime, Starttime}),
+ Caller ! {fsyncport, list_to_atom(Data)},
+ case queue:out(Waiting) of
+ {empty, _} ->
+ loop([Port | IdlePorts],
+ dict:erase(Port, BusyPorts),
+ Waiting);
+ {{value, {NewCaller, NewPath}}, NewWaiting} ->
+ IdlePorts = [],
+ Port ! {self(), {command, NewPath}},
+ loop(IdlePorts,
+ dict:store(Port, {NewCaller, os:timestamp()},
+ BusyPorts),
+ NewWaiting)
+ end;
+ stop ->
+ lists:foreach(fun (Port) ->
+ Port ! {self(), close}
+ end,
+ IdlePorts),
+ lists:foreach(fun ({Port, {_Caller, _Starttime}}) ->
+ Port ! {self(), close}
+ end,
+ dict:to_list(BusyPorts)),
+ receive
+ {Port, closed} when is_port(Port) ->
+ exit(normal) %% XXX exits when first port is closed
+ end;
+ {'EXIT', Port, _Reason} when is_port(Port) ->
+ %% XXX supervisor doesn't restart fsyncport, why?
+ exit(port_terminated)
+ end.
+
+statreport(_Entry) ->
+ none.