summaryrefslogtreecommitdiff
path: root/src/collector/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/collector/db.py')
-rw-r--r--[-rwxr-xr-x]src/collector/db.py185
1 files changed, 38 insertions, 147 deletions
diff --git a/src/collector/db.py b/src/collector/db.py
index 0bfa014..3b16ef5 100755..100644
--- a/src/collector/db.py
+++ b/src/collector/db.py
@@ -1,148 +1,39 @@
-# A database storing dictionaries, keyed on a timestamp. value = A
-# dict which will be stored as a JSON object encoded in UTF-8. Note
-# that dict keys of type integer or float will become strings while
-# values will keep their type.
-
-# Note that there's a (slim) chance that you'd stomp on the previous
-# value if you're too quick with generating the timestamps, ie
-# invoking time.time() several times quickly enough.
-
-from typing import Dict, List, Tuple, Union, Any
-import os
-import sys
-import time
-
-from src import couch
-from .schema import as_index_list, validate_collector_data
-
-
-class DictDB:
- def __init__(self) -> None:
- """
- Check if the database exists, otherwise we will create it together
- with the indexes specified in index.py.
- """
-
- print(os.environ)
-
- try:
- self.database = os.environ["COUCHDB_NAME"]
- self.hostname = os.environ["COUCHDB_HOSTNAME"]
- self.username = os.environ["COUCHDB_USER"]
- self.password = os.environ["COUCHDB_PASSWORD"]
- except KeyError:
- print(
- "The environment variables COUCHDB_NAME, COUCHDB_HOSTNAME,"
- + " COUCHDB_USER and COUCHDB_PASSWORD must be set."
- )
- sys.exit(-1)
-
- if "COUCHDB_PORT" in os.environ:
- couchdb_port = os.environ["COUCHDB_PORT"]
- else:
- couchdb_port = "5984"
-
- self.server = couch.client.Server(f"http://{self.username}:{self.password}@{self.hostname}:{couchdb_port}/")
-
- try:
- self.couchdb = self.server.database(self.database)
- print("Database already exists")
- except couch.exceptions.NotFound:
- print("Creating database and indexes.")
- self.couchdb = self.server.create(self.database)
-
- for i in as_index_list():
- self.couchdb.index(i)
-
- self._ts = time.time()
-
- def unique_key(self) -> int:
- """
- Create a unique key based on the current time. We will use this as
- the ID for any new documents we store in CouchDB.
- """
-
- ts = time.time()
- while round(ts * 1000) == self._ts:
- ts = time.time()
- self._ts = round(ts * 1000)
-
- return self._ts
-
- # Why batch_write???
- def add(self, data: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Union[str, Tuple[str, str]]:
- """
- Store a document in CouchDB.
- """
-
- if isinstance(data, List):
- for item in data:
- error = validate_collector_data(item)
- if error != "":
- return error
- item["_id"] = str(self.unique_key())
- ret: Tuple[str, str] = self.couchdb.save_bulk(data)
+"""Our database module"""
+from time import sleep
+from sys import exit as app_exit
+from dataclasses import dataclass
+
+from motor.motor_asyncio import (
+ AsyncIOMotorClient,
+ AsyncIOMotorCollection,
+)
+from bson import ObjectId
+
+
+@dataclass()
+class DBClient:
+ """Class to hold database connections for us."""
+
+ client: AsyncIOMotorClient
+ collection: AsyncIOMotorCollection
+
+ def __init__(self, username: str, password: str, collection: str) -> None:
+ self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000)
+ self.collection = self.client["production"][collection]
+
+ async def check_server(self) -> None:
+ """Try query the DB and exit the program if we fail after 5 times.
+
+ :return: None
+ """
+ for i in range(5):
+ try:
+ await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")})
+ print("Connection to DB - OK")
+ break
+ except: # pylint: disable=bare-except
+ print(f"WARNING failed to connect to DB - {i} / 4", flush=True)
+ sleep(1)
else:
- error = validate_collector_data(data)
- if error != "":
- return error
- data["_id"] = str(self.unique_key())
- ret = self.couchdb.save(data)
-
- return ret
-
- def get(self, key: int) -> Dict[str, Any]:
- """
- Get a document based on its ID, return an empty dict if not found.
- """
-
- try:
- doc: Dict[str, Any] = self.couchdb.get(key)
- except couch.exceptions.NotFound:
- doc = {}
-
- return doc
-
- #
- # def slice(self, key_from=None, key_to=None):
- # pass
-
- def search(self, limit: int = 25, skip: int = 0, **kwargs: Any) -> List[Dict[str, Any]]:
- """
- Execute a Mango query, ideally we should have an index matching
- the query otherwise things will be slow.
- """
-
- data: List[Dict[str, Any]] = []
- selector: Dict[str, Any] = {}
-
- try:
- limit = int(limit)
- skip = int(skip)
- except ValueError:
- limit = 25
- skip = 0
-
- if kwargs:
- selector = {"limit": limit, "skip": skip, "selector": {}}
-
- for key in kwargs:
- if kwargs[key] and kwargs[key].isnumeric():
- kwargs[key] = int(kwargs[key])
- selector["selector"][key] = {"$eq": kwargs[key]}
-
- for doc in self.couchdb.find(selector, wrapper=None, limit=5):
- data.append(doc)
-
- return data
-
- def delete(self, key: int) -> Union[int, None]:
- """
- Delete a document based on its ID.
- """
- try:
- self.couchdb.delete(key)
- except couch.exceptions.NotFound:
- return None
-
- return key
+ print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production")
+ app_exit(1)