summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordberg.se>2014-10-29 16:35:44 +0100
committerLinus Nordberg <linus@nordberg.se>2014-10-29 16:56:48 +0100
commit92f681e1cbb444317d2603994c60c02feeab32be (patch)
treeef62cdfece8c1f063cb27cf299094e1f4d7eed1a /src
parentb15f4636337c45b487651e8d442afed0d4141725 (diff)
parentcc2aaa2807bb13f4683c2d74a414d39d5b29a372 (diff)
Merge remote-tracking branch 'refs/remotes/map/external-merge3' into merging-external-merge
Conflicts: src/db.erl src/frontend.erl src/index.erl src/plop.erl src/storage.erl src/ts.erl
Diffstat (limited to 'src')
-rw-r--r--src/atomic.erl2
-rw-r--r--src/db.erl10
-rw-r--r--src/frontend.erl55
-rw-r--r--src/fsyncport.erl29
-rw-r--r--src/index.erl18
-rw-r--r--src/perm.erl13
-rw-r--r--src/plop.erl12
-rw-r--r--src/plop_app.erl4
-rw-r--r--src/storage.erl13
-rw-r--r--src/ts.erl18
-rw-r--r--src/util.erl16
11 files changed, 139 insertions, 51 deletions
diff --git a/src/atomic.erl b/src/atomic.erl
index 5ad48ba..36fba81 100644
--- a/src/atomic.erl
+++ b/src/atomic.erl
@@ -10,7 +10,7 @@ replacefile(Path, Content) ->
util:write_tempfile_and_rename(Path, TempName, Content),
util:fsync([Path, filename:dirname(Path)]).
--spec readfile(string()) -> binary().
+-spec readfile(string()) -> binary() | noentry.
readfile(Path) ->
case file:read_file(Path) of
{ok, Contents} ->
diff --git a/src/db.erl b/src/db.erl
index cb1e618..60c4d30 100644
--- a/src/db.erl
+++ b/src/db.erl
@@ -9,6 +9,7 @@
-export([add/4, add/2, add_entryhash/2, add_index/2, set_treesize/1, size/0]).
-export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1]).
-export([get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]).
+-export([leafhash_for_indices/2, indexsize/0]).
%% gen_server callbacks.
-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2,
code_change/3]).
@@ -19,6 +20,9 @@
size() ->
binary_to_integer(atomic:readfile(treesize_path())).
+indexsize() ->
+ index:indexsize(index_path()).
+
init(_Args) ->
{ok, []}.
@@ -159,11 +163,13 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) ->
ok = perm:ensurefile(indexforhash_root_path(),
LeafHash, integer_to_binary(Index)),
ok = index:add(index_path(), Index, LeafHash),
- ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)),
+ ok = atomic:replacefile(treesize_path(), integer_to_binary(Index+1)),
{reply, ok, State};
handle_call({add, {LeafHash, Data}}, _From, State) ->
+ lager:debug("add leafhash ~p", [LeafHash]),
ok = perm:ensurefile(entry_root_path(), LeafHash, Data),
+ lager:debug("leafhash ~p added", [LeafHash]),
{reply, ok, State};
handle_call({add_entryhash, {LeafHash, EntryHash}}, _From, State) ->
@@ -177,7 +183,7 @@ handle_call({add_index, {LeafHash, Index}}, _From, State) ->
{reply, ok, State};
handle_call({set_treesize, Size}, _From, State) ->
- ok = atomic:replacefile(treesize_path(), integer_to_list(Size)),
+ ok = atomic:replacefile(treesize_path(), integer_to_binary(Size)),
{reply, ok, State};
handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) ->
diff --git a/src/frontend.erl b/src/frontend.erl
index a8a8b9e..9c69517 100644
--- a/src/frontend.erl
+++ b/src/frontend.erl
@@ -39,13 +39,33 @@ request(post, "ct/frontend/sendsth", Input) ->
{error, E} ->
html("sendentry: bad input:", E);
{struct, PropList} ->
+ OldSize = db:size(),
Treesize = proplists:get_value(<<"tree_size">>, PropList),
-
- ok = db:set_treesize(Treesize),
-
- ht:reset_tree([db:size() - 1]),
-
- success({[{result, <<"ok">>}]})
+ Indexsize = db:indexsize(),
+
+ if
+ Treesize < OldSize ->
+ html("Size is older than current size", OldSize);
+ Treesize == OldSize ->
+ success({[{result, <<"ok">>}]});
+ Treesize > Indexsize ->
+ html("Has too few entries", Indexsize);
+ true ->
+ NewEntries = db:leafhash_for_indices(OldSize, Treesize - 1),
+ lager:debug("old size: ~p new size: ~p entries: ~p",
+ [OldSize, Treesize, NewEntries]),
+
+ Errors = check_entries(NewEntries, OldSize, Treesize - 1),
+
+ case Errors of
+ [] ->
+ ok = db:set_treesize(Treesize),
+ ht:reset_tree([db:size() - 1]),
+ success({[{result, <<"ok">>}]});
+ _ ->
+ html("Database not complete", Errors)
+ end
+ end
end;
request(get, "ct/frontend/currentposition", _Query) ->
@@ -56,19 +76,40 @@ request(get, "ct/frontend/currentposition", _Query) ->
request(get, "ct/frontend/missingentries", _Query) ->
Size = db:size(),
Missing = fetchmissingentries(Size),
+ lager:debug("missingentries: ~p", [Missing]),
success({[{result, <<"ok">>},
- {entries, Missing}]}).
+ {entries, lists:map(fun (Entry) -> base64:encode(Entry) end,
+ Missing)}]}).
+check_entries(Entries, Start, End) ->
+ lists:foldl(fun ({Hash, Index}, Acc) ->
+ case check_entry(Hash, Index) of
+ ok ->
+ Acc;
+ Error ->
+ [Error | Acc]
+ end
+ end, [], lists:zip(Entries, lists:seq(Start, End))).
+
+check_entry(Hash, Index) ->
+ case db:get_by_leaf_hash(Hash) of
+ notfound ->
+ {notfound, Index};
+ _ ->
+ ok
+ end.
fetchmissingentries(Index) ->
lists:reverse(fetchmissingentries(Index, [])).
fetchmissingentries(Index, Acc) ->
+ lager:debug("index ~p", [Index]),
case db:leafhash_for_index(Index) of
noentry ->
Acc;
Hash ->
case db:entry_for_leafhash(Hash) of
noentry ->
+ lager:debug("didn't find hash ~p", [Hash]),
fetchmissingentries(Index + 1, [Hash | Acc]);
_ ->
fetchmissingentries(Index + 1, Acc)
diff --git a/src/fsyncport.erl b/src/fsyncport.erl
index 7e2bf11..c9be44d 100644
--- a/src/fsyncport.erl
+++ b/src/fsyncport.erl
@@ -3,7 +3,7 @@
-module(fsyncport).
-export([start_link/0, stop/0, init/1]).
--export([fsync/1]).
+-export([fsync/1, fsyncall/1]).
start_link() ->
Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]),
@@ -14,6 +14,9 @@ stop() ->
fsync(Path) ->
call_port({fsync, Path}).
+fsyncall(Paths) ->
+ call_port_multi([{fsync, Path} || Path <- Paths]).
+
call_port(Msg) ->
fsyncport ! {call, self(), Msg},
receive
@@ -21,12 +24,31 @@ call_port(Msg) ->
Result
end.
+call_port_multi(Msgs) ->
+ lists:foreach(fun (Msg) ->
+ fsyncport ! {call, self(), Msg}
+ end, Msgs),
+ lists:foldl(fun (_Msg, Acc) ->
+ R = receive
+ {fsyncport, Result} ->
+ Result
+ end,
+ case R of
+ ok ->
+ Acc;
+ Error ->
+ Error
+ end
+ end, ok, Msgs).
+
init(ExtPrg) ->
+ lager:debug("starting fsync service"),
register(fsyncport, self()),
process_flag(trap_exit, true),
Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg},
[{packet, 2}]) end,
lists:seq(1, 32)),
+ lager:debug("fsync service started", []),
loop(Ports).
loop(Ports) ->
@@ -34,12 +56,14 @@ loop(Ports) ->
loop(IdlePorts, BusyPorts, Waiting) ->
receive
{call, Caller, {fsync, Path}} ->
+ lager:debug("fsync incoming request: ~p", [Path]),
case IdlePorts of
[] ->
loop(IdlePorts,
BusyPorts,
queue:in({Caller, Path}, Waiting));
[Port | Rest] ->
+ lager:debug("fsync port ~p assigned to request ~p", [Port, Path]),
Port ! {self(), {command, Path}},
loop(Rest,
dict:store(Port, {Caller, os:timestamp()}, BusyPorts),
@@ -47,6 +71,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
end;
{Port, {data, Data}} when is_port(Port) ->
+ lager:debug("fsync request finished: ~p", [Port]),
{Caller, Starttime} = dict:fetch(Port, BusyPorts),
Stoptime = os:timestamp(),
statreport({fsync, Stoptime, Starttime}),
@@ -65,6 +90,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
NewWaiting)
end;
stop ->
+ lager:debug("fsync stop request received"),
lists:foreach(fun (Port) ->
Port ! {self(), close}
end,
@@ -78,6 +104,7 @@ loop(IdlePorts, BusyPorts, Waiting) ->
exit(normal) %% XXX exits when first port is closed
end;
{'EXIT', Port, _Reason} when is_port(Port) ->
+ lager:debug("fsync port ~p exited, exiting", [Port]),
%% XXX supervisor doesn't restart fsyncport, why?
exit(port_terminated)
end.
diff --git a/src/index.erl b/src/index.erl
index bbc9a10..96195e3 100644
--- a/src/index.erl
+++ b/src/index.erl
@@ -12,7 +12,7 @@
%% TODO: Checksums
-module(index).
--export([get/2, getrange/3, add/3, addlast/2]).
+-export([get/2, getrange/3, add/3, addlast/2, indexsize/1]).
-define(ENTRYSIZE, 32).
-define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)).
@@ -77,7 +77,19 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) ->
util:exit_with_error(badformat, readindex,
"Index line not ending with linefeed").
--spec get(string(), integer()) -> binary().
+-spec indexsize(string()) -> integer().
+indexsize(Basepath) ->
+ case file:open(Basepath, [read, binary]) of
+ {ok, File} ->
+ {ok, Filesize} = file:position(File, eof),
+ lager:debug("file ~p size ~p", [Basepath, Filesize]),
+ Filesize div ?ENTRYSIZEINFILE;
+ {error, Error} ->
+ util:exit_with_error(Error, readfile,
+ "Error opening file for reading")
+ end.
+
+-spec get(string(), integer()) -> binary() | noentry.
get(Basepath, Index) ->
case getrange(Basepath, Index, Index) of
noentry ->
@@ -88,6 +100,7 @@ get(Basepath, Index) ->
-spec getrange(string(), integer(), integer()) -> [binary()].
getrange(Basepath, Start, End) when Start =< End ->
+ lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]),
case file:open(Basepath, [read, binary]) of
{ok, File} ->
{ok, Filesize} = file:position(File, eof),
@@ -98,6 +111,7 @@ getrange(Basepath, Start, End) when Start =< End ->
{ok, EntryText} =
file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)),
Entry = decodedata(EntryText),
+ lager:debug("entries ~p", [length(Entry)]),
file:close(File),
Entry;
true ->
diff --git a/src/perm.erl b/src/perm.erl
index 466cc4f..9f02b55 100644
--- a/src/perm.erl
+++ b/src/perm.erl
@@ -49,25 +49,32 @@ path_for_key(Rootdir, Key) ->
-spec ensurefile(string(), binary(), binary()) -> ok | differ.
ensurefile(Rootdir, Key, Content) ->
+ lager:debug("dir ~p key ~p", [Rootdir, Key]),
{Dirs, Path} = path_for_key(Rootdir, Key),
case readfile_and_verify(Path, Content) of
ok ->
- util:fsync([Path, Rootdir | Dirs]);
+ lager:debug("key ~p existed, fsync", [Key]),
+ util:fsync([Path, Rootdir | Dirs]),
+ lager:debug("key ~p fsynced", [Key]);
differ ->
+ lager:debug("key ~p existed, was different", [Key]),
differ;
{error, enoent} ->
+ lager:debug("key ~p didn't exist, add", [Key]),
util:check_error(make_dirs([Rootdir, Rootdir ++ "nursery/"]
++ Dirs),
makedir, "Error creating directory"),
NurseryName = Rootdir ++ "nursery/" ++
util:tempfilename(hex:bin_to_hexstr(Key)),
util:write_tempfile_and_rename(Path, NurseryName, Content),
- util:fsync([Path, Rootdir | Dirs]);
+ lager:debug("key ~p added, fsync", [Key]),
+ util:fsync([Path, Rootdir | Dirs]),
+ lager:debug("key ~p fsynced", [Key]);
{error, Error} ->
util:exit_with_error(Error, readfile, "Error reading file")
end.
--spec readfile(string(), binary()) -> binary().
+-spec readfile(string(), binary()) -> binary() | noentry.
readfile(Rootdir, Key) ->
{_Dirs, Path} = path_for_key(Rootdir, Key),
atomic:readfile(Path).
diff --git a/src/plop.erl b/src/plop.erl
index 57febf5..0c85b21 100644
--- a/src/plop.erl
+++ b/src/plop.erl
@@ -189,12 +189,8 @@ get_logid() ->
testing_get_pubkey() ->
gen_server:call(?MODULE, {test, pubkey}).
-fill_in_entry({_Index, LeafHash, notfetched}) ->
- db:get_by_leaf_hash(LeafHash).
-
storage_nodes() ->
- {ok, Value} = application:get_env(plop, storage_nodes, {ok, []}),
- Value.
+ application:get_env(plop, storage_nodes, []).
storage_nodes_quorum() ->
{ok, Value} = application:get_env(plop, storage_nodes_quorum),
@@ -222,6 +218,7 @@ send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) ->
[], [{sync, false}]).
store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
+ lager:debug("leafhash ~p", [TreeLeafHash]),
OwnRequestId = make_ref(),
Completion =
@@ -240,6 +237,7 @@ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
{From, Completion,
storage_nodes_quorum()}}),
+ lager:debug("send requests to ~p", [Nodes]),
RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash)
|| URLBase <- Nodes],
PlopWithRequests =
@@ -250,6 +248,9 @@ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) ->
end, PlopWithOwn, RequestIds),
PlopWithRequests.
+fill_in_entry({_Index, LeafHash, notfetched}) ->
+ db:get_by_leaf_hash(LeafHash).
+
%%%%%%%%%%%%%%%%%%%%
handle_call(stop, _From, Plop) ->
{stop, normal, stopped, Plop};
@@ -267,6 +268,7 @@ handle_call({get, logid}, _From,
{reply, LogID, Plop};
handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) ->
+ lager:debug("add leafhash ~p", [TreeLeafHash]),
case storage_nodes() of
[] ->
ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()),
diff --git a/src/plop_app.erl b/src/plop_app.erl
index f90792d..767bf06 100644
--- a/src/plop_app.erl
+++ b/src/plop_app.erl
@@ -4,10 +4,6 @@
-module(plop_app).
-behaviour(application).
-export([start/2, stop/1]).
--export([install/1]).
-
-install(Nodes) ->
- db:init_db(Nodes).
start(normal, Args) ->
plop_sup:start_link(Args).
diff --git a/src/storage.erl b/src/storage.erl
index e09acdb..8136308 100644
--- a/src/storage.erl
+++ b/src/storage.erl
@@ -43,14 +43,11 @@ request(get, "ct/storage/fetchnewentries", _Input) ->
{entries, Entries}]}).
fetchnewhashes(Index) ->
- lists:reverse(fetchnewhashes(Index, [])).
-
-fetchnewhashes(Index, Acc) ->
- case index:get(newentries_path(), Index) of
- noentry ->
- Acc;
- Entry ->
- fetchnewhashes(Index + 1, [Entry | Acc])
+ case index:indexsize(newentries_path()) of
+ 0 ->
+ [];
+ Size ->
+ index:getrange(newentries_path(), Index, Size - 1)
end.
%% Private functions.
diff --git a/src/ts.erl b/src/ts.erl
index 44d27cc..29d93aa 100644
--- a/src/ts.erl
+++ b/src/ts.erl
@@ -22,22 +22,22 @@ new() ->
-spec add(tree_store(), non_neg_integer(), binary()) -> tree_store().
add(S = #tree_store{layers = Layers}, Layer, Entry) ->
- {NewLayers, Array} = layer_rw(Layers, Layer),
- NewArray = array:set(array:size(Array), Entry, Array),
- S#tree_store{layers = array:set(Layer, NewArray, NewLayers)}.
+ {NewLayers, List} = layer_rw(Layers, Layer),
+ NewList = array:set(array:size(List), Entry, List),
+ S#tree_store{layers = array:set(Layer, NewList, NewLayers)}.
-spec delete(tree_store(), non_neg_integer()) -> tree_store().
delete(S = #tree_store{layers = Layers}, Layer) ->
- Array = layer_ro(Layers, Layer),
- NewArray = array:resize(array:size(Array) - 1, Array),
- S#tree_store{layers = array:set(Layer, NewArray, Layers)}.
+ List = layer_ro(Layers, Layer),
+ NewList = array:resize(array:size(List) - 1, List),
+ S#tree_store{layers = array:set(Layer, NewList, Layers)}.
-spec retrieve(tree_store(), tuple()) -> binary() | undefined.
retrieve(#tree_store{layers = Layers}, {Layer, Index}) ->
- Array = layer_ro(Layers, Layer),
- Len = array:size(Array),
+ List = layer_ro(Layers, Layer),
+ Len = array:size(List),
case Index < Len of
- true -> array:get(Index, Array);
+ true -> array:get(Index, List);
false -> undefined
end.
diff --git a/src/util.erl b/src/util.erl
index dd42752..435dbc8 100644
--- a/src/util.erl
+++ b/src/util.erl
@@ -13,15 +13,13 @@ tempfilename(Base) ->
Filename.
-spec fsync([string()]) -> ok.
-fsync([]) ->
- ok;
-fsync([Name | Rest]) ->
- case fsyncport:fsync(Name) of
- ok ->
- fsync(Rest);
- {error, Error} ->
- exit_with_error(fsync, Error, "Error in fsync")
- end.
+fsync(Paths) ->
+ case fsyncport:fsyncall(Paths) of
+ ok ->
+ ok;
+ {error, Error} ->
+ exit_with_error(fsync, Error, "Error in fsync")
+ end.
-spec exit_with_error(atom(), atom(), string()) -> no_return().
exit_with_error(Operation, Error, ErrorMessage) ->