diff options
author | Linus Nordberg <linus@nordberg.se> | 2014-10-29 16:35:44 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordberg.se> | 2014-10-29 16:56:48 +0100 |
commit | 92f681e1cbb444317d2603994c60c02feeab32be (patch) | |
tree | ef62cdfece8c1f063cb27cf299094e1f4d7eed1a | |
parent | b15f4636337c45b487651e8d442afed0d4141725 (diff) | |
parent | cc2aaa2807bb13f4683c2d74a414d39d5b29a372 (diff) |
Merge remote-tracking branch 'refs/remotes/map/external-merge3' into merging-external-merge
Conflicts:
src/db.erl
src/frontend.erl
src/index.erl
src/plop.erl
src/storage.erl
src/ts.erl
-rw-r--r-- | Emakefile | 3 | ||||
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/atomic.erl | 2 | ||||
-rw-r--r-- | src/db.erl | 10 | ||||
-rw-r--r-- | src/frontend.erl | 55 | ||||
-rw-r--r-- | src/fsyncport.erl | 29 | ||||
-rw-r--r-- | src/index.erl | 18 | ||||
-rw-r--r-- | src/perm.erl | 13 | ||||
-rw-r--r-- | src/plop.erl | 12 | ||||
-rw-r--r-- | src/plop_app.erl | 4 | ||||
-rw-r--r-- | src/storage.erl | 13 | ||||
-rw-r--r-- | src/ts.erl | 18 | ||||
-rw-r--r-- | src/util.erl | 16 |
13 files changed, 142 insertions, 53 deletions
@@ -2,4 +2,5 @@ {["src/*", "test/*"], [debug_info, {i, "include/"}, - {outdir, "ebin/"}]}. + {outdir, "ebin/"}, + {parse_transform, lager_transform}]}. @@ -2,7 +2,7 @@ build all: (cd c_src && make all) mkdir -p priv cp c_src/fsynchelper priv/ - erl -make + erl -pa ../lager/ebin -make clean: (cd c_src && make clean) -rm priv/fsynchelper diff --git a/src/atomic.erl b/src/atomic.erl index 5ad48ba..36fba81 100644 --- a/src/atomic.erl +++ b/src/atomic.erl @@ -10,7 +10,7 @@ replacefile(Path, Content) -> util:write_tempfile_and_rename(Path, TempName, Content), util:fsync([Path, filename:dirname(Path)]). --spec readfile(string()) -> binary(). +-spec readfile(string()) -> binary() | noentry. readfile(Path) -> case file:read_file(Path) of {ok, Contents} -> @@ -9,6 +9,7 @@ -export([add/4, add/2, add_entryhash/2, add_index/2, set_treesize/1, size/0]). -export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1]). -export([get_by_entry_hash/1, entry_for_leafhash/1, leafhash_for_index/1]). +-export([leafhash_for_indices/2, indexsize/0]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -19,6 +20,9 @@ size() -> binary_to_integer(atomic:readfile(treesize_path())). +indexsize() -> + index:indexsize(index_path()). + init(_Args) -> {ok, []}. @@ -159,11 +163,13 @@ handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) -> ok = perm:ensurefile(indexforhash_root_path(), LeafHash, integer_to_binary(Index)), ok = index:add(index_path(), Index, LeafHash), - ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)), + ok = atomic:replacefile(treesize_path(), integer_to_binary(Index+1)), {reply, ok, State}; handle_call({add, {LeafHash, Data}}, _From, State) -> + lager:debug("add leafhash ~p", [LeafHash]), ok = perm:ensurefile(entry_root_path(), LeafHash, Data), + lager:debug("leafhash ~p added", [LeafHash]), {reply, ok, State}; handle_call({add_entryhash, {LeafHash, EntryHash}}, _From, State) -> @@ -177,7 +183,7 @@ handle_call({add_index, {LeafHash, Index}}, _From, State) -> {reply, ok, State}; handle_call({set_treesize, Size}, _From, State) -> - ok = atomic:replacefile(treesize_path(), integer_to_list(Size)), + ok = atomic:replacefile(treesize_path(), integer_to_binary(Size)), {reply, ok, State}; handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) -> diff --git a/src/frontend.erl b/src/frontend.erl index a8a8b9e..9c69517 100644 --- a/src/frontend.erl +++ b/src/frontend.erl @@ -39,13 +39,33 @@ request(post, "ct/frontend/sendsth", Input) -> {error, E} -> html("sendentry: bad input:", E); {struct, PropList} -> + OldSize = db:size(), Treesize = proplists:get_value(<<"tree_size">>, PropList), - - ok = db:set_treesize(Treesize), - - ht:reset_tree([db:size() - 1]), - - success({[{result, <<"ok">>}]}) + Indexsize = db:indexsize(), + + if + Treesize < OldSize -> + html("Size is older than current size", OldSize); + Treesize == OldSize -> + success({[{result, <<"ok">>}]}); + Treesize > Indexsize -> + html("Has too few entries", Indexsize); + true -> + NewEntries = db:leafhash_for_indices(OldSize, Treesize - 1), + lager:debug("old size: ~p new size: ~p entries: ~p", + [OldSize, Treesize, NewEntries]), + + Errors = check_entries(NewEntries, OldSize, Treesize - 1), + + case Errors of + [] -> + ok = db:set_treesize(Treesize), + ht:reset_tree([db:size() - 1]), + success({[{result, <<"ok">>}]}); + _ -> + html("Database not complete", Errors) + end + end end; request(get, "ct/frontend/currentposition", _Query) -> @@ -56,19 +76,40 @@ request(get, "ct/frontend/currentposition", _Query) -> request(get, "ct/frontend/missingentries", _Query) -> Size = db:size(), Missing = fetchmissingentries(Size), + lager:debug("missingentries: ~p", [Missing]), success({[{result, <<"ok">>}, - {entries, Missing}]}). + {entries, lists:map(fun (Entry) -> base64:encode(Entry) end, + Missing)}]}). +check_entries(Entries, Start, End) -> + lists:foldl(fun ({Hash, Index}, Acc) -> + case check_entry(Hash, Index) of + ok -> + Acc; + Error -> + [Error | Acc] + end + end, [], lists:zip(Entries, lists:seq(Start, End))). + +check_entry(Hash, Index) -> + case db:get_by_leaf_hash(Hash) of + notfound -> + {notfound, Index}; + _ -> + ok + end. fetchmissingentries(Index) -> lists:reverse(fetchmissingentries(Index, [])). fetchmissingentries(Index, Acc) -> + lager:debug("index ~p", [Index]), case db:leafhash_for_index(Index) of noentry -> Acc; Hash -> case db:entry_for_leafhash(Hash) of noentry -> + lager:debug("didn't find hash ~p", [Hash]), fetchmissingentries(Index + 1, [Hash | Acc]); _ -> fetchmissingentries(Index + 1, Acc) diff --git a/src/fsyncport.erl b/src/fsyncport.erl index 7e2bf11..c9be44d 100644 --- a/src/fsyncport.erl +++ b/src/fsyncport.erl @@ -3,7 +3,7 @@ -module(fsyncport). -export([start_link/0, stop/0, init/1]). --export([fsync/1]). +-export([fsync/1, fsyncall/1]). start_link() -> Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]), @@ -14,6 +14,9 @@ stop() -> fsync(Path) -> call_port({fsync, Path}). +fsyncall(Paths) -> + call_port_multi([{fsync, Path} || Path <- Paths]). + call_port(Msg) -> fsyncport ! {call, self(), Msg}, receive @@ -21,12 +24,31 @@ call_port(Msg) -> Result end. +call_port_multi(Msgs) -> + lists:foreach(fun (Msg) -> + fsyncport ! {call, self(), Msg} + end, Msgs), + lists:foldl(fun (_Msg, Acc) -> + R = receive + {fsyncport, Result} -> + Result + end, + case R of + ok -> + Acc; + Error -> + Error + end + end, ok, Msgs). + init(ExtPrg) -> + lager:debug("starting fsync service"), register(fsyncport, self()), process_flag(trap_exit, true), Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, [{packet, 2}]) end, lists:seq(1, 32)), + lager:debug("fsync service started", []), loop(Ports). loop(Ports) -> @@ -34,12 +56,14 @@ loop(Ports) -> loop(IdlePorts, BusyPorts, Waiting) -> receive {call, Caller, {fsync, Path}} -> + lager:debug("fsync incoming request: ~p", [Path]), case IdlePorts of [] -> loop(IdlePorts, BusyPorts, queue:in({Caller, Path}, Waiting)); [Port | Rest] -> + lager:debug("fsync port ~p assigned to request ~p", [Port, Path]), Port ! {self(), {command, Path}}, loop(Rest, dict:store(Port, {Caller, os:timestamp()}, BusyPorts), @@ -47,6 +71,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> end; {Port, {data, Data}} when is_port(Port) -> + lager:debug("fsync request finished: ~p", [Port]), {Caller, Starttime} = dict:fetch(Port, BusyPorts), Stoptime = os:timestamp(), statreport({fsync, Stoptime, Starttime}), @@ -65,6 +90,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> NewWaiting) end; stop -> + lager:debug("fsync stop request received"), lists:foreach(fun (Port) -> Port ! {self(), close} end, @@ -78,6 +104,7 @@ loop(IdlePorts, BusyPorts, Waiting) -> exit(normal) %% XXX exits when first port is closed end; {'EXIT', Port, _Reason} when is_port(Port) -> + lager:debug("fsync port ~p exited, exiting", [Port]), %% XXX supervisor doesn't restart fsyncport, why? exit(port_terminated) end. diff --git a/src/index.erl b/src/index.erl index bbc9a10..96195e3 100644 --- a/src/index.erl +++ b/src/index.erl @@ -12,7 +12,7 @@ %% TODO: Checksums -module(index). --export([get/2, getrange/3, add/3, addlast/2]). +-export([get/2, getrange/3, add/3, addlast/2, indexsize/1]). -define(ENTRYSIZE, 32). -define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). @@ -77,7 +77,19 @@ decodedata(<<_:?ENTRYSIZE/binary-unit:16, _>>, _Acc) -> util:exit_with_error(badformat, readindex, "Index line not ending with linefeed"). --spec get(string(), integer()) -> binary(). +-spec indexsize(string()) -> integer(). +indexsize(Basepath) -> + case file:open(Basepath, [read, binary]) of + {ok, File} -> + {ok, Filesize} = file:position(File, eof), + lager:debug("file ~p size ~p", [Basepath, Filesize]), + Filesize div ?ENTRYSIZEINFILE; + {error, Error} -> + util:exit_with_error(Error, readfile, + "Error opening file for reading") + end. + +-spec get(string(), integer()) -> binary() | noentry. get(Basepath, Index) -> case getrange(Basepath, Index, Index) of noentry -> @@ -88,6 +100,7 @@ get(Basepath, Index) -> -spec getrange(string(), integer(), integer()) -> [binary()]. getrange(Basepath, Start, End) when Start =< End -> + lager:debug("path ~p start ~p end ~p", [Basepath, Start, End]), case file:open(Basepath, [read, binary]) of {ok, File} -> {ok, Filesize} = file:position(File, eof), @@ -98,6 +111,7 @@ getrange(Basepath, Start, End) when Start =< End -> {ok, EntryText} = file:read(File, ?ENTRYSIZEINFILE * (End - Start + 1)), Entry = decodedata(EntryText), + lager:debug("entries ~p", [length(Entry)]), file:close(File), Entry; true -> diff --git a/src/perm.erl b/src/perm.erl index 466cc4f..9f02b55 100644 --- a/src/perm.erl +++ b/src/perm.erl @@ -49,25 +49,32 @@ path_for_key(Rootdir, Key) -> -spec ensurefile(string(), binary(), binary()) -> ok | differ. ensurefile(Rootdir, Key, Content) -> + lager:debug("dir ~p key ~p", [Rootdir, Key]), {Dirs, Path} = path_for_key(Rootdir, Key), case readfile_and_verify(Path, Content) of ok -> - util:fsync([Path, Rootdir | Dirs]); + lager:debug("key ~p existed, fsync", [Key]), + util:fsync([Path, Rootdir | Dirs]), + lager:debug("key ~p fsynced", [Key]); differ -> + lager:debug("key ~p existed, was different", [Key]), differ; {error, enoent} -> + lager:debug("key ~p didn't exist, add", [Key]), util:check_error(make_dirs([Rootdir, Rootdir ++ "nursery/"] ++ Dirs), makedir, "Error creating directory"), NurseryName = Rootdir ++ "nursery/" ++ util:tempfilename(hex:bin_to_hexstr(Key)), util:write_tempfile_and_rename(Path, NurseryName, Content), - util:fsync([Path, Rootdir | Dirs]); + lager:debug("key ~p added, fsync", [Key]), + util:fsync([Path, Rootdir | Dirs]), + lager:debug("key ~p fsynced", [Key]); {error, Error} -> util:exit_with_error(Error, readfile, "Error reading file") end. --spec readfile(string(), binary()) -> binary(). +-spec readfile(string(), binary()) -> binary() | noentry. readfile(Rootdir, Key) -> {_Dirs, Path} = path_for_key(Rootdir, Key), atomic:readfile(Path). diff --git a/src/plop.erl b/src/plop.erl index 57febf5..0c85b21 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -189,12 +189,8 @@ get_logid() -> testing_get_pubkey() -> gen_server:call(?MODULE, {test, pubkey}). -fill_in_entry({_Index, LeafHash, notfetched}) -> - db:get_by_leaf_hash(LeafHash). - storage_nodes() -> - {ok, Value} = application:get_env(plop, storage_nodes, {ok, []}), - Value. + application:get_env(plop, storage_nodes, []). storage_nodes_quorum() -> {ok, Value} = application:get_env(plop, storage_nodes_quorum), @@ -222,6 +218,7 @@ send_storage_entrycommitted(URLBase, EntryHash, TreeLeafHash) -> [], [{sync, false}]). store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> + lager:debug("leafhash ~p", [TreeLeafHash]), OwnRequestId = make_ref(), Completion = @@ -240,6 +237,7 @@ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> {From, Completion, storage_nodes_quorum()}}), + lager:debug("send requests to ~p", [Nodes]), RequestIds = [send_storage_sendentry(URLBase, LogEntry, TreeLeafHash) || URLBase <- Nodes], PlopWithRequests = @@ -250,6 +248,9 @@ store_at_all_nodes(Nodes, {LogEntry, TreeLeafHash, EntryHash}, From, State) -> end, PlopWithOwn, RequestIds), PlopWithRequests. +fill_in_entry({_Index, LeafHash, notfetched}) -> + db:get_by_leaf_hash(LeafHash). + %%%%%%%%%%%%%%%%%%%% handle_call(stop, _From, Plop) -> {stop, normal, stopped, Plop}; @@ -267,6 +268,7 @@ handle_call({get, logid}, _From, {reply, LogID, Plop}; handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, From, Plop) -> + lager:debug("add leafhash ~p", [TreeLeafHash]), case storage_nodes() of [] -> ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), diff --git a/src/plop_app.erl b/src/plop_app.erl index f90792d..767bf06 100644 --- a/src/plop_app.erl +++ b/src/plop_app.erl @@ -4,10 +4,6 @@ -module(plop_app). -behaviour(application). -export([start/2, stop/1]). --export([install/1]). - -install(Nodes) -> - db:init_db(Nodes). start(normal, Args) -> plop_sup:start_link(Args). diff --git a/src/storage.erl b/src/storage.erl index e09acdb..8136308 100644 --- a/src/storage.erl +++ b/src/storage.erl @@ -43,14 +43,11 @@ request(get, "ct/storage/fetchnewentries", _Input) -> {entries, Entries}]}). fetchnewhashes(Index) -> - lists:reverse(fetchnewhashes(Index, [])). - -fetchnewhashes(Index, Acc) -> - case index:get(newentries_path(), Index) of - noentry -> - Acc; - Entry -> - fetchnewhashes(Index + 1, [Entry | Acc]) + case index:indexsize(newentries_path()) of + 0 -> + []; + Size -> + index:getrange(newentries_path(), Index, Size - 1) end. %% Private functions. @@ -22,22 +22,22 @@ new() -> -spec add(tree_store(), non_neg_integer(), binary()) -> tree_store(). add(S = #tree_store{layers = Layers}, Layer, Entry) -> - {NewLayers, Array} = layer_rw(Layers, Layer), - NewArray = array:set(array:size(Array), Entry, Array), - S#tree_store{layers = array:set(Layer, NewArray, NewLayers)}. + {NewLayers, List} = layer_rw(Layers, Layer), + NewList = array:set(array:size(List), Entry, List), + S#tree_store{layers = array:set(Layer, NewList, NewLayers)}. -spec delete(tree_store(), non_neg_integer()) -> tree_store(). delete(S = #tree_store{layers = Layers}, Layer) -> - Array = layer_ro(Layers, Layer), - NewArray = array:resize(array:size(Array) - 1, Array), - S#tree_store{layers = array:set(Layer, NewArray, Layers)}. + List = layer_ro(Layers, Layer), + NewList = array:resize(array:size(List) - 1, List), + S#tree_store{layers = array:set(Layer, NewList, Layers)}. -spec retrieve(tree_store(), tuple()) -> binary() | undefined. retrieve(#tree_store{layers = Layers}, {Layer, Index}) -> - Array = layer_ro(Layers, Layer), - Len = array:size(Array), + List = layer_ro(Layers, Layer), + Len = array:size(List), case Index < Len of - true -> array:get(Index, Array); + true -> array:get(Index, List); false -> undefined end. diff --git a/src/util.erl b/src/util.erl index dd42752..435dbc8 100644 --- a/src/util.erl +++ b/src/util.erl @@ -13,15 +13,13 @@ tempfilename(Base) -> Filename. -spec fsync([string()]) -> ok. -fsync([]) -> - ok; -fsync([Name | Rest]) -> - case fsyncport:fsync(Name) of - ok -> - fsync(Rest); - {error, Error} -> - exit_with_error(fsync, Error, "Error in fsync") - end. +fsync(Paths) -> + case fsyncport:fsyncall(Paths) of + ok -> + ok; + {error, Error} -> + exit_with_error(fsync, Error, "Error in fsync") + end. -spec exit_with_error(atom(), atom(), string()) -> no_return(). exit_with_error(Operation, Error, ErrorMessage) -> |