summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Emakefile3
-rw-r--r--Makefile2
-rw-r--r--c_src/fsynchelper.c2
-rw-r--r--src/atomic.erl2
-rw-r--r--src/db.erl85
-rw-r--r--src/frontend.erl130
-rw-r--r--src/fsyncport.erl29
-rw-r--r--src/ht.erl25
-rw-r--r--src/index.erl80
-rw-r--r--src/perm.erl13
-rw-r--r--src/plop.erl159
-rw-r--r--src/plop_app.erl4
-rw-r--r--src/stacktrace.erl18
-rw-r--r--src/storage.erl63
-rw-r--r--src/util.erl16
15 files changed, 539 insertions, 92 deletions
diff --git a/Emakefile b/Emakefile
index a42a6ee..f6cea09 100644
--- a/Emakefile
+++ b/Emakefile
@@ -2,4 +2,5 @@
{["src/*", "test/*"],
[debug_info,
{i, "include/"},
- {outdir, "ebin/"}]}.
+ {outdir, "ebin/"},
+ {parse_transform, lager_transform}]}.
diff --git a/Makefile b/Makefile
index b4bb715..a905c78 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@ build all:
(cd c_src && make all)
mkdir -p priv
cp c_src/fsynchelper priv/
- erl -make
+ erl -pa ../lager/ebin -make
clean:
(cd c_src && make clean)
-rm priv/fsynchelper
diff --git a/c_src/fsynchelper.c b/c_src/fsynchelper.c
index 117f5e9..6ffa80a 100644
--- a/c_src/fsynchelper.c
+++ b/c_src/fsynchelper.c
@@ -27,7 +27,7 @@ dosync(int fd)
int
main()
{
- char buf[100];
+ char buf[1000];
ssize_t len;
/* XXX: exits when command size is 0 */
diff --git a/src/atomic.erl b/src/atomic.erl
index 5ad48ba..36fba81 100644
--- a/src/atomic.erl
+++ b/src/atomic.erl
@@ -10,7 +10,7 @@ replacefile(Path, Content) ->
util:write_tempfile_and_rename(Path, TempName, Content),
util:fsync([Path, filename:dirname(Path)]).
--spec readfile(string()) -> binary().
+-spec readfile(string()) -> binary() | noentry.
readfile(Path) ->
case file:read_file(Path) of
{ok, Contents} ->
diff --git a/src/db.erl b/src/db.erl
index a0855b9..943c70e 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -6,18 +6,24 @@
%% API.
-export([start_link/0, stop/0]).
--export([add/4, size/0]).
--export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1]).
+-export([add/4, add/2, add_entryhash/2, add_index/2, set_treesize/1, size/0]).
+-export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1]).
+-export([get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]).
+-export([leafhash_for_indices/2, indexsize/0]).
%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
+-import(stacktrace, [call/2]).
-include_lib("stdlib/include/qlc.hrl").
-include("db.hrl").
size() ->
binary_to_integer(atomic:readfile(treesize_path())).
+indexsize() ->
+ index:indexsize(index_path()).
+
init(_Args) ->
{ok, []}.
@@ -25,34 +31,50 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() ->
- gen_server:call(?MODULE, stop).
+ call(?MODULE, stop).
%%%%%%%%%%%%%%%%%%%%
%% Public API.
-spec add(binary(), binary(), binary(), non_neg_integer()) -> ok.
add(LeafHash, EntryHash, Data, Index) ->
- gen_server:call(?MODULE, {add, {LeafHash, EntryHash, Data, Index}}).
+ call(?MODULE, {add, {LeafHash, EntryHash, Data, Index}}).
+
+-spec add(binary(), binary()) -> ok.
+add(LeafHash, Data) ->
+ call(?MODULE, {add, {LeafHash, Data}}).
+
+-spec add_entryhash(binary(), binary()) -> ok.
+add_entryhash(LeafHash, EntryHash) ->
+ call(?MODULE, {add_entryhash, {LeafHash, EntryHash}}).
+
+-spec add_index(binary(), non_neg_integer()) -> ok.
+add_index(LeafHash, Index) ->
+ call(?MODULE, {add_index, {LeafHash, Index}}).
+
+-spec set_treesize(non_neg_integer()) -> ok.
+set_treesize(Size) ->
+ call(?MODULE, {set_treesize, Size}).
-spec get_by_indices(integer(), integer(), {sorted, true|false}) ->
[{non_neg_integer(), binary(), binary()}].
get_by_indices(Start, End, {sorted, Sorted}) ->
- gen_server:call(?MODULE, {get_by_indices, {Start, End, Sorted}}).
+ call(?MODULE, {get_by_indices, {Start, End, Sorted}}).
-spec get_by_index(binary()) -> notfound |
{non_neg_integer(), binary(), binary()}.
get_by_index(Index) ->
- gen_server:call(?MODULE, {get_by_index, Index}).
+ call(?MODULE, {get_by_index, Index}).
-spec get_by_leaf_hash(binary()) -> notfound |
{non_neg_integer(), binary(), binary()}.
get_by_leaf_hash(LeafHash) ->
- gen_server:call(?MODULE, {get_by_leaf_hash, LeafHash}).
+ call(?MODULE, {get_by_leaf_hash, LeafHash}).
-spec get_by_entry_hash(binary()) -> notfound |
{non_neg_integer(), binary(), binary()}.
get_by_entry_hash(EntryHash) ->
- gen_server:call(?MODULE, {get_by_entry_hash, EntryHash}).
+ call(?MODULE, {get_by_entry_hash, EntryHash}).
%%%%%%%%%%%%%%%%%%%%
%% gen_server callbacks.
@@ -103,7 +125,12 @@ entry_for_leafhash(LeafHash) ->
perm:readfile(entry_root_path(), LeafHash).
index_for_leafhash(LeafHash) ->
- binary_to_integer(perm:readfile(indexforhash_root_path(), LeafHash)).
+ case perm:readfile(indexforhash_root_path(), LeafHash) of
+ noentry ->
+ noentry;
+ Index ->
+ binary_to_integer(Index)
+ end.
leafhash_for_index(Index) ->
index:get(index_path(), Index).
@@ -137,7 +164,27 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) ->
ok = perm:ensurefile(indexforhash_root_path(),
LeafHash, integer_to_binary(Index)),
ok = index:add(index_path(), Index, LeafHash),
- ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)),
+ ok = atomic:replacefile(treesize_path(), integer_to_binary(Index+1)),
+ {reply, ok, State};
+
+handle_call({add, {LeafHash, Data}}, _From, State) ->
+ lager:debug("add leafhash ~p", [LeafHash]),
+ ok = perm:ensurefile(entry_root_path(), LeafHash, Data),
+ lager:debug("leafhash ~p added", [LeafHash]),
+ {reply, ok, State};
+
+handle_call({add_entryhash, {LeafHash, EntryHash}}, _From, State) ->
+ ok = perm:ensurefile(entryhash_root_path(), EntryHash, LeafHash),
+ {reply, ok, State};
+
+handle_call({add_index, {LeafHash, Index}}, _From, State) ->
+ ok = perm:ensurefile(indexforhash_root_path(),
+ LeafHash, integer_to_binary(Index)),
+ ok = index:add(index_path(), Index, LeafHash),
+ {reply, ok, State};
+
+handle_call({set_treesize, Size}, _From, State) ->
+ ok = atomic:replacefile(treesize_path(), integer_to_binary(Size)),
{reply, ok, State};
handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) ->
@@ -150,9 +197,17 @@ handle_call({get_by_index, Index}, _From, State) ->
{reply, R, State};
handle_call({get_by_leaf_hash, LeafHash}, _From, State) ->
- Entry = entry_for_leafhash(LeafHash),
- Index = index_for_leafhash(LeafHash),
- R = {Index, LeafHash, Entry},
+ R = case entry_for_leafhash(LeafHash) of
+ noentry ->
+ notfound;
+ Entry ->
+ case index_for_leafhash(LeafHash) of
+ noentry ->
+ notfound;
+ Index ->
+ {Index, LeafHash, Entry}
+ end
+ end,
{reply, R, State};
handle_call({get_by_entry_hash, EntryHash}, _From, State) ->
@@ -161,7 +216,7 @@ handle_call({get_by_entry_hash, EntryHash}, _From, State) ->
notfound;
LeafHash ->
Entry = entry_for_leafhash(LeafHash),
- Index = index_for_leafhash(LeafHash),
- {Index, LeafHash, Entry}
+ %% Don't fetch index, isn't used and might not exist
+ {notfetched, LeafHash, Entry}
end,
{reply, R, State}.
diff --git a/src/frontend.erl b/src/frontend.erl
new file mode 100644
index 0000000..9c69517
--- /dev/null
+++ b/src/frontend.erl
@@ -0,0 +1,130 @@
+%%% Copyright (c) 2014, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% @doc Frontend node API
+
+-module(frontend).
+%% API (URL)
+-export([request/3]).
+
+request(post, "ct/frontend/sendentry", Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("sendentry: bad input:", E);
+ {struct, PropList} ->
+ LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
+ TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+
+ ok = db:add(TreeLeafHash, LogEntry),
+ success({[{result, <<"ok">>}]})
+ end;
+
+request(post, "ct/frontend/sendlog", Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("sendentry: bad input:", E);
+ {struct, PropList} ->
+ Start = proplists:get_value(<<"start">>, PropList),
+ Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)),
+
+ Indices = lists:seq(Start, Start + length(Hashes) - 1),
+ lists:foreach(fun ({Hash, Index}) ->
+ ok = db:add_index(Hash, Index)
+ end, lists:zip(Hashes, Indices)),
+ success({[{result, <<"ok">>}]})
+ end;
+
+request(post, "ct/frontend/sendsth", Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("sendentry: bad input:", E);
+ {struct, PropList} ->
+ OldSize = db:size(),
+ Treesize = proplists:get_value(<<"tree_size">>, PropList),
+ Indexsize = db:indexsize(),
+
+ if
+ Treesize < OldSize ->
+ html("Size is older than current size", OldSize);
+ Treesize == OldSize ->
+ success({[{result, <<"ok">>}]});
+ Treesize > Indexsize ->
+ html("Has too few entries", Indexsize);
+ true ->
+ NewEntries = db:leafhash_for_indices(OldSize, Treesize - 1),
+ lager:debug("old size: ~p new size: ~p entries: ~p",
+ [OldSize, Treesize, NewEntries]),
+
+ Errors = check_entries(NewEntries, OldSize, Treesize - 1),
+
+ case Errors of
+ [] ->
+ ok = db:set_treesize(Treesize),
+ ht:reset_tree([db:size() - 1]),
+ success({[{result, <<"ok">>}]});
+ _ ->
+ html("Database not complete", Errors)
+ end
+ end
+ end;
+
+request(get, "ct/frontend/currentposition", _Query) ->
+ Size = db:size(),
+ success({[{result, <<"ok">>},
+ {position, Size}]});
+
+request(get, "ct/frontend/missingentries", _Query) ->
+ Size = db:size(),
+ Missing = fetchmissingentries(Size),
+ lager:debug("missingentries: ~p", [Missing]),
+ success({[{result, <<"ok">>},
+ {entries, lists:map(fun (Entry) -> base64:encode(Entry) end,
+ Missing)}]}).
+check_entries(Entries, Start, End) ->
+ lists:foldl(fun ({Hash, Index}, Acc) ->
+ case check_entry(Hash, Index) of
+ ok ->
+ Acc;
+ Error ->
+ [Error | Acc]
+ end
+ end, [], lists:zip(Entries, lists:seq(Start, End))).
+
+check_entry(Hash, Index) ->
+ case db:get_by_leaf_hash(Hash) of
+ notfound ->
+ {notfound, Index};
+ _ ->
+ ok
+ end.
+
+fetchmissingentries(Index) ->
+ lists:reverse(fetchmissingentries(Index, [])).
+
+fetchmissingentries(Index, Acc) ->
+ lager:debug("index ~p", [Index]),
+ case db:leafhash_for_index(Index) of
+ noentry ->
+ Acc;
+ Hash ->
+ case db:entry_for_leafhash(Hash) of
+ noentry ->
+ lager:debug("didn't find hash ~p", [Hash]),
+ fetchmissingentries(Index + 1, [Hash | Acc]);
+ _ ->
+ fetchmissingentries(Index + 1, Acc)
+ end
+ end.
+
+
+%% Private functions.
+html(Text, Input) ->
+ {400, [{"Content-Type", "text/html"}],
+ io_lib:format(
+ "<html><body><p>~n" ++
+ "~s~n" ++
+ "~p~n" ++
+ "</body></html>~n", [Text, Input])}.
+
+success(Data) ->
+ {200, [{"Content-Type", "text/json"}], mochijson2:encode(Data)}.
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
index 7e2bf11..c9be44d 100644
--- a/src/fsyncport.erl
+++ b/src/fsyncport.erl
@@ -3,7 +3,7 @@
-module(fsyncport).
-export([start_link/0, stop/0, init/1]).
--export([fsync/1]).
+-export([fsync/1, fsyncall/1]).
start_link() ->
Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
@@ -14,6 +14,9 @@ stop() ->
fsync(Path) ->
call_port({fsync, Path}).
+fsyncall(Paths) ->
+ call_port_multi([{fsync, Path} || Path <- Paths]).
+
call_port(Msg) ->
fsyncport ! {call, self(), Msg},
receive
@@ -21,12 +24,31 @@ call_port(Msg) ->
Result
end.
+call_port_multi(Msgs) ->
+ lists:foreach(fun (Msg) ->
+ fsyncport ! {call, self(), Msg}
+ end, Msgs),
+ lists:foldl(fun (_Msg, Acc) ->
+ R = receive
+ {fsyncport, Result} ->
+ Result
+ end,
+ case R of
+ ok ->
+ Acc;
+ Error ->
+ Error
+ end
+ end, ok, Msgs).
+
init(ExtPrg) ->
+ lager:debug("starting fsync service"),
register(fsyncport, self()),
process_flag(trap_exit, true),
Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg},
[{packet, 2}]) end,
lists:seq(1, 32)),
+ lager:debug("fsync service started", []),
loop(Ports).
loop(Ports) ->
@@ -34,12 +56,14 @@ loop(Ports) ->
loop(IdlePorts, BusyPorts, Waiting) ->
receive
{call, Caller, {fsync, Path}} ->
+ lager:debug("fsync incoming request: ~p", [Path]),
case IdlePorts of
[] ->
loop(IdlePorts,
BusyPorts,
queue:in({Caller, Path}, Waiting));
[Port | Rest] ->
+ lager:debug("fsync port ~p assigned to request ~p", [Port, Path]),
Port ! {self(), {command, Path}},
loop(Rest,
dict:store(Port, {Caller, os:timestamp()}, BusyPorts),
@@ -47,6 +71,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
end;
{Port, {data, Data}} when is_port(Port) ->
+ lager:debug("fsync request finished: ~p", [Port]),
{Caller, Starttime} = dict:fetch(Port, BusyPorts),
Stoptime = os:timestamp(),
statreport({fsync, Stoptime, Starttime}),
@@ -65,6 +90,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
NewWaiting)
end;
stop ->
+ lager:debug("fsync stop request received"),
lists:foreach(fun (Port) ->
Port ! {self(), close}
end,
@@ -78,6 +104,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
exit(normal) %% XXX exits when first port is closed
end;
{'EXIT', Port, _Reason} when is_port(Port) ->
+ lager:debug("fsync port ~p exited, exiting", [Port]),
%% XXX supervisor doesn't restart fsyncport, why?
exit(port_terminated)
end.
diff --git a/src/ht.erl b/src/ht.erl
index 12ce4e3..b4c7401 100644
--- a/src/ht.erl
+++ b/src/ht.erl
@@ -35,6 +35,7 @@
code_change/3]).
-export([testing_get_state/0, print_tree/0, print_tree/1]).
+-import(stacktrace, [call/2]).
-include_lib("eunit/include/eunit.hrl").
-import(lists, [foreach/2, foldl/3, reverse/1]).
@@ -51,30 +52,30 @@ start_link() ->
start_link(NEntries) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [NEntries], []).
reset_tree(Arg) ->
- gen_server:call(?MODULE, {reset_tree, Arg}).
+ call(?MODULE, {reset_tree, Arg}).
stop() ->
- gen_server:call(?MODULE, stop).
+ call(?MODULE, stop).
size() ->
- gen_server:call(?MODULE, size).
+ call(?MODULE, size).
add(Hash) ->
- gen_server:call(?MODULE, {add, Hash}).
+ call(?MODULE, {add, Hash}).
root() ->
- gen_server:call(?MODULE, root).
+ call(?MODULE, root).
root(Version) ->
- gen_server:call(?MODULE, {root, Version}).
+ call(?MODULE, {root, Version}).
path(I, V) ->
- gen_server:call(?MODULE, {path, I, V}).
+ call(?MODULE, {path, I, V}).
consistency(V1, V2) ->
- gen_server:call(?MODULE, {consistency, V1, V2}).
+ call(?MODULE, {consistency, V1, V2}).
leaf_hash(Data) ->
- gen_server:call(?MODULE, {leaf_hash, Data}).
+ call(?MODULE, {leaf_hash, Data}).
%% Testing and debugging.
testing_get_state() ->
- gen_server:call(?MODULE, testing_get_state).
+ call(?MODULE, testing_get_state).
print_tree() ->
- gen_server:call(?MODULE, {print_tree, 4}).
+ call(?MODULE, {print_tree, 4}).
print_tree(HashOutputLen) ->
- gen_server:call(?MODULE, {print_tree, HashOutputLen}).
+ call(?MODULE, {print_tree, HashOutputLen}).
%% gen_server callbacks
init(Args) ->
diff --git a/src/index.erl b/src/index.erl
index a2b5c4a..96195e3 100644
--- a/src/index.erl
+++ b/src/index.erl
@@ -1,17 +1,18 @@
%%% Copyright (c) 2014, NORDUnet A/S.
%%% See LICENSE for licensing information.
-%% Implements an interface to a file pair (basename and basename.chksum)
-%% that stores an ordered list of fixed-size entries. Entries can be
-%% added at the end and are retrieved by index. The list can also be
-%% truncated.
+%% Implements an interface to a file pair (basename and
+%% basename.chksum) that stores an ordered list of fixed-size entries.
+%% Entries can be added at the end and are retrieved by index. Entries
+%% can also be added at already existing indices, but then the
+%% contents must be the same.
%%
-%% Writes(add, truncate, addlast) need to be serialized.
+%% Writes(add, addlast) need to be serialized.
%% TODO: Checksums
-module(index).
--export([get/2, getrange/3, add/3, addlast/2, truncate/2]).
+-export([get/2, getrange/3, add/3, addlast/2, indexsize/1]).
-define(ENTRYSIZE, 32).
-define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)).
@@ -21,27 +22,38 @@ add(Basepath, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE ->
case file:open(Basepath, [read, write, binary]) of
{ok, File} ->
{ok, Position} = file:position(File, eof),
- case Index of
- last when Position rem ?ENTRYSIZEINFILE == 0 ->
- ok;
- Index when is_integer(Index),
- Index * ?ENTRYSIZEINFILE == Position ->
- ok
- end,
+ Mode = case Index of
+ last when Position rem ?ENTRYSIZEINFILE == 0 ->
+ write;
+ Index when is_integer(Index),
+ Index * ?ENTRYSIZEINFILE == Position ->
+ write;
+ Index when is_integer(Index),
+ Index * ?ENTRYSIZEINFILE < Position ->
+ read;
+ _ ->
+ util:exit_with_error(invalid, writefile,
+ "Index not valid")
+ end,
EntryText = hex:bin_to_hexstr(Entry) ++ "\n",
- ok = file:write(File, EntryText),
- ok = file:close(File),
- util:fsync([Basepath, filename:dirname(Basepath)]);
- {error, Error} ->
- util:exit_with_error(Error, writefile,
- "Error opening file for writing")
- end.
-
-truncate(Basepath, Index) ->
- case file:open(Basepath, [read, write, binary]) of
- {ok, File} ->
- {ok, _Position} = file:position(File, Index * ?ENTRYSIZEINFILE),
- ok = file:truncate(File),
+ case Mode of
+ write ->
+ ok = file:write(File, EntryText);
+ read ->
+ {ok, _Position} =
+ file:position(File, {bof, Index * ?ENTRYSIZEINFILE}),
+ {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE),
+ %% check that the written content is the same as
+ %% the old content
+ case binary_to_list(OldEntryText) of
+ EntryText ->
+ ok;
+ _ ->
+ util:exit_with_error(invalid, writefile,
+ "Written content not the" ++
+ " same as old content")
+ end
+ end,
ok = file:close(File),
util:fsync([Basepath, filename:dirname(Basepath)]);
{error, Error} ->
@@ -65,7 +77,19 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) ->
util:exit_with_error(badformat, readindex,
"Index line not ending with linefeed").
--spec get(string(), integer()) -> binary().
+-spec indexsize(string()) -> integer().
+indexsize(Basepath) ->
+ case file:open(Basepath, [read, binary]) of
+ {ok, File} ->
+ {ok, Filesize} = file:position(File, eof),
+ lager:debug("file ~p size ~p", [Basepath, Filesize]),
+ Filesize div ?ENTRYSIZEINFILE;
+ {error, Error} ->
+ util:exit_with_error(Error, readfile,
+ "Error opening file for reading")
+ end.
+
+-spec get(string(), integer()) -> binary() | noentry.
get(Basepath, Index) ->
case getrange(Basepath, Index, Index) of
noentry ->
@@ -76,6 +100,7 @@ get(Basepath, Index) ->
-spec getrange(string(), integer(), integer()) -> [binary()].
getrange(Basepath, Start, End) when Start =< End ->
+ lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]),
case file:open(Basepath, [read, binary]) of
{ok, File} ->
{ok, Filesize} = file:position(File, eof),
@@ -86,6 +111,7 @@ getrange(Basepath, Start, End) when Start =< End ->
{ok, EntryText} =
file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)),
Entry = decodedata(EntryText),
+ lager:debug("entries ~p", [length(Entry)]),
file:close(File),
Entry;
true ->
diff --git a/src/perm.erl b/src/perm.erl
index 466cc4f..9f02b55 100644
--- a/src/perm.erl
+++ b/src/perm.erl
@@ -49,25 +49,32 @@ path_for_key(Rootdir, Key) ->
-spec ensurefile(string(), binary(), binary()) -> ok | differ.
ensurefile(Rootdir, Key, Content) ->
+ lager:debug("dir ~p key ~p", [Rootdir, Key]),
{Dirs, Path} = path_for_key(Rootdir, Key),
case readfile_and_verify(Path, Content) of
ok ->
- util:fsync([Path, Rootdir | Dirs]);
+ lager:debug("key ~p existed, fsync", [Key]),
+ util:fsync([Path, Rootdir | Dirs]),
+ lager:debug("key ~p fsynced", [Key]);
differ ->
+ lager:debug("key ~p existed, was different", [Key]),
differ;
{error, enoent} ->
+ lager:debug("key ~p didn't exist, add", [Key]),
util:check_error(make_dirs([Rootdir, Rootdir ++ "nursery/"]
++ Dirs),
makedir, "Error creating directory"),
NurseryName = Rootdir ++ "nursery/" ++
util:tempfilename(hex:bin_to_hexstr(Key)),
util:write_tempfile_and_rename(Path, NurseryName, Content),
- util:fsync([Path, Rootdir | Dirs]);
+ lager:debug("key ~p added, fsync", [Key]),
+ util:fsync([Path, Rootdir | Dirs]),
+ lager:debug("key ~p fsynced", [Key]);
{error, Error} ->
util:exit_with_error(Error, readfile, "Error reading file")
end.
--spec readfile(string(), binary()) -> binary().
+-spec readfile(string(), binary()) -> binary() | noentry.
readfile(Rootdir, Key) ->
{_Dirs, Path} = path_for_key(Rootdir, Key),
atomic:readfile(Path).
diff --git a/src/plop.erl b/src/plop.erl
index 5244144..d363582 100644
--- a/src/plop.erl
+++ b/src/plop.erl
@@ -35,6 +35,7 @@
-export([init/1, handle_call/3, terminate/2,
handle_cast/2, handle_info/2, code_change/3]).
+-import(stacktrace, [call/2]).
-include("plop.hrl").
%%-include("db.hrl").
-include_lib("public_key/include/public_key.hrl").
@@ -45,7 +46,10 @@
-record(state, {pubkey :: public_key:rsa_public_key(),
privkey :: public_key:rsa_private_key(),
- logid :: binary()}).
+ logid :: binary(),
+ http_requests,
+ own_requests
+ }).
%%%%% moved from plop.hrl, maybe remove
-define(PLOPVERSION, 0).
@@ -68,7 +72,23 @@ start_link(Keyfile, Passphrase) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Keyfile, Passphrase], []).
stop() ->
- gen_server:call(?MODULE, stop).
+ call(?MODULE, stop).
+
+add_http_request(Plop, RequestId, Data) ->
+ Plop#state{http_requests = dict:store(RequestId, Data,
+ Plop#state.http_requests)}.
+
+add_own_request(Plop, RequestId, Data) ->
+ Plop#state{own_requests = dict:store(RequestId, Data,
+ Plop#state.own_requests)}.
+
+remove_http_request(Plop, RequestId) ->
+ Plop#state{http_requests = dict:erase(RequestId,
+ Plop#state.http_requests)}.
+
+remove_own_request(Plop, RequestId) ->
+ Plop#state{own_requests = dict:erase(RequestId,
+ Plop#state.own_requests)}.
%%%%%%%%%%%%%%%%%%%%
init([PrivKeyfile, PubKeyfile]) ->
@@ -81,11 +101,49 @@ init([PrivKeyfile, PubKeyfile]) ->
_Tree = ht:reset_tree([db:size() - 1]),
{ok, #state{pubkey = Public_key,
privkey = Private_key,
- logid = LogID}}.
+ logid = LogID,
+ http_requests = dict:new(),
+ own_requests = dict:new()}}.
handle_cast(_Request, State) ->
{noreply, State}.
+handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}},
+ StatusCode, Body) ->
+ lager:debug("http_reply: ~p", [Body]),
+ {struct, PropList} = mochijson2:decode(Body),
+ Result = proplists:get_value(<<"result">>, PropList),
+ case dict:fetch(OwnRequestId, State#state.own_requests) of
+ undefined ->
+ {noreply, State};
+ {storage_sendentry, {From, Completion, RepliesUntilQuorum}}
+ when Result == <<"ok">>, StatusCode == 200 ->
+ case RepliesUntilQuorum - 1 of
+ 0 ->
+ %% reached quorum
+ gen_server:reply(From, ok),
+ StateWithCompletion = Completion(State),
+ {noreply, remove_own_request(StateWithCompletion,
+ OwnRequestId)};
+ NewRepliesUntilQuorum ->
+ {noreply, add_own_request(State, OwnRequestId,
+ {storage_sendentry,
+ {From, Completion,
+ NewRepliesUntilQuorum}})}
+ end
+ end.
+
+handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) ->
+ {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine,
+ case dict:fetch(RequestId, Plop#state.http_requests) of
+ undefined ->
+ {noreply, Plop};
+ ignore ->
+ {noreply, Plop};
+ HttpRequest ->
+ handle_http_reply(remove_http_request(Plop, RequestId),
+ HttpRequest, StatusCode, Body)
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -99,38 +157,97 @@ terminate(_Reason, _State) ->
%%%%%%%%%%%%%%%%%%%%
-spec add(binary(), binary(), binary()) -> ok.
add(LogEntry, TreeLeafHash, EntryHash) ->
- gen_server:call(?MODULE,
+ call(?MODULE,
{add, {LogEntry, TreeLeafHash, EntryHash}}).
sth() ->
- gen_server:call(?MODULE, {sth, []}).
+ call(?MODULE, {sth, []}).
-spec get(non_neg_integer(), non_neg_integer()) ->
[{non_neg_integer(), binary(), binary()}].
get(Start, End) ->
- gen_server:call(?MODULE, {get, {index, Start, End}}).
+ call(?MODULE, {get, {index, Start, End}}).
get(Hash) ->
- gen_server:call(?MODULE, {get, {hash, Hash}}).
+ call(?MODULE, {get, {hash, Hash}}).
spt(Data) ->
- gen_server:call(?MODULE, {spt, Data}).
+ call(?MODULE, {spt, Data}).
consistency(TreeSizeFirst, TreeSizeSecond) ->
- gen_server:call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}).
+ call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}).
-spec inclusion(binary(), non_neg_integer()) ->
{ok, {binary(), binary()}} | {notfound, string()}.
inclusion(Hash, TreeSize) ->
- gen_server:call(?MODULE, {inclusion, {Hash, TreeSize}}).
+ call(?MODULE, {inclusion, {Hash, TreeSize}}).
-spec inclusion_and_entry(non_neg_integer(), non_neg_integer()) ->
{ok, {binary(), binary()}} |
{notfound, string()}.
inclusion_and_entry(Index, TreeSize) ->
- gen_server:call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}).
+ call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}).
get_logid() ->
- gen_server:call(?MODULE, {get, logid}).
+ call(?MODULE, {get, logid}).
testing_get_pubkey() ->
- gen_server:call(?MODULE, {test, pubkey}).
+ call(?MODULE, {test, pubkey}).
+
+storage_nodes() ->
+ application:get_env(plop, storage_nodes, []).
+
+storage_nodes_quorum() ->
+ {ok, Value} = application:get_env(plop, storage_nodes_quorum),
+ Value.
+
+send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) ->
+ Request = mochijson2:encode(
+ {[{plop_version, 1},
+ {entry, base64:encode(LogEntry)},
+ {treeleafhash, base64:encode(TreeLeafHash)}
+ ]}),
+ lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]),
+ httpc:request(post, {URLBase ++ "sendentry", [],
+ "text/json", list_to_binary(Request)},
+ [], [{sync, false}]).
+
+send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) ->
+ Request = mochijson2:encode(
+ {[{plop_version, 1},
+ {entryhash, base64:encode(EntryHash)},
+ {treeleafhash, base64:encode(TreeLeafHash)}
+ ]}),
+ httpc:request(post, {URLBase ++ "entrycommitted", [],
+ "text/json", list_to_binary(Request)},
+ [], [{sync, false}]).
+
+store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
+ lager:debug("leafhash ~p", [TreeLeafHash]),
+ OwnRequestId = make_ref(),
+
+ Completion =
+ fun(CompletionState) ->
+ RequestIds = [send_storage_entrycommitted(URLBase, EntryHash,
+ TreeLeafHash)
+ || URLBase <- Nodes],
+ lists:foldl(fun({ok, RequestId}, StateAcc) ->
+ add_http_request(StateAcc, RequestId,
+ ignore)
+ end, CompletionState, RequestIds)
+ end,
+
+ PlopWithOwn = add_own_request(State, OwnRequestId,
+ {storage_sendentry,
+ {From, Completion,
+ storage_nodes_quorum()}}),
+
+ lager:debug("send requests to ~p", [Nodes]),
+ RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash)
+ || URLBase <- Nodes],
+ PlopWithRequests =
+ lists:foldl(fun({ok, RequestId}, PlopAcc) ->
+ add_http_request(PlopAcc, RequestId,
+ {storage_sendentry_http,
+ {OwnRequestId}})
+ end, PlopWithOwn, RequestIds),
+ PlopWithRequests.
fill_in_entry({_Index, LeafHash, notfetched}) ->
db:get_by_leaf_hash(LeafHash).
@@ -151,10 +268,18 @@ handle_call({get, logid}, _From,
Plop = #state{logid = LogID}) ->
{reply, LogID, Plop};
-handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) ->
- ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
- ok = ht:add(TreeLeafHash),
- {reply, ok, Plop};
+handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) ->
+ lager:debug("add leafhash ~p", [TreeLeafHash]),
+ case storage_nodes() of
+ [] ->
+ ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
+ ok = ht:add(TreeLeafHash),
+ {reply, ok, Plop};
+ Nodes ->
+ {noreply,
+ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash},
+ From, Plop)}
+ end;
handle_call({sth, Data}, _From,
Plop = #state{privkey = PrivKey}) ->
diff --git a/src/plop_app.erl b/src/plop_app.erl
index f90792d..767bf06 100644
--- a/src/plop_app.erl
+++ b/src/plop_app.erl
@@ -4,10 +4,6 @@
-module(plop_app).
-behaviour(application).
-export([start/2, stop/1]).
--export([install/1]).
-
-install(Nodes) ->
- db:init_db(Nodes).
start(normal, Args) ->
plop_sup:start_link(Args).
diff --git a/src/stacktrace.erl b/src/stacktrace.erl
new file mode 100644
index 0000000..3de4772
--- /dev/null
+++ b/src/stacktrace.erl
@@ -0,0 +1,18 @@
+%%% Copyright (c) 2014, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+-module(stacktrace).
+-export([call/2]).
+
+call(Name, Request) ->
+ Result = (catch gen_server:call(Name, Request)),
+ case Result of
+ {'EXIT', {timeout, Details}} ->
+ {current_stacktrace, Stacktrace} =
+ erlang:process_info(whereis(Name), current_stacktrace),
+ lager:error("~p: timeout ~p: ~p", [Name, Details, Stacktrace]),
+ throw(Result);
+ _ ->
+ none
+ end,
+ Result.
diff --git a/src/storage.erl b/src/storage.erl
new file mode 100644
index 0000000..8136308
--- /dev/null
+++ b/src/storage.erl
@@ -0,0 +1,63 @@
+%%% Copyright (c) 2014, NORDUnet A/S.
+%%% See LICENSE for licensing information.
+
+%%% @doc Storage node API
+
+-module(storage).
+%% API (URL)
+-export([request/3]).
+
+newentries_path() ->
+ {ok, Value} = application:get_env(plop, newentries_path),
+ Value.
+
+request(post, "ct/storage/sendentry", Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("sendentry: bad input:", E);
+ {struct, PropList} ->
+ LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)),
+ TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+
+ ok = db:add(TreeLeafHash, LogEntry),
+ ok = index:addlast(newentries_path(), TreeLeafHash),
+ success({[{result, <<"ok">>}]})
+ end;
+request(post, "ct/storage/entrycommitted", Input) ->
+ case (catch mochijson2:decode(Input)) of
+ {error, E} ->
+ html("entrycommitted: bad input:", E);
+ {struct, PropList} ->
+ EntryHash = base64:decode(proplists:get_value(<<"entryhash">>, PropList)),
+ LeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)),
+ ok = db:add_entryhash(LeafHash, EntryHash),
+ success({[{result, <<"ok">>}]})
+ end;
+request(get, "ct/storage/fetchnewentries", _Input) ->
+ NewHashes = fetchnewhashes(0),
+ Entries = lists:map(fun(LeafHash) ->
+ {[{hash, base64:encode(LeafHash)},
+ {entry, base64:encode(db:entry_for_leafhash(LeafHash))}]}
+ end, NewHashes),
+ success({[{result, <<"ok">>},
+ {entries, Entries}]}).
+
+fetchnewhashes(Index) ->
+ case index:indexsize(newentries_path()) of
+ 0 ->
+ [];
+ Size ->
+ index:getrange(newentries_path(), Index, Size - 1)
+ end.
+
+%% Private functions.
+html(Text, Input) ->
+ {400, [{"Content-Type", "text/html"}],
+ io_lib:format(
+ "<html><body><p>~n" ++
+ "~s~n" ++
+ "~p~n" ++
+ "</body></html>~n", [Text, Input])}.
+
+success(Data) ->
+ {200, [{"Content-Type", "text/json"}], mochijson2:encode(Data)}.
diff --git a/src/util.erl b/src/util.erl
index dd42752..435dbc8 100644
--- a/src/util.erl
+++ b/src/util.erl
@@ -13,15 +13,13 @@ tempfilename(Base) ->
Filename.
-spec fsync([string()]) -> ok.
-fsync([]) ->
- ok;
-fsync([Name | Rest]) ->
- case fsyncport:fsync(Name) of
- ok ->
- fsync(Rest);
- {error, Error} ->
- exit_with_error(fsync, Error, "Error in fsync")
- end.
+fsync(Paths) ->
+ case fsyncport:fsyncall(Paths) of
+ ok ->
+ ok;
+ {error, Error} ->
+ exit_with_error(fsync, Error, "Error in fsync")
+ end.
-spec exit_with_error(atom(), atom(), string()) -> no_return().
exit_with_error(Operation, Error, ErrorMessage) ->