From 0398e77a809abcaf78c6f7d3e6064a5bee50be23 Mon Sep 17 00:00:00 2001 From: Kristofer Hallin Date: Thu, 23 Sep 2021 11:56:34 +0200 Subject: Use CouchDB, this might break things. --- src/authn.py | 7 ++- src/db.py | 183 +++++++++++++---------------------------------------------- src/wsgi.py | 32 ++++++++--- 3 files changed, 69 insertions(+), 153 deletions(-) (limited to 'src') diff --git a/src/authn.py b/src/authn.py index 8227e2c..d57e382 100755 --- a/src/authn.py +++ b/src/authn.py @@ -17,6 +17,7 @@ class Authz: def write_p(self): return 'w' in self._perms + class User: def __init__(self, username, pw, authz): self._username = username @@ -37,18 +38,19 @@ class User: def read_perms(self): acc = [] - for k,v in self._authz.items(): + for k, v in self._authz.items(): if v.read_p(): acc.append(k) return acc def write_perms(self): acc = [] - for k,v in self._authz.items(): + for k, v in self._authz.items(): if v.write_p(): acc.append(k) return acc + class UserDB: def __init__(self, yamlfile): self._users = {} @@ -105,5 +107,6 @@ def self_test(): assert(len(wp) == 1) assert('sunet.se' in wp) + if __name__ == '__main__': self_test() diff --git a/src/db.py b/src/db.py index 980505d..0c7e998 100755 --- a/src/db.py +++ b/src/db.py @@ -1,10 +1,6 @@ #! /usr/bin/env python3 # A database storing dictionaries, keyed on a timestamp. - -# key = 8 octets timestamp | 1 octet version -# struct.pack('!dB', time.time(), 0) - # value = A dict which will be stored as a JSON object encoded in # UTF-8. Note that dict keys of type integer or float will become # strings while values will keep their type. @@ -13,165 +9,66 @@ # value if you're too quick with generating the timestamps, ie # invoking time.time() several times quickly enough. -from store import KVStore -import json -from pathlib import PurePath import time import struct -import os +import couchdb + -class DictDB(KVStore): - VERSION = 0 +class DictDB(): + def __init__(self, database, hostname, username, password): + self.server = couchdb.Server( + f"http://{username}:{password}@{hostname}:5984/") - # TODO: implement indexes - # TODO: implement search(dict key) for indexed fields + if database not in self.server: + self.server.create(database) - def __init__(self, name, basedir='.'): - super().__init__(name, basedir) + self.couchdb = self.server[database] self._ts = time.time() - self._index = {} def unique_key(self): ts = time.time() while ts == self._ts: ts = time.time() - self._ts = ts - return struct.pack('!dB', ts, DictDB.VERSION) + self._ts = round(ts * 1000) + + return self._ts def index_add(self, path): - name = PurePath(path).name - self._index[name] = DBIndex(path) + pass def add(self, data, batch_write=False): + key = str(self.unique_key()) + if type(data) is list: - ret = [] - if batch_write: # Supposedly makes the update atomic. - batch = self.batch() - for e in data: - ret += [self.add(e)] - if batch_write: - self.write(batch) - return ret + for item in data: + self.couchdb[key] = item else: - key = self.unique_key() - json_data = json.dumps(data).encode('UTF-8') - self.put(key, json_data) - return key + self.couchdb[key] = data + + return key def get(self, key): - enc = super().get(key) # name collision, specify super class - ret = json.loads(enc.decode('UTF-8')) - return ret + return self.couchdb[key] def slice(self, key_from=None, key_to=None): - ret = [] - for key_raw, val_raw in list(self.range_iter(key_from=key_from, key_to=key_to)): - (key, ver) = struct.unpack('!dB', key_raw) - if ver == DictDB.VERSION: - val = json.loads(val_raw.decode('UTF-8')) - ret.append((key, val)) - return ret - - # Search top level dict for objects with a name matching DICT_NAME - # and optionally value DICT_VAL. - def search(self, dict_name, dict_val=None): - res = [] - for key, obj in self.slice(): - if dict_name in obj: - if dict_val is None: - res.append((key, obj)) - elif dict_val == obj[dict_name]: - res.append((key, obj)) - return res - - def timestamp_from_key(key): - ts, _ = struct.unpack('!dB', key) - return ts - -class DBIndex(KVStore): - def __init__(self, name, basedir='.'): - super().__init__(name, basedir) - -# def update_or_create(self, key, val): -# curval = [] -# try: -# curval = self._db.Get(key) -# except KeyError: -# self._db.Put(key, [val]) -# return - -# if curval is list: -# self._db.Put(key, curval + [val]) -# else: -# self._db.Put(key, json.dumps([curval, val])) - - # def index_full(self, name): - # kv = self._index[name]) - # for key, val in self.range_iter(): - # valdict = json.loads(val) - # field = valdict.get(name) - # if field: - # ix = kv.get(key) - # if ix: - # kv.put(ix + [key]) - # else: - # kv.put([key]) - -from operator import itemgetter -def dict_eq(a, b): - sort_on_key = lambda d: sorted(d.items(), key=itemgetter(0)) - return sort_on_key(a) == sort_on_key(b) - -if __name__ == '__main__': - DBDIR = 'test_db.db' - # TODO: rm -r DBDIR - db = DictDB('db', basedir = DBDIR) - #ix = DBIndex('foo', basedir = DBDIR) - - key = db.add({'foo': 'Bar'}) - assert(db.get(key) == {'foo': 'Bar'}) - - key = db.add({0: 'Foo'}) # NOTE: int keys become strings - assert(db.get(key) == {'0': 'Foo'}) - - d2 = {'4711': 'Large number', '7': 'Small number', '0': 'Bar'} - key = db.add(d2) - res = db.get(key) - assert(dict_eq(d2, res)) - - key = db.add({'an int': 0}) - assert(db.get(key) == {'an int': 0}) - - key = db.add({'a float': 1.1}) - assert(db.get(key) == {'a float': 1.1}) - - d5 = { "ip": {"foo": "192.0.2.10" }} - key = db.add(d5) - res = db.get(key) - assert(res == d5) - - # TODO: verify slice() too - for key, val in db.slice(): - print(key, val) - - res = db.search('an int') - assert(dict_eq(res[0][1], {'an int': 0})) - - res = db.search('0') - assert(dict_eq(res[0][1], {'0': 'Foo'})) - assert(dict_eq(res[1][1], d2)) - - res = db.search('7', dict_val = 'Small number') # FIXME: verify better -- do we hit only '7' here, f.ex.? - assert('7' in res[0][1]) - - res = db.search('7', dict_val = 'not matching') - assert(not res) - - N = 10 * 1000 # 10k takes ~0.2s. - data = [{str(x): x} for x in range(N)] - keys = db.add(data, batch_write = False) - assert(len(keys) == N) - for k in range(len(keys)): - assert(db.get(keys[k]) == data[k]) + pass + + def search(self, **kwargs): + data = list() + selector = dict() + + if kwargs: + selector = { + "selector": { + } + } + + for key in kwargs: + if kwargs[key].isnumeric(): + kwargs[key] = int(kwargs[key]) + selector['selector'][key] = {'$eq': kwargs[key]} + for doc in self.couchdb.find(selector): + data.append(doc) + return data diff --git a/src/wsgi.py b/src/wsgi.py index 98efc6f..49c1c17 100755 --- a/src/wsgi.py +++ b/src/wsgi.py @@ -1,15 +1,16 @@ #! /usr/bin/env python3 +import os import sys from wsgiref.simple_server import make_server import falcon import json from db import DictDB -import time from base64 import b64decode import authn + class CollectorResource(): def __init__(self, db, users): self._db = db @@ -28,7 +29,7 @@ class CollectorResource(): try: user = userbytes.decode('utf-8') pw = pwbytes.decode('utf-8') - except: + except Exception: return None, None # Fail return authfun(user, pw) @@ -38,15 +39,20 @@ class EPGet(CollectorResource): resp.status = falcon.HTTP_200 resp.content_type = falcon.MEDIA_JSON out = [] - orgs = self.user_auth(req.auth, self._users.read_perms) + if not orgs: resp.status = falcon.HTTP_401 resp.text = 'Invalid username or password\n' return + # We really should rely on req.params in its pure form since + # it might contain garbage. + selectors = req.params + for org in orgs: - out += [{time.ctime(key): dict} for (key, dict) in self._db.search('domain', dict_val=org)] + selectors['domain'] = org + out.append(self._db.search(**selectors)) resp.text = json.dumps(out) + '\n' @@ -75,7 +81,7 @@ class EPAdd(CollectorResource): rawin = req.bounded_stream.read() try: decodedin = rawin.decode('UTF-8') - except: + except Exception: resp.status = falcon.HTTP_400 resp.text = 'Need UTF-8\n' return @@ -97,11 +103,10 @@ class EPAdd(CollectorResource): resp.text = repr(key) + '\n' -def init(url_res_map, addr = '', port = 8000): +def init(url_res_map, addr='', port=8000): app = falcon.App() for url, res in url_res_map: app.add_route(url, res) - return make_server(addr, port, app) @@ -123,7 +128,17 @@ def main(): # # $ curl -s -u user1:pw1 http://localhost:8000/sc/v0/get | json_pp -json_opt utf8,pretty - db = DictDB('wsgi_demo.db') + try: + database = os.environ['DB_NAME'] + hostname = os.environ['DB_HOSTNAME'] + username = os.environ['DB_USERNAME'] + password = os.environ['DB_PASSWORD'] + except KeyError: + print('The environment variables DB_NAME, DB_HOSTNAME, DB_USERNAME ' + + 'and DB_PASSWORD must be set.') + sys.exit(-1) + + db = DictDB(database, hostname, username, password) users = authn.UserDB('wsgi_demo_users.yaml') httpd = init([('/sc/v0/add', EPAdd(db, users)), @@ -131,5 +146,6 @@ def main(): print('Serving on port 8000...') httpd.serve_forever() + if __name__ == '__main__': sys.exit(main()) -- cgit v1.1