#! /usr/bin/env python3
"""A database storing dictionaries, keyed on a timestamp.

key   = 8 octets timestamp | 1 octet version
        struct.pack('!dB', time.time(), 0)
value = A dict which will be stored as a JSON object encoded in UTF-8.
        Note that dict keys of type integer or float will become strings
        while values will keep their type.

Note that there's a (slim) chance that you'd stomp on the previous value
if you're too quick with generating the timestamps, ie invoking
time.time() several times quickly enough.
"""

import json
import os
import struct
import time
from operator import itemgetter
from pathlib import PurePath

from store import KVStore

# Binary layout of a key: network-order double timestamp + 1 octet version.
_KEY_STRUCT = struct.Struct('!dB')


class DictDB(KVStore):
    """KVStore of JSON-encoded values keyed on (timestamp, version)."""

    VERSION = 0

    # TODO: implement indexes
    # TODO: implement search(dict key) for indexed fields

    def __init__(self, name, basedir='.'):
        """Open the store NAME under BASEDIR (both passed on to KVStore)."""
        super().__init__(name, basedir)
        self._ts = time.time()  # last timestamp seen/handed out
        self._index = {}        # index name -> DBIndex

    def unique_key(self):
        """Return a packed key whose timestamp differs from the last one.

        Spins on time.time() until the clock value differs from the
        timestamp most recently recorded by this instance. Uniqueness is
        only relative to that single previous value.
        """
        ts = time.time()
        while ts == self._ts:
            ts = time.time()
        self._ts = ts
        return _KEY_STRUCT.pack(ts, DictDB.VERSION)

    def index_add(self, path):
        """Register a DBIndex opened on PATH under PATH's final component."""
        name = PurePath(path).name
        self._index[name] = DBIndex(path)

    def add(self, data, batch_write=False):
        """Store DATA and return its key, or a list of keys for a list.

        DATA -- a JSON-serialisable value, or a list of such values; each
                list element is stored under its own key.
        BATCH_WRITE -- when true and DATA is a list, a batch is obtained
                from self.batch() before the elements are added and
                handed to self.write() afterwards.
        """
        if isinstance(data, list):
            # Supposedly makes the update atomic.
            # NOTE(review): the elements below are stored through
            # self.add() -> self.put(), not through the batch object, so
            # nothing visible here ever populates the batch -- confirm
            # against KVStore.batch()/write() whether this is effective.
            batch = self.batch() if batch_write else None
            keys = [self.add(entry) for entry in data]
            if batch_write:
                self.write(batch)
            return keys

        key = self.unique_key()
        self.put(key, json.dumps(data).encode('UTF-8'))
        return key

    def get(self, key):
        """Return the decoded JSON value stored under the packed KEY."""
        # Name collision with KVStore.get, hence the explicit super().
        # NOTE(review): what KVStore.get returns for a missing key is not
        # visible here; a None result would fail on .decode() -- confirm.
        enc = super().get(key)
        return json.loads(enc.decode('UTF-8'))

    def slice(self, key_from=None, key_to=None):
        """Return [(timestamp, value), ...] for keys in the given range.

        KEY_FROM and KEY_TO are passed unchanged to self.range_iter().
        Entries whose key version differs from DictDB.VERSION are
        skipped. The first tuple member is the unpacked float timestamp,
        not the raw key.
        """
        ret = []
        for key_raw, val_raw in self.range_iter(key_from=key_from,
                                                key_to=key_to):
            ts, ver = _KEY_STRUCT.unpack(key_raw)
            if ver == DictDB.VERSION:
                ret.append((ts, json.loads(val_raw.decode('UTF-8'))))
        return ret

    def search(self, dict_name, dict_val=None):
        """Search top level dicts for a name and optionally a value.

        Return the (timestamp, obj) pairs from slice() whose OBJ contains
        DICT_NAME. If DICT_VAL is not None, OBJ[DICT_NAME] must also
        equal DICT_VAL.
        """
        return [(key, obj)
                for key, obj in self.slice()
                if dict_name in obj
                and (dict_val is None or dict_val == obj[dict_name])]

    @staticmethod
    def timestamp_from_key(key):
        """Return the float timestamp contained in the packed KEY."""
        ts, _ = _KEY_STRUCT.unpack(key)
        return ts


# Also reachable as a plain module-level function.
timestamp_from_key = DictDB.timestamp_from_key


class DBIndex(KVStore):
    """KVStore used as an index by DictDB.index_add(); no logic yet."""

    def __init__(self, name, basedir='.'):
        super().__init__(name, basedir)

    # Unfinished sketches for the index implementation (see the TODOs in
    # DictDB), kept for reference:
    #
    # def update_or_create(self, key, val):
    #     curval = []
    #     try:
    #         curval = self._db.Get(key)
    #     except KeyError:
    #         self._db.Put(key, [val])
    #         return
    #     if curval is list:
    #         self._db.Put(key, curval + [val])
    #     else:
    #         self._db.Put(key, json.dumps([curval, val]))
    #
    # def index_full(self, name):
    #     kv = self._index[name]
    #     for key, val in self.range_iter():
    #         valdict = json.loads(val)
    #         field = valdict.get(name)
    #         if field:
    #             ix = kv.get(key)
    #             if ix:
    #                 kv.put(ix + [key])
    #             else:
    #                 kv.put([key])


def dict_eq(a, b):
    """Return True if dicts A and B have equal key-sorted item lists."""
    def sort_on_key(d):
        return sorted(d.items(), key=itemgetter(0))

    return sort_on_key(a) == sort_on_key(b)


def _self_test():
    """Smoke test run when the module is executed as a script."""
    DBDIR = 'test_db.db'
    # TODO: rm -r DBDIR
    db = DictDB('db', basedir=DBDIR)
    # ix = DBIndex('foo', basedir=DBDIR)

    key = db.add({'foo': 'Bar'})
    assert db.get(key) == {'foo': 'Bar'}

    key = db.add({0: 'Foo'})  # NOTE: int keys become strings
    assert db.get(key) == {'0': 'Foo'}

    d = {'4711': 'Large number', '7': 'Small number', '0': 'Bar'}
    key = db.add(d)
    res = db.get(key)
    assert dict_eq(d, res)

    key = db.add({'an int': 0})
    assert db.get(key) == {'an int': 0}

    key = db.add({'a float': 1.1})
    assert db.get(key) == {'a float': 1.1}

    # TODO: verify slice() too
    for key, val in db.slice():
        print(key, val)

    res = db.search('an int')
    assert dict_eq(res[0][1], {'an int': 0})

    res = db.search('0')
    assert dict_eq(res[0][1], {'0': 'Foo'})
    assert dict_eq(res[1][1], d)

    res = db.search('7', dict_val='Small number')
    # FIXME: verify better -- do we hit only '7' here, f.ex.?
    assert '7' in res[0][1]

    res = db.search('7', dict_val='not matching')
    assert not res

    N = 10 * 1000  # 10k takes ~0.2s.
    data = [{str(x): x} for x in range(N)]
    keys = db.add(data, batch_write=False)
    assert len(keys) == N
    for stored_key, expected in zip(keys, data):
        assert db.get(stored_key) == expected


if __name__ == '__main__':
    _self_test()