From 9b2e72510b33547794207043714f52e16239b3f5 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Mon, 28 Sep 2015 16:42:04 +0200 Subject: Added permdb --- src/permdb.erl | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 src/permdb.erl (limited to 'src/permdb.erl') diff --git a/src/permdb.erl b/src/permdb.erl new file mode 100644 index 0000000..0cb3c2f --- /dev/null +++ b/src/permdb.erl @@ -0,0 +1,202 @@ +%%% Copyright (c) 2015, NORDUnet A/S. +%%% See LICENSE for licensing information. + +-module(permdb). + +-behaviour(gen_server). + +-export([start_link/2, stop/1]). +-export([getvalue/2, addvalue/3, commit/1, commit/2]). + +%% gen_server callbacks. +-export([init/1, handle_call/3, terminate/2, handle_cast/2, handle_info/2, + code_change/3]). + +-define(KEYSIZE, 32). +-define(BITSPERLEVEL, 2). +-define(ENTRIESPERNODE, 4). +-define(BYTESPERENTRY, 8). +-define(BITSPERENTRY, (?BYTESPERENTRY*8-1)). +-define(MAGICSIZE, 2). +-define(NODEMAGIC, <<16#8a, 16#44>>). +-define(DATAMAGIC, <<16#cb, 16#0e>>). +-define(NODESIZE, (?MAGICSIZE+(?ENTRIESPERNODE*?BYTESPERENTRY))). + +-record(state, {cachename, port, datafile, indexfile, requests}). + +openfile(Filename) -> + {ok, File} = file:open(Filename, [read, write, binary, raw]), + File. + +getvalue_port_command(Port, Key) -> + Port ! {self(), {command, <<0:8, Key/binary>>}}. + +addvalue_port_command(Port, Key, Value) -> + Port ! {self(), {command, <<1:8, Key:32/binary, Value/binary>>}}. + +commit_port_command(Port) -> + Port ! {self(), {command, <<2:8>>}}. + +getdatakey(State, Offset) -> + {ok, DataBinary} = file:pread(State#state.datafile, Offset, ?MAGICSIZE+?KEYSIZE+?BYTESPERENTRY), + Datamagic = ?DATAMAGIC, + <> = DataBinary, + {Key, Length}. + +getdata(State, Offset, Length) -> + {ok, DataBinary} = file:pread(State#state.datafile, Offset+?MAGICSIZE+?KEYSIZE+?BYTESPERENTRY, Length), + DataBinary. + +getnode(State, Offset) -> + case ets:lookup(State#state.cachename, Offset) of + [] -> + {ok, NodeBinary} = file:pread(State#state.indexfile, Offset, ?NODESIZE), + Nodemagic = ?NODEMAGIC, + <> = NodeBinary, + ets:insert(State#state.cachename, {Offset, Node}), + Node; + [{_, Node}] -> + Node + end. + +getroot(State) -> + case ets:lookup(State#state.cachename, root) of + [] -> + {ok, _Position} = file:position(State#state.indexfile, {eof, -?NODESIZE}), + {ok, RootNodeBinary} = file:read(State#state.indexfile, ?NODESIZE), + Nodemagic = ?NODEMAGIC, + <> = RootNodeBinary, + ets:insert(State#state.cachename, {root, Node}), + Node; + [{root, Node}] -> + Node + end. + +getendentry(State, Key) -> + Root = getroot(State), + %io:format("Root: ~p~n", [Root]), + getendentry(State, Key, Root). + +getendentry(State, <>, Node) -> + case binary_part(Node, KeyHead*?BYTESPERENTRY, ?BYTESPERENTRY) of + <<0:?BYTESPERENTRY/integer-unit:8>> -> + none; + <<0:1, Entry:?BITSPERENTRY>> -> + NewNode = getnode(State, Entry), + getendentry(State, KeyRest, NewNode); + <<1:1, Entry:?BITSPERENTRY>> -> + Entry + end. + +getvalue_file(State, Key) -> + case getendentry(State, Key) of + none -> + none; + Entry -> + case getdatakey(State, Entry) of + {Key, Length} -> + getdata(State, Entry, Length); + _ -> + none + end + end. + + +getvalue(Name, Key) -> + gen_server:call(Name, {getvalue, Key}). + +addvalue(Name, Key, Value) -> + gen_server:call(Name, {addvalue, Key, Value}). + +commit(Name) -> + gen_server:call(Name, {commit}). +commit(Name, Timeout) -> + gen_server:call(Name, {commit}, Timeout). + +init([Name, Filename]) -> + Cachename = list_to_atom(atom_to_list(Name) ++ "_cache"), + ets:new(Cachename, [set, public, named_table]), + process_flag(trap_exit, true), + Port = open_port({spawn_executable, code:priv_dir(plop) ++ "/permdbport"}, + [{packet, 4}, {args, [Filename]}, binary]), + DataFile = none,%%openfile(Filename), + IndexFile = none,%%openfile(Filename ++ ".idx"), + {ok, #state{cachename = Cachename, + port = Port, + datafile = DataFile, + indexfile = IndexFile, + requests = queue:new()}}. + +start_link(Name, Filename) -> + gen_server:start_link({local, Name}, ?MODULE, + [Name, Filename], []). + +stop(Name) -> + gen_server:call(Name, stop). + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({Port, {data, Data}}, State) when is_port(Port) -> + lager:debug("response: ~p", [Data]), + {{value, {From, Action}}, Requests} = queue:out(State#state.requests), + lager:debug("response: ~p", [Action]), + gen_server:reply(From, case Action of + getvalue -> + case Data of + <<>> -> + noentry; + _ -> + Data + end; + addvalue -> + case Data of + <<>> -> + util:exit_with_error(putvalue, "Error in putvalue"); + _ -> + ok + end; + commit -> + Data + end), + {noreply, State#state{requests = Requests}}; +handle_info(_Info, State) -> + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + io:format("~p terminating~n", [?MODULE]), + State#state.port ! {self(), {command, <<>>}}, + ok. + +add_request(State, From, Action) -> + State#state{ + requests = queue:in({From, Action}, State#state.requests) + }. + +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; + +handle_call({getvalue, Key}, From, State) -> + lager:debug("getvalue: ~p", [Key]), + Method = port, + case Method of + port -> + getvalue_port_command(State#state.port, Key), + {noreply, add_request(State, From, getvalue)}; + file -> + Value = getvalue_file(State, Key), + {reply, Value, State} + end; + +handle_call({addvalue, Key, Value}, From, State) -> + lager:debug("addvalue: ~p ~p", [Key, Value]), + addvalue_port_command(State#state.port, Key, Value), + {noreply, add_request(State, From, addvalue)}; + +handle_call({commit}, From, State) -> + lager:debug("commit", []), + commit_port_command(State#state.port), + {noreply, add_request(State, From, commit)}. -- cgit v1.1