summaryrefslogtreecommitdiff
path: root/src/fsyncport.erl
blob: 8bc8c60dae5684608f66a0420a1af70fa33b0681 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
%%
%% Copyright (c) 2014 Kungliga Tekniska Högskolan
%% (KTH Royal Institute of Technology, Stockholm, Sweden).
%%

-module(fsyncport).
-export([start_link/0, stop/0, init/1]).
-export([fsync/1]).

start_link() ->
    Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
    {ok, Pid}.
stop() ->
    fsyncport ! stop.

fsync(Path) ->
    call_port({fsync, Path}).

call_port(Msg) ->
    fsyncport ! {call, self(), Msg},
    receive
	{fsyncport, Result} ->
	    Result
    end.

init(ExtPrg) ->
    register(fsyncport, self()),
    process_flag(trap_exit, true),
    Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg},
                                           [{packet, 2}]) end,
		      lists:seq(1, 32)),
    loop(Ports).

loop(Ports) ->
    loop(Ports, dict:new(), queue:new()).
loop(IdlePorts, BusyPorts, Waiting) ->
    receive
	{call, Caller, {fsync, Path}} ->
	    case IdlePorts of
	        [] ->
		    loop(IdlePorts,
                         BusyPorts,
                         queue:in({Caller, Path}, Waiting));
	        [Port | Rest] ->
		    Port ! {self(), {command, Path}},
		    loop(Rest,
                         dict:store(Port, {Caller, os:timestamp()}, BusyPorts),
                         Waiting)
            end;

	{Port, {data, Data}} when is_port(Port) ->
	    {Caller, Starttime} = dict:fetch(Port, BusyPorts),
	    Stoptime = os:timestamp(),
            statreport({fsync, Stoptime, Starttime}),
	    Caller ! {fsyncport, list_to_atom(Data)},
	    case queue:out(Waiting) of
		{empty, _} ->
		    loop([Port | IdlePorts],
                         dict:erase(Port, BusyPorts),
                         Waiting);
		{{value, {NewCaller, NewPath}}, NewWaiting} ->
		    IdlePorts = [],
		    Port ! {self(), {command, NewPath}},
		    loop(IdlePorts,
                         dict:store(Port, {NewCaller, os:timestamp()},
                                    BusyPorts),
                         NewWaiting)
	    end;
	stop ->
	    lists:foreach(fun (Port) ->
                                  Port ! {self(), close}
                          end,
                          IdlePorts),
	    lists:foreach(fun ({Port, {_Caller, _Starttime}}) ->
                                  Port ! {self(), close}
                          end,
                          dict:to_list(BusyPorts)),
	    receive
		{Port, closed} when is_port(Port) ->
		    exit(normal) %% XXX exits when first port is closed
	    end;
	{'EXIT', Port, _Reason} when is_port(Port) ->
            %% XXX supervisor doesn't restart fsyncport, why?
	    exit(port_terminated)
    end.

statreport(_Entry) ->
    none.