diff options
-rw-r--r-- | Makefile | 5
-rw-r--r-- | README | 25
-rw-r--r-- | c_src/Makefile | 13
-rw-r--r-- | c_src/erlport.c | 105
-rw-r--r-- | c_src/erlport.h | 15
-rw-r--r-- | c_src/fsynchelper.c | 64
-rw-r--r-- | c_src/net_read_write.c | 93
-rw-r--r-- | c_src/net_read_write.h | 10
-rw-r--r-- | ebin/plop.app | 2
-rw-r--r-- | src/atomic.erl | 24
-rw-r--r-- | src/db.erl | 184
-rw-r--r-- | src/fsyncport.erl | 88
-rw-r--r-- | src/hex.erl | 1
-rw-r--r-- | src/index.erl | 87
-rw-r--r-- | src/perm.erl | 75
-rw-r--r-- | src/plop.erl | 2
-rw-r--r-- | src/plop_sup.erl | 5
-rw-r--r-- | src/util.erl | 57
18 files changed, 716 insertions(+), 139 deletions(-)
@@ -1,4 +1,9 @@ build all: + (cd c_src && make all) + mkdir -p priv + cp c_src/fsynchelper priv/ erl -make clean: + (cd c_src && make clean) + -rm priv/fsynchelper -rm ebin/*.beam @@ -8,31 +8,10 @@ Requires Erlang/OTP 17 [erts-6.0] or later. Compile the application - $ erl -make - -Start the application locally - - Very first time, before there is a database: - $ erl -boot start_sasl -pa ebin -eval "plop_app:install([node()])." - There should now exist a directory Mnesia.nonode@nohost/ with four - files in it. - - Start the application: - $ erl -boot start_sasl -pa ebin \ - -eval "application:start(mnesia), application:start(plop)." - - FIXME: mnesia isn't starting automagically, why? - TODO: -plop Keyfile "test/rsakey.pem" -plop Passphrase "sikrit" + $ make Test the application [FIXME] -Moving the database files - - Add `-mnesia dir "/some/path"' to the list of arguments to erl. - -Debugging - - Dump the database to a file: - 1> db:dump_to_file("dump.txt"). +TODO: -plop Keyfile "test/rsakey.pem" -plop Passphrase "sikrit" diff --git a/c_src/Makefile b/c_src/Makefile new file mode 100644 index 0000000..338dc6d --- /dev/null +++ b/c_src/Makefile @@ -0,0 +1,13 @@ +CC = gcc +CFLAGS = -Wall +LDFLAGS = + +PORTS = fsynchelper + +all: $(PORTS) + +clean: + rm -f *.o $(PORTS) + +fsynchelper: net_read_write.o erlport.o fsynchelper.o + $(CC) $(LDFLAGS) -o fsynchelper net_read_write.o erlport.o fsynchelper.o diff --git a/c_src/erlport.c b/c_src/erlport.c new file mode 100644 index 0000000..5e5c17c --- /dev/null +++ b/c_src/erlport.c @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2014 Kungliga Tekniska Högskolan + * (KTH Royal Institute of Technology, Stockholm, Sweden). 
+ */ + +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> + +#include "net_read_write.h" +#include "erlport.h" + +static ssize_t +read_length(size_t length_size) +{ + unsigned char buf[2]; + + if (length_size != 2) { + return -1; + } + + if (length_size > sizeof(buf)) { + return -1; + } + + ssize_t ret; + + ret = net_read(0, (char *)buf, length_size); + + if (ret != (ssize_t) length_size) { + return -1; + } + + return (ssize_t)(((unsigned long)buf[0] << 8) | (unsigned long)buf[1]); +} + +ssize_t +read_command(char *buf, size_t maxlen) +{ + ssize_t len; + + len = read_length(2); + + if (len < 0) { + return -1; + } + + if (len > (ssize_t) maxlen) { + return -1; + } + return net_read(0, buf, (size_t)len); +} + +static int +write_length(size_t len, size_t length_size) +{ + unsigned char buf[2]; + + if (length_size != 2) { + return -1; + } + + buf[0] = (len >> 8) & 0xff; + buf[1] = len & 0xff; + + ssize_t ret; + + ret = net_write(1, (char *)buf, length_size); + + if (ret < 0) { + return -1; + } + + if (ret != (ssize_t) length_size) { + return -1; + } + + return 0; +} + +static int +write_reply(char *msg, size_t len) +{ + ssize_t ret; + + ret = write_length(len, 2); + if (ret < 0) { + return -1; + } + ret = net_write(1, msg, len); + if (ret < 0) { + return -1; + } + + return 0; +} + +int +write_status(char *msg) +{ + return write_reply(msg, strlen(msg)); +} diff --git a/c_src/erlport.h b/c_src/erlport.h new file mode 100644 index 0000000..49e1b7c --- /dev/null +++ b/c_src/erlport.h @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2014 Kungliga Tekniska Högskolan + * (KTH Royal Institute of Technology, Stockholm, Sweden). 
+ */ + +#ifndef ERLPORT_H +#define ERLPORT_H + +ssize_t +read_command(char *buf, size_t len); + +int +write_status(char *msg); + +#endif diff --git a/c_src/fsynchelper.c b/c_src/fsynchelper.c new file mode 100644 index 0000000..e6a04be --- /dev/null +++ b/c_src/fsynchelper.c @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014 Kungliga Tekniska Högskolan + * (KTH Royal Institute of Technology, Stockholm, Sweden). + */ + +#include <stdio.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> + +#include <sys/time.h> +#include <sys/select.h> + +#include "erlport.h" + +static int +dosync(int fd) +{ +#ifdef F_FULLFSYNC + int ret = fcntl(fd, F_FULLFSYNC); +#else + int ret = fsync(fd); +#endif + return ret; +} + +int +main() +{ + char buf[100]; + ssize_t len; + + /* XXX: exits when command size is 0 */ + + while ((len = read_command(buf, sizeof(buf)-1)) > 0) { + buf[len] = '\0'; + while (1) { + int fd; + + fd = open(buf, O_RDONLY); + if (fd == -1) { + /* XXX: better errors */ + write_status("openerror"); + break; + } + + if (dosync(fd) == 0) { + write_status("ok"); + } else if (errno == EBADF) { + write_status("ebadf"); + } else if (errno == EINTR) { + close(fd); + continue; + } else { + write_status("fsyncerror"); + } + + close(fd); + break; + } + } + + return 0; +} diff --git a/c_src/net_read_write.c b/c_src/net_read_write.c new file mode 100644 index 0000000..f8f14f0 --- /dev/null +++ b/c_src/net_read_write.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 1995, 1996, 1997, 1998 Kungliga Tekniska Högskolan + * (Royal Institute of Technology, Stockholm, Sweden). + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. 
Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the Institute nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#include <stdlib.h> +#include <string.h> +#include <errno.h> + +#include "net_read_write.h" + +/* + * Like read but never return partial data. + */ + +ssize_t +net_read (int fd, void *buf, size_t nbytes) +{ + char *cbuf = (char *)buf; + ssize_t count; + size_t rem = nbytes; + + while (rem > 0) { + count = read (fd, cbuf, rem); + if (count < 0) { + if (errno == EINTR) + continue; + else + return count; + } else if (count == 0) { + return count; + } + cbuf += (size_t) count; + rem -= (size_t) count; + } + return (ssize_t)nbytes; +} + +/* + * Like write but never return partial data. 
+ */ + +ssize_t +net_write (int fd, const void *buf, size_t nbytes) +{ + const char *cbuf = (const char *)buf; + ssize_t count; + size_t rem = nbytes; + + while (rem > 0) { + count = write (fd, cbuf, rem); + if (count < 0) { + if (errno == EINTR) + continue; + else + return count; + } + cbuf += (size_t)count; + rem -= (size_t)count; + } + return (ssize_t)nbytes; +} diff --git a/c_src/net_read_write.h b/c_src/net_read_write.h new file mode 100644 index 0000000..80b92b3 --- /dev/null +++ b/c_src/net_read_write.h @@ -0,0 +1,10 @@ +#ifndef NET_READ_WRITE_H +#define NET_READ_WRITE_H + +ssize_t +net_read (int, void *, size_t); + +ssize_t +net_write (int, const void *, size_t); + +#endif diff --git a/ebin/plop.app b/ebin/plop.app index d017331..a2a0dd4 100644 --- a/ebin/plop.app +++ b/ebin/plop.app @@ -6,7 +6,7 @@ [{description, "The plop store"}, {vsn, "0.2.0-dev"}, {modules, [plop_app, plop_sup, plop, db, ht, hex]}, - {applications, [kernel, stdlib, mnesia]}, % crypto, public_key + {applications, [kernel, stdlib]}, % crypto, public_key {registered, [plop, ht, db]}, {mod, {plop_app, ["test/eckey.pem", "test/eckey-public.pem"]}} ]}. diff --git a/src/atomic.erl b/src/atomic.erl new file mode 100644 index 0000000..5bf5670 --- /dev/null +++ b/src/atomic.erl @@ -0,0 +1,24 @@ +%% +%% Copyright (c) 2014 Kungliga Tekniska Högskolan +%% (KTH Royal Institute of Technology, Stockholm, Sweden). +%% + +-module(atomic). +-export([replacefile/2, readfile/1]). + +-spec replacefile(string(), binary()) -> ok. +replacefile(Path, Content) -> + TempName = util:tempfilename(Path), + util:write_tempfile_and_rename(Path, TempName, Content), + util:fsync([Path, filename:dirname(Path)]). + +-spec readfile(string()) -> binary(). +readfile(Path) -> + case file:read_file(Path) of + {ok, Contents} -> + Contents; + {error, enoent} -> + noentry; + {error, Error} -> + util:exit_with_error(readfile, Error, "Error reading file") + end. @@ -6,11 +6,8 @@ %% API. -export([start_link/0, stop/0]). 
--export([init_db/0, init_db/1, init_tables/0, init_tables/1]). -export([add/4, size/0]). -export([get_by_index/1, get_by_indices/3, get_by_leaf_hash/1, get_by_entry_hash/1]). -%% API for testing. --export([dump/1, destroy_tables/0, info_tables/0, dump_to_file/1]). %% gen_server callbacks. -export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, code_change/3]). @@ -19,50 +16,11 @@ -include("db.hrl"). -include("$CTROOT/plop/include/plop.hrl"). -%% @doc Set up a database schema on all nodes that are to be included -%% in the "database cluster". Has to be run _before_ mnesia has been -%% started. -init_db() -> - init_db([node()]). -init_db(Nodes) -> - ok = mnesia:create_schema(Nodes), - rpc:multicall(Nodes, application, start, [mnesia]), - init_tables(Nodes), - rpc:multicall(Nodes, application, stop, [mnesia]). - -%% @doc Run once, or rather every time you start on a new database. -%% If run more than once, we'll get {aborted, {already_exists, TABLE}}. -init_tables() -> - init_tables([node()]). -init_tables(Nodes) -> - %% We've once upon a time invoked mnesia:create_schema/1 with the - %% nodes that will be part of the database. - RamCopies = [], - DiscCopies = [], - DiscOnlyCopies = Nodes, - mnesia:start(), - {atomic, ok} = - mnesia:create_table(plop, - [{type, set}, - {ram_copies, RamCopies}, - {disc_copies, DiscCopies}, - {disc_only_copies, DiscOnlyCopies}, - {attributes, record_info(fields, plop)}, - {majority, true}]), - {atomic, ok} = mnesia:add_table_index(plop, entryhash), - {atomic, ok} = mnesia:add_table_index(plop, mtlhash). - -destroy_tables() -> - mnesia:delete_table(plop). -info_tables() -> - mnesia:table_info(plop, all). -dump_to_file(Filename) -> - mnesia:dump_to_textfile(Filename). size() -> - mnesia:table_info(plop, size). + binary_to_integer(atomic:readfile(treesize_path())). init(_Args) -> - {mnesia:wait_for_tables([plop], 5000), []}. + {ok, []}. start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
@@ -99,10 +57,6 @@ get_by_leaf_hash(LeafHash) -> get_by_entry_hash(EntryHash) -> gen_server:call(?MODULE, {get_by_entry_hash, EntryHash}). -%% Testing and debugging. -dump(Table) -> - gen_server:call(?MODULE, {dump, Table}). - %%%%%%%%%%%%%%%%%%%% %% gen_server callbacks. @@ -122,81 +76,83 @@ terminate(_Reason, _State) -> %%%%%%%%%%%%%%%%%%%% %% The meat. +% Table for Leaf hash -> Entry +entry_root_path() -> + {ok, Value} = application:get_env(plop, entry_root_path), + Value. + +% Table for Leaf hash -> Entry +indexforhash_root_path() -> + {ok, Value} = application:get_env(plop, indexforhash_root_path), + Value. + +% Table for Index -> Leaf hash +index_path() -> + {ok, Value} = application:get_env(plop, index_path), + Value. + +% Table for Entry hash -> Leaf hash +entryhash_root_path() -> + {ok, Value} = application:get_env(plop, entryhash_root_path), + Value. + +% File that stores tree size +treesize_path() -> + {ok, Value} = application:get_env(plop, treesize_path), + Value. + + +entry_for_leafhash(LeafHash) -> + perm:readfile(entry_root_path(), LeafHash). + +index_for_leafhash(LeafHash) -> + binary_to_integer(perm:readfile(indexforhash_root_path(), LeafHash)). + +leafhash_for_index(Index) -> + index:get(index_path(), Index). + +leafhash_for_entryhash(EntryHash) -> + perm:readfile(entryhash_root_path(), EntryHash). + handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call({add, {LeafHash, EntryHash, Data, Index}}, _From, State) -> - R = mnesia:transaction( - fun() -> - mnesia:write( - #plop{ - index = Index, - mtlhash = LeafHash, - entryhash = EntryHash, - logentry = Data}) - end), - {reply, R, State}; - -handle_call({get_by_indices, {Start, End, Sorted}}, _From, State) -> - R = case Sorted of - false -> - select_index(Start, End); - true -> - %% FIXME: RAM hog -- how bad is it? 
- lists:sort(select_index(Start, End)) - end, + ok = perm:ensurefile(entry_root_path(), LeafHash, Data), + ok = perm:ensurefile(entryhash_root_path(), EntryHash, LeafHash), + ok = perm:ensurefile(indexforhash_root_path(), + LeafHash, integer_to_binary(Index)), + ok = index:add(index_path(), Index, LeafHash), + ok = atomic:replacefile(treesize_path(), integer_to_list(Index+1)), + {reply, ok, State}; + +handle_call({get_by_indices, {Start, End, _Sorted}}, _From, State) -> + R = lists:map(fun (Index) -> + LeafHash = leafhash_for_index(Index), + Entry = entry_for_leafhash(LeafHash), + {Index, LeafHash, Entry} + end, lists:seq(Start, End)), {reply, R, State}; handle_call({get_by_index, Index}, _From, State) -> - {reply, - find_entry(fun() -> mnesia:read(plop, Index) end), - State}; + LeafHash = leafhash_for_index(Index), + Entry = entry_for_leafhash(LeafHash), + R = {Index, LeafHash, Entry}, + {reply, R, State}; handle_call({get_by_leaf_hash, LeafHash}, _From, State) -> - {reply, - find_entry(fun() -> - mnesia:index_read(plop, LeafHash, #plop.mtlhash) - end), - State}; + Entry = entry_for_leafhash(LeafHash), + Index = index_for_leafhash(LeafHash), + R = {Index, LeafHash, Entry}, + {reply, R, State}; handle_call({get_by_entry_hash, EntryHash}, _From, State) -> - {reply, - find_entry(fun() -> - mnesia:index_read(plop, EntryHash, #plop.entryhash) - end), - State}; - -handle_call({dump, Table}, _From, State) -> - R = mnesia:transaction( - fun() -> - Q = qlc:q([E || E <- mnesia:table(Table)]), - qlc:e(Q) - end), - {reply, R, State}. - -%%%%%%%%%%%%%%%%%%%% -%% Helper functions. - --spec select_index(non_neg_integer(), non_neg_integer()) -> - [{non_neg_integer(), binary(), binary()}]. -select_index(Start, End) -> - F = fun() -> - %% Get index, mtlhash and logentry. 
- MatchHead = {plop, '$1', '$2', '_', '$3'}, - Guard = [{'>=', '$1', Start}, {'=<', '$1', End}], - Result = ['$$'], - mnesia:select(plop, [{MatchHead, Guard, Result}]) + R = case leafhash_for_entryhash(EntryHash) of + noentry -> + notfound; + LeafHash -> + Entry = entry_for_leafhash(LeafHash), + Index = index_for_leafhash(LeafHash), + {Index, LeafHash, Entry} end, - {atomic, Res} = mnesia:transaction(F), - Res. - --spec find_entry(fun()) -> notfound | - {non_neg_integer(), binary(), binary()} | - duplicate. -find_entry(Fun) -> - {atomic, Result} = mnesia:transaction(Fun), - case Result of - [] -> notfound; - [#plop{index = I, mtlhash = H, logentry = E}] -> {I, H, E}; - _ -> duplicate - end. + {reply, R, State}. diff --git a/src/fsyncport.erl b/src/fsyncport.erl new file mode 100644 index 0000000..8bc8c60 --- /dev/null +++ b/src/fsyncport.erl @@ -0,0 +1,88 @@ +%% +%% Copyright (c) 2014 Kungliga Tekniska Högskolan +%% (KTH Royal Institute of Technology, Stockholm, Sweden). +%% + +-module(fsyncport). +-export([start_link/0, stop/0, init/1]). +-export([fsync/1]). + +start_link() -> + Pid = spawn(?MODULE, init, [code:priv_dir(plop) ++ "/fsynchelper"]), + {ok, Pid}. +stop() -> + fsyncport ! stop. + +fsync(Path) -> + call_port({fsync, Path}). + +call_port(Msg) -> + fsyncport ! {call, self(), Msg}, + receive + {fsyncport, Result} -> + Result + end. + +init(ExtPrg) -> + register(fsyncport, self()), + process_flag(trap_exit, true), + Ports = lists:map(fun(_N) -> open_port({spawn_executable, ExtPrg}, + [{packet, 2}]) end, + lists:seq(1, 32)), + loop(Ports). + +loop(Ports) -> + loop(Ports, dict:new(), queue:new()). +loop(IdlePorts, BusyPorts, Waiting) -> + receive + {call, Caller, {fsync, Path}} -> + case IdlePorts of + [] -> + loop(IdlePorts, + BusyPorts, + queue:in({Caller, Path}, Waiting)); + [Port | Rest] -> + Port ! 
{self(), {command, Path}}, + loop(Rest, + dict:store(Port, {Caller, os:timestamp()}, BusyPorts), + Waiting) + end; + + {Port, {data, Data}} when is_port(Port) -> + {Caller, Starttime} = dict:fetch(Port, BusyPorts), + Stoptime = os:timestamp(), + statreport({fsync, Stoptime, Starttime}), + Caller ! {fsyncport, list_to_atom(Data)}, + case queue:out(Waiting) of + {empty, _} -> + loop([Port | IdlePorts], + dict:erase(Port, BusyPorts), + Waiting); + {{value, {NewCaller, NewPath}}, NewWaiting} -> + IdlePorts = [], + Port ! {self(), {command, NewPath}}, + loop(IdlePorts, + dict:store(Port, {NewCaller, os:timestamp()}, + BusyPorts), + NewWaiting) + end; + stop -> + lists:foreach(fun (Port) -> + Port ! {self(), close} + end, + IdlePorts), + lists:foreach(fun ({Port, {_Caller, _Starttime}}) -> + Port ! {self(), close} + end, + dict:to_list(BusyPorts)), + receive + {Port, closed} when is_port(Port) -> + exit(normal) %% XXX exits when first port is closed + end; + {'EXIT', Port, _Reason} when is_port(Port) -> + %% XXX supervisor doesn't restart fsyncport, why? + exit(port_terminated) + end. + +statreport(_Entry) -> + none. diff --git a/src/hex.erl b/src/hex.erl index e3c8441..1eb1e6a 100644 --- a/src/hex.erl +++ b/src/hex.erl @@ -4,6 +4,7 @@ -module(hex). -export([bin_to_hexstr/1,hexstr_to_bin/1]). +-spec bin_to_hexstr(binary()) -> string(). bin_to_hexstr(Bin) -> lists:flatten([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(Bin)]). diff --git a/src/index.erl b/src/index.erl new file mode 100644 index 0000000..5fd468b --- /dev/null +++ b/src/index.erl @@ -0,0 +1,87 @@ +%% +%% Copyright (c) 2014 Kungliga Tekniska Högskolan +%% (KTH Royal Institute of Technology, Stockholm, Sweden). +%% + +%% Implements an interface to a file pair (basename and basename.chksum) +%% that stores an ordered list of fixed-size entries. Entries can be +%% added at the end and are retrieved by index. The list can also be +%% truncated. 
+%% +%% Writes(add, truncate, addlast) need to be serialized. + +%% TODO: Checksums + +-module(index). +-export([get/2, add/3, addlast/2, truncate/2]). + +-define(ENTRYSIZE, 32). +-define(ENTRYSIZEINFILE, (?ENTRYSIZE*2+1)). + +-spec add(string(), integer() | last, binary()) -> ok. +add(Basepath, Index, Entry) when is_binary(Entry), size(Entry) == ?ENTRYSIZE -> + case file:open(Basepath, [read, write, binary]) of + {ok, File} -> + {ok, Position} = file:position(File, eof), + case Index of + last when Position rem ?ENTRYSIZEINFILE == 0 -> + ok; + Index when is_integer(Index), + Index * ?ENTRYSIZEINFILE == Position -> + ok + end, + EntryText = hex:bin_to_hexstr(Entry) ++ "\n", + ok = file:write(File, EntryText), + ok = file:close(File), + util:fsync([Basepath, filename:dirname(Basepath)]); + {error, Error} -> + util:exit_with_error(Error, writefile, + "Error opening file for writing") + end. + +truncate(Basepath, Index) -> + case file:open(Basepath, [read, write, binary]) of + {ok, File} -> + {ok, _Position} = file:position(File, Index * ?ENTRYSIZEINFILE), + ok = file:truncate(File), + ok = file:close(File), + util:fsync([Basepath, filename:dirname(Basepath)]); + {error, Error} -> + util:exit_with_error(Error, writefile, + "Error opening file for writing") + end. + + +-spec addlast(string(), integer()) -> ok. +addlast(Basepath, Entry) -> + add(Basepath, last, Entry). + +decodedata(EntryText) when length(EntryText) == ?ENTRYSIZEINFILE -> + case [lists:last(EntryText)] of + "\n" -> + hex:hexstr_to_bin(lists:droplast(EntryText)); + _ -> + util:exit_with_error(badformat, readindex, + "Index line not ending with linefeed") + end. + +-spec get(string(), integer()) -> binary(). 
+get(Basepath, Index) -> + case file:open(Basepath, [read, binary]) of + {ok, File} -> + {ok, Filesize} = file:position(File, eof), + if + Index * ?ENTRYSIZEINFILE + ?ENTRYSIZEINFILE =< Filesize -> + {ok, _Position} = file:position(File, + Index * ?ENTRYSIZEINFILE), + {ok, EntryText} = file:read(File, ?ENTRYSIZEINFILE), + Entry = decodedata(binary_to_list(EntryText)), + file:close(File), + Entry; + true -> + noentry + end; + {error, Error} -> + util:exit_with_error(Error, readfile, + "Error opening file for reading") + end. diff --git a/src/perm.erl b/src/perm.erl new file mode 100644 index 0000000..ccb23bc --- /dev/null +++ b/src/perm.erl @@ -0,0 +1,75 @@ +%% +%% Copyright (c) 2014 Kungliga Tekniska Högskolan +%% (KTH Royal Institute of Technology, Stockholm, Sweden). +%% + +-module(perm). +-export([ensurefile/3, readfile/2]). + +-spec readfile_and_verify(string(), binary()) -> ok | differ | {error, atom()}. +readfile_and_verify(Name, Content) -> + case file:read_file(Name) of + {ok, ContentsRead} when Content == ContentsRead -> + ok; + {ok, _ContentsRead} -> + differ; + {error, Error} -> + {error, Error} + end. + +-spec make_dir(string()) -> ok | {error, atom()}. +make_dir(Name) -> + case file:make_dir(Name) of + ok -> + ok; + {error, eexist} -> + ok; + {error, Error} -> + {error, Error} + end. + +-spec make_dirs([string()]) -> ok | {error, atom()}. +make_dirs([]) -> + ok; +make_dirs([Name | Rest]) -> + case make_dir(Name) of + ok -> + make_dirs(Rest); + {error, Error} -> + {error, Error} + end. + +-spec path_for_key(string(), binary()) -> {[string()], string()}. +path_for_key(Rootdir, Key) -> + Name = hex:bin_to_hexstr(Key), + [C1, C2, C3, C4, C5, C6 | _] = Name, + Firstlevel = Rootdir ++ [C1, C2], + Secondlevel = Firstlevel ++ "/" ++ [C3, C4], + Thirdlevel = Secondlevel ++ "/" ++ [C5, C6], + Fullpath = Thirdlevel ++ "/" ++ Name, + {[Firstlevel, Secondlevel, Thirdlevel], Fullpath}. + +-spec ensurefile(string(), binary(), binary()) -> ok | differ. 
+ensurefile(Rootdir, Key, Content) -> + {Dirs, Path} = path_for_key(Rootdir, Key), + case readfile_and_verify(Path, Content) of + ok -> + util:fsync([Path, Rootdir | Dirs]); + differ -> + differ; + {error, enoent} -> + util:check_error(make_dirs([Rootdir, Rootdir ++ "nursery/"] + ++ Dirs), + makedir, "Error creating directory"), + NurseryName = Rootdir ++ "nursery/" ++ + util:tempfilename(hex:bin_to_hexstr(Key)), + util:write_tempfile_and_rename(Path, NurseryName, Content), + util:fsync([Path, Rootdir | Dirs]); + {error, Error} -> + util:exit_with_error(Error, readfile, "Error reading file") + end. + +-spec readfile(string(), binary()) -> binary(). +readfile(Rootdir, Key) -> + {_Dirs, Path} = path_for_key(Rootdir, Key), + atomic:readfile(Path). diff --git a/src/plop.erl b/src/plop.erl index c93b26d..30d05ca 100644 --- a/src/plop.erl +++ b/src/plop.erl @@ -146,7 +146,7 @@ handle_call({get, logid}, _From, {reply, LogID, Plop}; handle_call({add, {LogEntry, TreeLeafHash, EntryHash}}, _From, Plop) -> - {atomic, ok} = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), + ok = db:add(TreeLeafHash, EntryHash, LogEntry, ht:size()), ok = ht:add(TreeLeafHash), {reply, ok, Plop}; diff --git a/src/plop_sup.erl b/src/plop_sup.erl index a5ce905..bcb9756 100644 --- a/src/plop_sup.erl +++ b/src/plop_sup.erl @@ -23,6 +23,11 @@ init(Args) -> permanent, 10000, worker, [db]}, + {fsync, + {fsyncport, start_link, []}, + permanent, + 10000, + worker, [fsyncport]}, {the_ht, {ht, start_link, []}, permanent, diff --git a/src/util.erl b/src/util.erl new file mode 100644 index 0000000..48ebbb0 --- /dev/null +++ b/src/util.erl @@ -0,0 +1,57 @@ +%% +%% Copyright (c) 2014 Kungliga Tekniska Högskolan +%% (KTH Royal Institute of Technology, Stockholm, Sweden). +%% + +-module(util). +-export([tempfilename/1, fsync/1, exit_with_error/3, + check_error/3, write_tempfile_and_rename/3]). + +-spec tempfilename(string()) -> string(). 
+tempfilename(Base) -> + {MegaSecs, Secs, MicroSecs} = now(), + Filename = io_lib:format("~s-~s-~p.~p", [Base, os:getpid(), + MegaSecs * 1000000 + Secs, MicroSecs]), + Filename. + +-spec fsync([string()]) -> ok. +fsync([]) -> + ok; +fsync([Name | Rest]) -> + case fsyncport:fsync(Name) of + ok -> + fsync(Rest); + {error, Error} -> + exit_with_error(fsync, Error, "Error in fsync") + end. + +-spec exit_with_error(atom(), atom(), string()) -> no_return(). +exit_with_error(Operation, Error, ErrorMessage) -> + io:format("~s(~w): ~w~n", [ErrorMessage, Operation, Error]), + exit({fileerror, Operation, Error, ErrorMessage}). + +-spec check_error(any(), atom(), string()) -> ok. +check_error(ReturnValue, Operation, ErrorMessage) -> + case ReturnValue of + ok -> + ok; + {error, Error} -> + exit_with_error(Operation, Error, ErrorMessage) + end. + +-spec write_tempfile_and_rename(string(), string(), binary()) -> ok. +write_tempfile_and_rename(Name, NurseryName, Content) -> + case file:open(NurseryName, [write, exclusive]) of + {ok, File} -> + ok = file:write(File, Content), + file:close(File), + check_error(file:rename(NurseryName, Name), rename, + "Error when renaming tempfile to final file"); + {error, eexist} -> + %% Should not happen, file name should be unique + exit_with_error(writefile, eexist, + "File existed when creating tempfile"); + {error, Error} -> + exit_with_error(writefile, Error, + "Error when creating tempfile") + end. |