diff options
Diffstat (limited to 'src/collector/db.py')
-rw-r--r--[-rwxr-xr-x] | src/collector/db.py | 185 |
1 files changed, 38 insertions, 147 deletions
diff --git a/src/collector/db.py b/src/collector/db.py index 0bfa014..3b16ef5 100755..100644 --- a/src/collector/db.py +++ b/src/collector/db.py @@ -1,148 +1,39 @@ -# A database storing dictionaries, keyed on a timestamp. value = A -# dict which will be stored as a JSON object encoded in UTF-8. Note -# that dict keys of type integer or float will become strings while -# values will keep their type. - -# Note that there's a (slim) chance that you'd stomp on the previous -# value if you're too quick with generating the timestamps, ie -# invoking time.time() several times quickly enough. - -from typing import Dict, List, Tuple, Union, Any -import os -import sys -import time - -from src import couch -from .schema import as_index_list, validate_collector_data - - -class DictDB: - def __init__(self) -> None: - """ - Check if the database exists, otherwise we will create it together - with the indexes specified in index.py. - """ - - print(os.environ) - - try: - self.database = os.environ["COUCHDB_NAME"] - self.hostname = os.environ["COUCHDB_HOSTNAME"] - self.username = os.environ["COUCHDB_USER"] - self.password = os.environ["COUCHDB_PASSWORD"] - except KeyError: - print( - "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME," - + " COUCHDB_USER and COUCHDB_PASSWORD must be set." - ) - sys.exit(-1) - - if "COUCHDB_PORT" in os.environ: - couchdb_port = os.environ["COUCHDB_PORT"] - else: - couchdb_port = "5984" - - self.server = couch.client.Server(f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/") - - try: - self.couchdb = self.server.database(self.database) - print("Database already exists") - except couch.exceptions.NotFound: - print("Creating database and indexes.") - self.couchdb = self.server.create(self.database) - - for i in as_index_list(): - self.couchdb.index(i) - - self._ts = time.time() - - def unique_key(self) -> int: - """ - Create a unique key based on the current time. We will use this as - the ID for any new documents we store in CouchDB. - """ - - ts = time.time() - while round(ts * 1000) == self._ts: - ts = time.time() - self._ts = round(ts * 1000) - - return self._ts - - # Why batch_write??? - def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]: - """ - Store a document in CouchDB. - """ - - if isinstance(data, List): - for item in data: - error = validate_collector_data(item) - if error != "": - return error - item["_id"] = str(self.unique_key()) - ret: Tuple[str, str] = self.couchdb.save_bulk(data) +"""Our database module""" +from time import sleep +from sys import exit as app_exit +from dataclasses import dataclass + +from motor.motor_asyncio import ( + AsyncIOMotorClient, + AsyncIOMotorCollection, +) +from bson import ObjectId + + +@dataclass() +class DBClient: + """Class to hold database connections for us.""" + + client: AsyncIOMotorClient + collection: AsyncIOMotorCollection + + def __init__(self, username: str, password: str, collection: str) -> None: + self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000) + self.collection = self.client["production"][collection] + + async def check_server(self) -> None: + """Try query the DB and exit the program if we fail after 5 times. + + :return: None + """ + for i in range(5): + try: + await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")}) + print("Connection to DB - OK") + break + except: # pylint: disable=bare-except + print(f"WARNING failed to connect to DB - {i} / 4", flush=True) + sleep(1) else: - error = validate_collector_data(data) - if error != "": - return error - data["_id"] = str(self.unique_key()) - ret = self.couchdb.save(data) - - return ret - - def get(self, key: int) -> Dict[str, Any]: - """ - Get a document based on its ID, return an empty dict if not found. - """ - - try: - doc: Dict[str, Any] = self.couchdb.get(key) - except couch.exceptions.NotFound: - doc = {} - - return doc - - # - # def slice(self, key_from=None, key_to=None): - # pass - - def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]: - """ - Execute a Mango query, ideally we should have an index matching - the query otherwise things will be slow. - """ - - data: List[Dict[str, Any]] = [] - selector: Dict[str, Any] = {} - - try: - limit = int(limit) - skip = int(skip) - except ValueError: - limit = 25 - skip = 0 - - if kwargs: - selector = {"limit": limit, "skip": skip, "selector": {}} - - for key in kwargs: - if kwargs[key] and kwargs[key].isnumeric(): - kwargs[key] = int(kwargs[key]) - selector["selector"][key] = {"$eq": kwargs[key]} - - for doc in self.couchdb.find(selector, wrapper=None, limit=5): - data.append(doc) - - return data - - def delete(self, key: int) -> Union[int, None]: - """ - Delete a document based on its ID. - """ - try: - self.couchdb.delete(key) - except couch.exceptions.NotFound: - return None - - return key + print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production") + app_exit(1) |