#! /usr/bin/env python3
"""Thin key-value store wrapper around the ``leveldb`` Python binding."""

import os
import time
from pathlib import PurePath

from leveldb import LevelDB, WriteBatch


class KVStore:
    """Wraps a Python wrapper for LevelDB in case we want to change wrapper.

    All operations are forwarded to a single ``LevelDB`` handle held in
    ``self._db``; callers never touch the binding directly.
    """

    def __init__(self, name, basedir='.'):
        """Open the database ``name`` located inside ``basedir``.

        ``basedir`` is created first if it does not exist yet.
        """
        os.makedirs(basedir, exist_ok=True)
        path = str(PurePath(basedir) / name)
        self._db = LevelDB(path)

    def get(self, key):
        """Return the value stored under ``key``, or None if it is absent.

        A missing key (``KeyError`` from the binding) is reported as None,
        so None itself cannot be distinguished from "not stored".
        """
        try:
            return self._db.Get(key)
        except KeyError:
            return None  # You can thus not store None!

    def put(self, key, val):
        """Store ``val`` under ``key``."""
        self._db.Put(key, val)

    def delete(self, key):
        """Remove ``key`` from the store."""
        self._db.Delete(key)

    def range_iter(self, key_from=None, key_to=None):
        """Return the binding's range iterator between the given bounds.

        Both bounds are optional and passed through unchanged; with the
        defaults the whole database is iterated.
        """
        return self._db.RangeIter(key_from=key_from, key_to=key_to)

    def batch(self):
        """Return a fresh, empty ``WriteBatch`` to be filled by the caller."""
        return WriteBatch()

    def write(self, batch):
        """Apply ``batch`` to the store, passing ``sync=True`` to the binding."""
        self._db.Write(batch, sync=True)

    @staticmethod
    def timestamp_asc():
        """Return the current ``time.time()`` value as ASCII-encoded bytes.

        Declared static so it can be called both as
        ``KVStore.timestamp_asc()`` and on an instance.
        """
        return str(time.time()).encode('ascii')


def test_store(name):
    """Smoke-test put/get/range_iter/delete on the database ``name``.

    Returns the opened ``KVStore`` so the caller can keep using it.
    """
    # TODO: rm -r name (entries from earlier runs are left behind on disk)
    db = KVStore(name)

    ts0 = KVStore.timestamp_asc()
    db.put(ts0, b'Bar')
    assert db.get(ts0) == b'Bar'

    ts1 = KVStore.timestamp_asc()
    # On a coarse clock two consecutive readings can be identical, which
    # would make the second put overwrite the first; wait for a new value.
    while ts1 == ts0:
        ts1 = KVStore.timestamp_asc()
    db.put(ts1, b'Foo')
    assert db.get(ts1) == b'Foo'

    # Start the scan at ts0 so keys left over from previous runs (which
    # carry older timestamps) do not break the comparison.
    found = list(db.range_iter(key_from=ts0))
    assert found == [(ts0, b'Bar'), (ts1, b'Foo')]

    db.delete(ts0)
    assert db.get(ts0) is None
    return db


if __name__ == '__main__':
    test_store('test_store.db')