diff options
-rw-r--r-- | Emakefile | 3 | ||||
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | c_src/fsynchelper.c | 2 | ||||
-rw-r--r-- | src/atomic.erl | 2 | ||||
-rw-r--r-- | src/db.erl | 85 | ||||
-rw-r--r-- | src/frontend.erl | 130 | ||||
-rw-r--r-- | src/fsyncport.erl | 29 | ||||
-rw-r--r-- | src/ht.erl | 25 | ||||
-rw-r--r-- | src/index.erl | 80 | ||||
-rw-r--r-- | src/perm.erl | 13 | ||||
-rw-r--r-- | src/plop.erl | 159 | ||||
-rw-r--r-- | src/plop_app.erl | 4 | ||||
-rw-r--r-- | src/stacktrace.erl | 18 | ||||
-rw-r--r-- | src/storage.erl | 63 | ||||
-rw-r--r-- | src/util.erl | 16 |
15 files changed, 539 insertions, 92 deletions
@@ -2,4 +2,5 @@ {["src/*", "test/*"], [debug_info, {i, "include/"}, - {outdir, "ebin/"}]}. + {outdir, "ebin/"}, + {parse_transform, lager_transform}]}. @@ -2,7 +2,7 @@ build all: (cd c_src && make all) mkdir -p priv cp c_src/fsynchelper priv/ - erl -make + erl -pa ../lager/ebin -make clean: (cd c_src && make clean) -rm priv/fsynchelper diff --git a/c_src/fsynchelper.c b/c_src/fsynchelper.c index 117f5e9..6ffa80a 100644 --- a/c_src/fsynchelper.c +++ b/c_src/fsynchelper.c @@ -27,7 +27,7 @@ dosync(int fd) int main() { - char buf[100]; + char buf[1000]; ssize_t len; /* XXX: exits when command size is 0 */ diff --git a/src/atomic.erl b/src/atomic.erl index 5ad48ba..36fba81 100644 --- a/src/atomic.erl +++ b/src/atomic.erl @@ -10,7 +10,7 @@ replacefile(Path, Content) -> util:write_tempfile_and_rename(Path, TempName, Content), util:fsync([Path, filename:dirname(Path)]). --spec readfile(string()) -> binary(). +-spec readfile(string()) -> binary() | noentry. readfile(Path) -> case file:read_file(Path) of {ok, Contents} -> @@ -6,18 +6,24 @@ %% API. -export([start_link/0, stop/0]). --export([add/4, size/0]). --export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1]). +-export([add/4, add/2, add_entryhash/2, add_index/2, set_treesize/1, size/0]). +-export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1]). +-export([get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]). +-export([leafhash_for_indices/2, indexsize/0]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). +-import(stacktrace, [call/2]). -include_lib("stdlib/include/qlc.hrl"). -include("db.hrl"). size() -> binary_to_integer(atomic:readfile(treesize_path())). +indexsize() -> + index:indexsize(index_path()). + init(_Args) -> {ok, []}. @@ -25,34 +31,50 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). stop() -> - gen_server:call(?MODULE, stop). + call(?MODULE, stop). %%%%%%%%%%%%%%%%%%%% %% Public API. -spec add(binary(), binary(), binary(), non_neg_integer()) -> ok. add(LeafHash, EntryHash, Data, Index) -> - gen_server:call(?MODULE, {add, {LeafHash, EntryHash, Data, Index}}). + call(?MODULE, {add, {LeafHash, EntryHash, Data, Index}}). + +-spec add(binary(), binary()) -> ok. +add(LeafHash, Data) -> + call(?MODULE, {add, {LeafHash, Data}}). + +-spec add_entryhash(binary(), binary()) -> ok. +add_entryhash(LeafHash, EntryHash) -> + call(?MODULE, {add_entryhash, {LeafHash, EntryHash}}). + +-spec add_index(binary(), non_neg_integer()) -> ok. +add_index(LeafHash, Index) -> + call(?MODULE, {add_index, {LeafHash, Index}}). + +-spec set_treesize(non_neg_integer()) -> ok. +set_treesize(Size) -> + call(?MODULE, {set_treesize, Size}). -spec get_by_indices(integer(), integer(), {sorted, true|false}) -> [{non_neg_integer(), binary(), binary()}]. get_by_indices(Start, End, {sorted, Sorted}) -> - gen_server:call(?MODULE, {get_by_indices, {Start, End, Sorted}}). + call(?MODULE, {get_by_indices, {Start, End, Sorted}}). -spec get_by_index(binary()) -> notfound | {non_neg_integer(), binary(), binary()}. get_by_index(Index) -> - gen_server:call(?MODULE, {get_by_index, Index}). + call(?MODULE, {get_by_index, Index}). -spec get_by_leaf_hash(binary()) -> notfound | {non_neg_integer(), binary(), binary()}. get_by_leaf_hash(LeafHash) -> - gen_server:call(?MODULE, {get_by_leaf_hash, LeafHash}). + call(?MODULE, {get_by_leaf_hash, LeafHash}). -spec get_by_entry_hash(binary()) -> notfound | {non_neg_integer(), binary(), binary()}. get_by_entry_hash(EntryHash) -> - gen_server:call(?MODULE, {get_by_entry_hash, EntryHash}). + call(?MODULE, {get_by_entry_hash, EntryHash}). %%%%%%%%%%%%%%%%%%%% %% gen_server callbacks. @@ -103,7 +125,12 @@ entry_for_leafhash(LeafHash) -> perm:readfile(entry_root_path(), LeafHash). index_for_leafhash(LeafHash) -> - binary_to_integer(perm:readfile(indexforhash_root_path(), LeafHash)). + case perm:readfile(indexforhash_root_path(), LeafHash) of + noentry -> + noentry; + Index -> + binary_to_integer(Index) + end. leafhash_for_index(Index) -> index:get(index_path(), Index). @@ -137,7 +164,27 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) -> ok = perm:ensurefile(indexforhash_root_path(), LeafHash, integer_to_binary(Index)), ok = index:add(index_path(), Index, LeafHash), - ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)), + ok = atomic:replacefile(treesize_path(), integer_to_binary(Index+1)), + {reply, ok, State}; + +handle_call({add, {LeafHash, Data}}, _From, State) -> + lager:debug("add leafhash ~p", [LeafHash]), + ok = perm:ensurefile(entry_root_path(), LeafHash, Data), + lager:debug("leafhash ~p added", [LeafHash]), + {reply, ok, State}; + +handle_call({add_entryhash, {LeafHash, EntryHash}}, _From, State) -> + ok = perm:ensurefile(entryhash_root_path(), EntryHash, LeafHash), + {reply, ok, State}; + +handle_call({add_index, {LeafHash, Index}}, _From, State) -> + ok = perm:ensurefile(indexforhash_root_path(), + LeafHash, integer_to_binary(Index)), + ok = index:add(index_path(), Index, LeafHash), + {reply, ok, State}; + +handle_call({set_treesize, Size}, _From, State) -> + ok = atomic:replacefile(treesize_path(), integer_to_binary(Size)), {reply, ok, State}; handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) -> @@ -150,9 +197,17 @@ handle_call({get_by_index, Index}, _From, State) -> {reply, R, State}; handle_call({get_by_leaf_hash, LeafHash}, _From, State) -> - Entry = entry_for_leafhash(LeafHash), - Index = index_for_leafhash(LeafHash), - R = {Index, LeafHash, Entry}, + R = case entry_for_leafhash(LeafHash) of + noentry -> + notfound; + Entry -> + case index_for_leafhash(LeafHash) of + noentry -> + notfound; + Index -> + {Index, LeafHash, Entry} + end + end, {reply, R, State}; handle_call({get_by_entry_hash, EntryHash}, _From, State) -> @@ -161,7 +216,7 @@ handle_call({get_by_entry_hash, EntryHash}, _From, State) -> notfound; LeafHash -> Entry = entry_for_leafhash(LeafHash), - Index = index_for_leafhash(LeafHash), - {Index, LeafHash, Entry} + %% Don't fetch index, isn't used and might not exist + {notfetched, LeafHash, Entry} end, {reply, R, State}. diff --git a/src/frontend.erl b/src/frontend.erl new file mode 100644 index 0000000..9c69517 --- /dev/null +++ b/src/frontend.erl @@ -0,0 +1,130 @@ +%%% Copyright (c) 2014, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% @doc Frontend node API + +-module(frontend). +%% API (URL) +-export([request/3]). + +request(post, "ct/frontend/sendentry", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + + ok = db:add(TreeLeafHash, LogEntry), + success({[{result, <<"ok">>}]}) + end; + +request(post, "ct/frontend/sendlog", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + Start = proplists:get_value(<<"start">>, PropList), + Hashes = lists:map(fun (S) -> base64:decode(S) end, proplists:get_value(<<"hashes">>, PropList)), + + Indices = lists:seq(Start, Start + length(Hashes) - 1), + lists:foreach(fun ({Hash, Index}) -> + ok = db:add_index(Hash, Index) + end, lists:zip(Hashes, Indices)), + success({[{result, <<"ok">>}]}) + end; + +request(post, "ct/frontend/sendsth", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + OldSize = db:size(), + Treesize = proplists:get_value(<<"tree_size">>, PropList), + Indexsize = db:indexsize(), + + if + Treesize < OldSize -> + html("Size is older than current size", OldSize); + Treesize == OldSize -> + success({[{result, <<"ok">>}]}); + Treesize > Indexsize -> + html("Has too few entries", Indexsize); + true -> + NewEntries = db:leafhash_for_indices(OldSize, Treesize - 1), + lager:debug("old size: ~p new size: ~p entries: ~p", + [OldSize, Treesize, NewEntries]), + + Errors = check_entries(NewEntries, OldSize, Treesize - 1), + + case Errors of + [] -> + ok = db:set_treesize(Treesize), + ht:reset_tree([db:size() - 1]), + success({[{result, <<"ok">>}]}); + _ -> + html("Database not complete", Errors) + end + end + end; + +request(get, "ct/frontend/currentposition", _Query) -> + Size = db:size(), + success({[{result, <<"ok">>}, + {position, Size}]}); + +request(get, "ct/frontend/missingentries", _Query) -> + Size = db:size(), + Missing = fetchmissingentries(Size), + lager:debug("missingentries: ~p", [Missing]), + success({[{result, <<"ok">>}, + {entries, lists:map(fun (Entry) -> base64:encode(Entry) end, + Missing)}]}). +check_entries(Entries, Start, End) -> + lists:foldl(fun ({Hash, Index}, Acc) -> + case check_entry(Hash, Index) of + ok -> + Acc; + Error -> + [Error | Acc] + end + end, [], lists:zip(Entries, lists:seq(Start, End))). + +check_entry(Hash, Index) -> + case db:get_by_leaf_hash(Hash) of + notfound -> + {notfound, Index}; + _ -> + ok + end. + +fetchmissingentries(Index) -> + lists:reverse(fetchmissingentries(Index, [])). + +fetchmissingentries(Index, Acc) -> + lager:debug("index ~p", [Index]), + case db:leafhash_for_index(Index) of + noentry -> + Acc; + Hash -> + case db:entry_for_leafhash(Hash) of + noentry -> + lager:debug("didn't find hash ~p", [Hash]), + fetchmissingentries(Index + 1, [Hash | Acc]); + _ -> + fetchmissingentries(Index + 1, Acc) + end + end. + + +%% Private functions. +html(Text, Input) -> + {400, [{"Content-Type", "text/html"}], + io_lib:format( + "<html><body><p>~n" ++ + "~s~n" ++ + "~p~n" ++ + "</body></html>~n", [Text, Input])}. + +success(Data) -> + {200, [{"Content-Type", "text/json"}], mochijson2:encode(Data)}. diff --git a/src/fsyncport.erl b/src/fsyncport.erl index 7e2bf11..c9be44d 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -3,7 +3,7 @@ -module(fsyncport). -export([start_link/0, stop/0, init/1]). --export([fsync/1]). +-export([fsync/1, fsyncall/1]). start_link() -> Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]), @@ -14,6 +14,9 @@ stop() -> fsync(Path) -> call_port({fsync, Path}). +fsyncall(Paths) -> + call_port_multi([{fsync, Path} || Path <- Paths]). + call_port(Msg) -> fsyncport ! {call, self(), Msg}, receive @@ -21,12 +24,31 @@ call_port(Msg) -> Result end. +call_port_multi(Msgs) -> + lists:foreach(fun (Msg) -> + fsyncport ! {call, self(), Msg} + end, Msgs), + lists:foldl(fun (_Msg, Acc) -> + R = receive + {fsyncport, Result} -> + Result + end, + case R of + ok -> + Acc; + Error -> + Error + end + end, ok, Msgs). + init(ExtPrg) -> + lager:debug("starting fsync service"), register(fsyncport, self()), process_flag(trap_exit, true), Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, [{packet, 2}]) end, lists:seq(1, 32)), + lager:debug("fsync service started", []), loop(Ports). loop(Ports) -> @@ -34,12 +56,14 @@ loop(Ports) -> loop(IdlePorts, BusyPorts, Waiting) -> receive {call, Caller, {fsync, Path}} -> + lager:debug("fsync incoming request: ~p", [Path]), case IdlePorts of [] -> loop(IdlePorts, BusyPorts, queue:in({Caller, Path}, Waiting)); [Port | Rest] -> + lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), Port ! {self(), {command, Path}}, loop(Rest, dict:store(Port, {Caller, os:timestamp()}, BusyPorts), @@ -47,6 +71,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> end; {Port, {data, Data}} when is_port(Port) -> + lager:debug("fsync request finished: ~p", [Port]), {Caller, Starttime} = dict:fetch(Port, BusyPorts), Stoptime = os:timestamp(), statreport({fsync, Stoptime, Starttime}), @@ -65,6 +90,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> NewWaiting) end; stop -> + lager:debug("fsync stop request received"), lists:foreach(fun (Port) -> Port ! {self(), close} end, @@ -78,6 +104,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> exit(normal) %% XXX exits when first port is closed end; {'EXIT', Port, _Reason} when is_port(Port) -> + lager:debug("fsync port ~p exited, exiting", [Port]), %% XXX supervisor doesn't restart fsyncport, why? exit(port_terminated) end. @@ -35,6 +35,7 @@ code_change/3]). -export([testing_get_state/0, print_tree/0, print_tree/1]). +-import(stacktrace, [call/2]). -include_lib("eunit/include/eunit.hrl"). -import(lists, [foreach/2, foldl/3, reverse/1]). @@ -51,30 +52,30 @@ start_link() -> start_link(NEntries) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [NEntries], []). reset_tree(Arg) -> - gen_server:call(?MODULE, {reset_tree, Arg}). + call(?MODULE, {reset_tree, Arg}). stop() -> - gen_server:call(?MODULE, stop). + call(?MODULE, stop). size() -> - gen_server:call(?MODULE, size). + call(?MODULE, size). add(Hash) -> - gen_server:call(?MODULE, {add, Hash}). + call(?MODULE, {add, Hash}). root() -> - gen_server:call(?MODULE, root). + call(?MODULE, root). root(Version) -> - gen_server:call(?MODULE, {root, Version}). + call(?MODULE, {root, Version}). path(I, V) -> - gen_server:call(?MODULE, {path, I, V}). + call(?MODULE, {path, I, V}). consistency(V1, V2) -> - gen_server:call(?MODULE, {consistency, V1, V2}). + call(?MODULE, {consistency, V1, V2}). leaf_hash(Data) -> - gen_server:call(?MODULE, {leaf_hash, Data}). + call(?MODULE, {leaf_hash, Data}). %% Testing and debugging. testing_get_state() -> - gen_server:call(?MODULE, testing_get_state). + call(?MODULE, testing_get_state). print_tree() -> - gen_server:call(?MODULE, {print_tree, 4}). + call(?MODULE, {print_tree, 4}). print_tree(HashOutputLen) -> - gen_server:call(?MODULE, {print_tree, HashOutputLen}). + call(?MODULE, {print_tree, HashOutputLen}). %% gen_server callbacks init(Args) -> diff --git a/src/index.erl b/src/index.erl index a2b5c4a..96195e3 100644 --- a/src/index.erl +++ b/src/index.erl @@ -1,17 +1,18 @@ %%% Copyright (c) 2014, NORDUnet A/S. %%% See LICENSE for licensing information. -%% Implements an interface to a file pair (basename and basename.chksum) -%% that stores an ordered list of fixed-size entries. Entries can be -%% added at the end and are retrieved by index. The list can also be -%% truncated. +%% Implements an interface to a file pair (basename and +%% basename.chksum) that stores an ordered list of fixed-size entries. +%% Entries can be added at the end and are retrieved by index. Entries +%% can also be added at already existing indices, but then the +%% contents must be the same. %% -%% Writes(add, truncate, addlast) need to be serialized. +%% Writes(add, addlast) need to be serialized. %% TODO: Checksums -module(index). --export([get/2, getrange/3, add/3, addlast/2, truncate/2]). +-export([get/2, getrange/3, add/3, addlast/2, indexsize/1]). -define(ENTRYSIZE, 32). -define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). @@ -21,27 +22,38 @@ add(Basepath, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE -> case file:open(Basepath, [read, write, binary]) of {ok, File} -> {ok, Position} = file:position(File, eof), - case Index of - last when Position rem ?ENTRYSIZEINFILE == 0 -> - ok; - Index when is_integer(Index), - Index * ?ENTRYSIZEINFILE == Position -> - ok - end, + Mode = case Index of + last when Position rem ?ENTRYSIZEINFILE == 0 -> + write; + Index when is_integer(Index), + Index * ?ENTRYSIZEINFILE == Position -> + write; + Index when is_integer(Index), + Index * ?ENTRYSIZEINFILE < Position -> + read; + _ -> + util:exit_with_error(invalid, writefile, + "Index not valid") + end, EntryText = hex:bin_to_hexstr(Entry) ++ "\n", - ok = file:write(File, EntryText), - ok = file:close(File), - util:fsync([Basepath, filename:dirname(Basepath)]); - {error, Error} -> - util:exit_with_error(Error, writefile, - "Error opening file for writing") - end. - -truncate(Basepath, Index) -> - case file:open(Basepath, [read, write, binary]) of - {ok, File} -> - {ok, _Position} = file:position(File, Index * ?ENTRYSIZEINFILE), - ok = file:truncate(File), + case Mode of + write -> + ok = file:write(File, EntryText); + read -> + {ok, _Position} = + file:position(File, {bof, Index * ?ENTRYSIZEINFILE}), + {ok, OldEntryText} = file:read(File, ?ENTRYSIZEINFILE), + %% check that the written content is the same as + %% the old content + case binary_to_list(OldEntryText) of + EntryText -> + ok; + _ -> + util:exit_with_error(invalid, writefile, + "Written content not the" ++ + " same as old content") + end + end, ok = file:close(File), util:fsync([Basepath, filename:dirname(Basepath)]); {error, Error} -> @@ -65,7 +77,19 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) -> util:exit_with_error(badformat, readindex, "Index line not ending with linefeed"). --spec get(string(), integer()) -> binary(). +-spec indexsize(string()) -> integer(). +indexsize(Basepath) -> + case file:open(Basepath, [read, binary]) of + {ok, File} -> + {ok, Filesize} = file:position(File, eof), + lager:debug("file ~p size ~p", [Basepath, Filesize]), + Filesize div ?ENTRYSIZEINFILE; + {error, Error} -> + util:exit_with_error(Error, readfile, + "Error opening file for reading") + end. + +-spec get(string(), integer()) -> binary() | noentry. get(Basepath, Index) -> case getrange(Basepath, Index, Index) of noentry -> @@ -76,6 +100,7 @@ get(Basepath, Index) -> -spec getrange(string(), integer(), integer()) -> [binary()]. getrange(Basepath, Start, End) when Start =< End -> + lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]), case file:open(Basepath, [read, binary]) of {ok, File} -> {ok, Filesize} = file:position(File, eof), @@ -86,6 +111,7 @@ getrange(Basepath, Start, End) when Start =< End -> {ok, EntryText} = file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)), Entry = decodedata(EntryText), + lager:debug("entries ~p", [length(Entry)]), file:close(File), Entry; true -> diff --git a/src/perm.erl b/src/perm.erl index 466cc4f..9f02b55 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -49,25 +49,32 @@ path_for_key(Rootdir, Key) -> -spec ensurefile(string(), binary(), binary()) -> ok | differ. ensurefile(Rootdir, Key, Content) -> + lager:debug("dir ~p key ~p", [Rootdir, Key]), {Dirs, Path} = path_for_key(Rootdir, Key), case readfile_and_verify(Path, Content) of ok -> - util:fsync([Path, Rootdir | Dirs]); + lager:debug("key ~p existed, fsync", [Key]), + util:fsync([Path, Rootdir | Dirs]), + lager:debug("key ~p fsynced", [Key]); differ -> + lager:debug("key ~p existed, was different", [Key]), differ; {error, enoent} -> + lager:debug("key ~p didn't exist, add", [Key]), util:check_error(make_dirs([Rootdir, Rootdir ++ "nursery/"] ++ Dirs), makedir, "Error creating directory"), NurseryName = Rootdir ++ "nursery/" ++ util:tempfilename(hex:bin_to_hexstr(Key)), util:write_tempfile_and_rename(Path, NurseryName, Content), - util:fsync([Path, Rootdir | Dirs]); + lager:debug("key ~p added, fsync", [Key]), + util:fsync([Path, Rootdir | Dirs]), + lager:debug("key ~p fsynced", [Key]); {error, Error} -> util:exit_with_error(Error, readfile, "Error reading file") end. --spec readfile(string(), binary()) -> binary(). +-spec readfile(string(), binary()) -> binary() | noentry. readfile(Rootdir, Key) -> {_Dirs, Path} = path_for_key(Rootdir, Key), atomic:readfile(Path). diff --git a/src/plop.erl b/src/plop.erl index 5244144..d363582 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -35,6 +35,7 @@ -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). +-import(stacktrace, [call/2]). -include("plop.hrl"). %%-include("db.hrl"). -include_lib("public_key/include/public_key.hrl"). @@ -45,7 +46,10 @@ -record(state, {pubkey :: public_key:rsa_public_key(), privkey :: public_key:rsa_private_key(), - logid :: binary()}). + logid :: binary(), + http_requests, + own_requests + }). %%%%% moved from plop.hrl, maybe remove -define(PLOPVERSION, 0). @@ -68,7 +72,23 @@ start_link(Keyfile, Passphrase) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Keyfile, Passphrase], []). stop() -> - gen_server:call(?MODULE, stop). + call(?MODULE, stop). + +add_http_request(Plop, RequestId, Data) -> + Plop#state{http_requests = dict:store(RequestId, Data, + Plop#state.http_requests)}. + +add_own_request(Plop, RequestId, Data) -> + Plop#state{own_requests = dict:store(RequestId, Data, + Plop#state.own_requests)}. + +remove_http_request(Plop, RequestId) -> + Plop#state{http_requests = dict:erase(RequestId, + Plop#state.http_requests)}. + +remove_own_request(Plop, RequestId) -> + Plop#state{own_requests = dict:erase(RequestId, + Plop#state.own_requests)}. %%%%%%%%%%%%%%%%%%%% init([PrivKeyfile, PubKeyfile]) -> @@ -81,11 +101,49 @@ init([PrivKeyfile, PubKeyfile]) -> _Tree = ht:reset_tree([db:size() - 1]), {ok, #state{pubkey = Public_key, privkey = Private_key, - logid = LogID}}. + logid = LogID, + http_requests = dict:new(), + own_requests = dict:new()}}. handle_cast(_Request, State) -> {noreply, State}. +handle_http_reply(State, {storage_sendentry_http, {OwnRequestId}}, + StatusCode, Body) -> + lager:debug("http_reply: ~p", [Body]), + {struct, PropList} = mochijson2:decode(Body), + Result = proplists:get_value(<<"result">>, PropList), + case dict:fetch(OwnRequestId, State#state.own_requests) of + undefined -> + {noreply, State}; + {storage_sendentry, {From, Completion, RepliesUntilQuorum}} + when Result == <<"ok">>, StatusCode == 200 -> + case RepliesUntilQuorum - 1 of + 0 -> + %% reached quorum + gen_server:reply(From, ok), + StateWithCompletion = Completion(State), + {noreply, remove_own_request(StateWithCompletion, + OwnRequestId)}; + NewRepliesUntilQuorum -> + {noreply, add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + NewRepliesUntilQuorum}})} + end + end. + +handle_info({http, {RequestId, {StatusLine, _Headers, Body}}}, Plop) -> + {_HttpVersion, StatusCode, _ReasonPhrase} = StatusLine, + case dict:fetch(RequestId, Plop#state.http_requests) of + undefined -> + {noreply, Plop}; + ignore -> + {noreply, Plop}; + HttpRequest -> + handle_http_reply(remove_http_request(Plop, RequestId), + HttpRequest, StatusCode, Body) + end; handle_info(_Info, State) -> {noreply, State}. @@ -99,38 +157,97 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% -spec add(binary(), binary(), binary()) -> ok. add(LogEntry, TreeLeafHash, EntryHash) -> - gen_server:call(?MODULE, + call(?MODULE, {add, {LogEntry, TreeLeafHash, EntryHash}}). sth() -> - gen_server:call(?MODULE, {sth, []}). + call(?MODULE, {sth, []}). -spec get(non_neg_integer(), non_neg_integer()) -> [{non_neg_integer(), binary(), binary()}]. get(Start, End) -> - gen_server:call(?MODULE, {get, {index, Start, End}}). + call(?MODULE, {get, {index, Start, End}}). get(Hash) -> - gen_server:call(?MODULE, {get, {hash, Hash}}). + call(?MODULE, {get, {hash, Hash}}). spt(Data) -> - gen_server:call(?MODULE, {spt, Data}). + call(?MODULE, {spt, Data}). consistency(TreeSizeFirst, TreeSizeSecond) -> - gen_server:call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}). + call(?MODULE, {consistency, {TreeSizeFirst, TreeSizeSecond}}). -spec inclusion(binary(), non_neg_integer()) -> {ok, {binary(), binary()}} | {notfound, string()}. inclusion(Hash, TreeSize) -> - gen_server:call(?MODULE, {inclusion, {Hash, TreeSize}}). + call(?MODULE, {inclusion, {Hash, TreeSize}}). -spec inclusion_and_entry(non_neg_integer(), non_neg_integer()) -> {ok, {binary(), binary()}} | {notfound, string()}. inclusion_and_entry(Index, TreeSize) -> - gen_server:call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}). + call(?MODULE, {inclusion_and_entry, {Index, TreeSize}}). get_logid() -> - gen_server:call(?MODULE, {get, logid}). + call(?MODULE, {get, logid}). testing_get_pubkey() -> - gen_server:call(?MODULE, {test, pubkey}). + call(?MODULE, {test, pubkey}). + +storage_nodes() -> + application:get_env(plop, storage_nodes, []). + +storage_nodes_quorum() -> + {ok, Value} = application:get_env(plop, storage_nodes_quorum), + Value. + +send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) -> + Request = mochijson2:encode( + {[{plop_version, 1}, + {entry, base64:encode(LogEntry)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + lager:debug("send sendentry to storage node ~p: ~p", [URLBase, Request]), + httpc:request(post, {URLBase ++ "sendentry", [], + "text/json", list_to_binary(Request)}, + [], [{sync, false}]). + +send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> + Request = mochijson2:encode( + {[{plop_version, 1}, + {entryhash, base64:encode(EntryHash)}, + {treeleafhash, base64:encode(TreeLeafHash)} + ]}), + httpc:request(post, {URLBase ++ "entrycommitted", [], + "text/json", list_to_binary(Request)}, + [], [{sync, false}]). + +store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> + lager:debug("leafhash ~p", [TreeLeafHash]), + OwnRequestId = make_ref(), + + Completion = + fun(CompletionState) -> + RequestIds = [send_storage_entrycommitted(URLBase, EntryHash, + TreeLeafHash) + || URLBase <- Nodes], + lists:foldl(fun({ok, RequestId}, StateAcc) -> + add_http_request(StateAcc, RequestId, + ignore) + end, CompletionState, RequestIds) + end, + + PlopWithOwn = add_own_request(State, OwnRequestId, + {storage_sendentry, + {From, Completion, + storage_nodes_quorum()}}), + + lager:debug("send requests to ~p", [Nodes]), + RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) + || URLBase <- Nodes], + PlopWithRequests = + lists:foldl(fun({ok, RequestId}, PlopAcc) -> + add_http_request(PlopAcc, RequestId, + {storage_sendentry_http, + {OwnRequestId}}) + end, PlopWithOwn, RequestIds), + PlopWithRequests. fill_in_entry({_Index, LeafHash, notfetched}) -> db:get_by_leaf_hash(LeafHash). @@ -151,10 +268,18 @@ handle_call({get, logid}, _From, Plop = #state{logid = LogID}) -> {reply, LogID, Plop}; -handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) -> - ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), - ok = ht:add(TreeLeafHash), - {reply, ok, Plop}; +handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> + lager:debug("add leafhash ~p", [TreeLeafHash]), + case storage_nodes() of + [] -> + ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), + ok = ht:add(TreeLeafHash), + {reply, ok, Plop}; + Nodes -> + {noreply, + store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, + From, Plop)} + end; handle_call({sth, Data}, _From, Plop = #state{privkey = PrivKey}) -> diff --git a/src/plop_app.erl b/src/plop_app.erl index f90792d..767bf06 100644 --- a/src/plop_app.erl +++ b/src/plop_app.erl @@ -4,10 +4,6 @@ -module(plop_app). -behaviour(application). -export([start/2, stop/1]). --export([install/1]). - -install(Nodes) -> - db:init_db(Nodes). start(normal, Args) -> plop_sup:start_link(Args). diff --git a/src/stacktrace.erl b/src/stacktrace.erl new file mode 100644 index 0000000..3de4772 --- /dev/null +++ b/src/stacktrace.erl @@ -0,0 +1,18 @@ +%%% Copyright (c) 2014, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(stacktrace). +-export([call/2]). + +call(Name, Request) -> + Result = (catch gen_server:call(Name, Request)), + case Result of + {'EXIT', {timeout, Details}} -> + {current_stacktrace, Stacktrace} = + erlang:process_info(whereis(Name), current_stacktrace), + lager:error("~p: timeout ~p: ~p", [Name, Details, Stacktrace]), + throw(Result); + _ -> + none + end, + Result. diff --git a/src/storage.erl b/src/storage.erl new file mode 100644 index 0000000..8136308 --- /dev/null +++ b/src/storage.erl @@ -0,0 +1,63 @@ +%%% Copyright (c) 2014, NORDUnet A/S. +%%% See LICENSE for licensing information. + +%%% @doc Storage node API + +-module(storage). +%% API (URL) +-export([request/3]). + +newentries_path() -> + {ok, Value} = application:get_env(plop, newentries_path), + Value. + +request(post, "ct/storage/sendentry", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("sendentry: bad input:", E); + {struct, PropList} -> + LogEntry = base64:decode(proplists:get_value(<<"entry">>, PropList)), + TreeLeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + + ok = db:add(TreeLeafHash, LogEntry), + ok = index:addlast(newentries_path(), TreeLeafHash), + success({[{result, <<"ok">>}]}) + end; +request(post, "ct/storage/entrycommitted", Input) -> + case (catch mochijson2:decode(Input)) of + {error, E} -> + html("entrycommitted: bad input:", E); + {struct, PropList} -> + EntryHash = base64:decode(proplists:get_value(<<"entryhash">>, PropList)), + LeafHash = base64:decode(proplists:get_value(<<"treeleafhash">>, PropList)), + ok = db:add_entryhash(LeafHash, EntryHash), + success({[{result, <<"ok">>}]}) + end; +request(get, "ct/storage/fetchnewentries", _Input) -> + NewHashes = fetchnewhashes(0), + Entries = lists:map(fun(LeafHash) -> + {[{hash, base64:encode(LeafHash)}, + {entry, base64:encode(db:entry_for_leafhash(LeafHash))}]} + end, NewHashes), + success({[{result, <<"ok">>}, + {entries, Entries}]}). + +fetchnewhashes(Index) -> + case index:indexsize(newentries_path()) of + 0 -> + []; + Size -> + index:getrange(newentries_path(), Index, Size - 1) + end. + +%% Private functions. +html(Text, Input) -> + {400, [{"Content-Type", "text/html"}], + io_lib:format( + "<html><body><p>~n" ++ + "~s~n" ++ + "~p~n" ++ + "</body></html>~n", [Text, Input])}. + +success(Data) -> + {200, [{"Content-Type", "text/json"}], mochijson2:encode(Data)}. diff --git a/src/util.erl b/src/util.erl index dd42752..435dbc8 100644 --- a/src/util.erl +++ b/src/util.erl @@ -13,15 +13,13 @@ tempfilename(Base) -> Filename. -spec fsync([string()]) -> ok. -fsync([]) -> - ok; -fsync([Name | Rest]) -> - case fsyncport:fsync(Name) of - ok -> - fsync(Rest); - {error, Error} -> - exit_with_error(fsync, Error, "Error in fsync") - end. +fsync(Paths) -> + case fsyncport:fsyncall(Paths) of + ok -> + ok; + {error, Error} -> + exit_with_error(fsync, Error, "Error in fsync") + end. -spec exit_with_error(atom(), atom(), string()) -> no_return(). exit_with_error(Operation, Error, ErrorMessage) -> |