import os
import sys
import time
from contextlib import contextmanager
from functools import lru_cache

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from db import couch
from db.index import CouchIindex


class DictDB():
    """
    Small document-store wrapper around a single CouchDB database.

    Connection settings are read from the environment when the object is
    constructed.
    """

    def __init__(self):
        """
        Check if the database exists, otherwise we will create it
        together with the indexes specified in index.py.

        Reads COUCHDB_NAME, COUCHDB_HOSTNAME, COUCHDB_USER and
        COUCHDB_PASSWORD (required) and COUCHDB_PORT (optional, defaults
        to 5984) from the environment. Prints a message and exits the
        process with status -1 if a required variable is missing.
        """
        try:
            self.database = os.environ['COUCHDB_NAME']
            self.hostname = os.environ['COUCHDB_HOSTNAME']
            self.username = os.environ['COUCHDB_USER']
            self.password = os.environ['COUCHDB_PASSWORD']
        except KeyError:
            print('The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME,' +
                  ' COUCHDB_USER and COUCHDB_PASSWORD must be set.')
            sys.exit(-1)

        couchdb_port = os.environ.get('COUCHDB_PORT', 5984)

        self.server = couch.client.Server(
            f"http://{self.username}:{self.password}"
            f"@{self.hostname}:{couchdb_port}/")

        try:
            self.couchdb = self.server.database(self.database)
        except couch.exceptions.NotFound:
            print("Creating database and indexes.")
            self.couchdb = self.server.create(self.database)

            for i in CouchIindex():
                self.couchdb.index(i)

        # Last key handed out by unique_key(), in integer milliseconds.
        # Zero means no key has been issued by this instance yet.
        self._ts = 0

    def unique_key(self):
        """
        Create a unique key based on the current time. We will use this
        as the ID for any new documents we store in CouchDB.

        The key is the current Unix time in milliseconds. If that value
        is not strictly greater than the previously issued key (several
        calls within the same millisecond, or the wall clock stepping
        backwards) the previous key plus one is used instead, so keys
        issued by one instance are always strictly increasing and no
        busy-waiting is needed.

        Returns:
            int: the new key.
        """
        now_ms = round(time.time() * 1000)

        if now_ms <= self._ts:
            now_ms = self._ts + 1

        self._ts = now_ms

        return now_ms

    def add(self, data, batch_write=False):
        """
        Store a document in CouchDB.

        Args:
            data: a single document (dict) or a list of documents. Each
                document gets a fresh '_id' assigned in place.
            batch_write: currently unused; kept so existing callers that
                pass it keep working.

        Returns:
            Whatever the underlying save() / save_bulk() call returns.
        """
        if isinstance(data, list):
            for item in data:
                item['_id'] = str(self.unique_key())
            return self.couchdb.save_bulk(data)

        data['_id'] = str(self.unique_key())

        return self.couchdb.save(data)

    def get(self, key):
        """
        Get a document based on its ID, return an empty dict if not found.
        """
        try:
            doc = self.couchdb.get(key)
        except couch.exceptions.NotFound:
            doc = {}

        return doc

    def slice(self, key_from=None, key_to=None):
        """
        Placeholder, not implemented yet. Always returns None.
        """
        pass

    def search(self, limit=25, skip=0, **kwargs):
        """
        Execute a Mango query, ideally we should have an index matching
        the query otherwise things will be slow.

        Args:
            limit: maximum number of documents, anything int() accepts.
            skip: number of documents to skip, anything int() accepts.
            **kwargs: field=value pairs, each turned into an '$eq'
                condition. String values made up only of decimal digits
                are converted to int before being compared.

        If limit or skip can not be converted to an integer both fall
        back to their defaults (25 and 0).

        Returns:
            list: the documents yielded by the query.
        """
        try:
            limit = int(limit)
            skip = int(skip)
        except (TypeError, ValueError):
            limit = 25
            skip = 0

        selector = {}

        if kwargs:
            selector = {
                "limit": limit,
                "skip": skip,
                "selector": {}
            }

            for key, value in kwargs.items():
                # isdecimal() rather than isnumeric(): the latter also
                # accepts strings such as '½' or '²' which int() rejects.
                # The isinstance check keeps non-string values (already
                # an int, None, ...) from raising AttributeError.
                if isinstance(value, str) and value.isdecimal():
                    value = int(value)

                selector['selector'][key] = {'$eq': value}

        # NOTE(review): limit=5 is hard-coded here while the selector
        # carries the caller's limit. Left untouched because the find()
        # implementation is not visible from this file; confirm which one
        # is meant to win.
        return list(self.couchdb.find(selector, wrapper=None, limit=5))

    def delete(self, key):
        """
        Delete a document based on its ID.

        Returns:
            The key on success, None if no such document exists.
        """
        try:
            self.couchdb.delete(key)
        except couch.exceptions.NotFound:
            return None

        return key


def get_conn_str():
    """
    Build the SQLAlchemy connection string from the environment.

    SQL_DIALECT and SQL_DATABASE are always required. For every dialect
    other than 'sqlite' SQL_HOSTNAME, SQL_USERNAME and SQL_PASSWORD are
    required as well. Prints a message and exits the process with
    status -1 if a required variable is missing.

    Returns:
        str: '<dialect>:///<database>.db' for sqlite, otherwise
        '<dialect>://<username>:<password>@<hostname>/<database>'.
    """
    try:
        dialect = os.environ['SQL_DIALECT']
        database = os.environ['SQL_DATABASE']
    except KeyError:
        print('The environment variables SQL_DIALECT and SQL_DATABASE must ' +
              'be set.')
        sys.exit(-1)

    if dialect == 'sqlite':
        return f"{dialect}:///{database}.db"

    try:
        hostname = os.environ['SQL_HOSTNAME']
        username = os.environ['SQL_USERNAME']
        password = os.environ['SQL_PASSWORD']
    except KeyError:
        print('The environment variables SQL_DIALECT, SQL_DATABASE, ' +
              'SQL_HOSTNAME, SQL_USERNAME and SQL_PASSWORD must ' +
              'be set.')
        sys.exit(-1)

    # The database name has to be interpolated as well; previously the
    # literal text '/{database}' was appended to the URL.
    return f"{dialect}://{username}:{password}@{hostname}/{database}"


@lru_cache(maxsize=None)
def _get_engine(conn_str):
    """
    Return the engine for conn_str, creating it on first use.

    One engine (and therefore one connection pool) is kept per distinct
    connection string instead of building a new pool for every session.
    """
    if 'sqlite' in conn_str:
        return create_engine(conn_str)

    return create_engine(conn_str, pool_size=50, max_overflow=0)


class SqlDB():
    """
    Helpers for obtaining SQLAlchemy sessions.
    """

    @staticmethod
    def get_session(conn_str):
        """
        Create a new session bound to the engine for conn_str.

        Declared as a static method so it can be called on the class as
        well as on an instance.
        """
        Session = sessionmaker(bind=_get_engine(conn_str))

        return Session()

    @classmethod
    @contextmanager
    def sql_session(cls, **kwargs):
        """
        Context manager yielding a session for the configured database.

        The session is committed when the with-block finishes normally,
        rolled back (and the exception re-raised) when it raises, and
        closed in either case. Keyword arguments are accepted but
        currently unused.
        """
        session = cls.get_session(get_conn_str())

        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()