diff options
author | Victor Näslund <victor@sunet.se> | 2022-11-13 17:49:45 +0100 |
---|---|---|
committer | Victor Näslund <victor@sunet.se> | 2022-11-13 17:49:45 +0100 |
commit | 563607809d993c9e496423829b1f93def22a4aac (patch) | |
tree | 6d7f3b7dc9466fa70750c67a661751f9d1fffa56 /src/collector/db.py | |
parent | 60029b36e9bdd773be923a3cd7d5d30170c669b3 (diff) |
more stuff
Diffstat (limited to 'src/collector/db.py')
-rw-r--r-- | src/collector/db.py | 143 |
1 files changed, 140 insertions, 3 deletions
diff --git a/src/collector/db.py b/src/collector/db.py index 3b16ef5..2f16e12 100644 --- a/src/collector/db.py +++ b/src/collector/db.py @@ -1,8 +1,12 @@ """Our database module""" +from typing import List, Dict, Optional, Any from time import sleep from sys import exit as app_exit from dataclasses import dataclass +from fastapi import HTTPException +from pydantic import BaseModel +from pymongo.errors import OperationFailure from motor.motor_asyncio import ( AsyncIOMotorClient, AsyncIOMotorCollection, @@ -10,6 +14,14 @@ from motor.motor_asyncio import ( from bson import ObjectId +class SearchInput(BaseModel): + """Handle search data for HTTP request""" + + search: Optional[Dict[str, Any]] + limit: int = 25 + skip: int = 0 + + @dataclass() class DBClient: """Class to hold database connections for us.""" @@ -18,22 +30,147 @@ class DBClient: collection: AsyncIOMotorCollection def __init__(self, username: str, password: str, collection: str) -> None: - self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000) + self.client = AsyncIOMotorClient( + f"mongodb://{username}:{password}@mongodb:27017/production", + maxConnecting=4, + timeoutMS=3000, + serverSelectionTimeoutMS=3000, + ) self.collection = self.client["production"][collection] - async def check_server(self) -> None: + async def startup(self) -> None: """Try query the DB and exit the program if we fail after 5 times. :return: None """ + for i in range(5): try: await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")}) print("Connection to DB - OK") break - except: # pylint: disable=bare-except + except Exception: # pylint: disable=braod-except print(f"WARNING failed to connect to DB - {i} / 4", flush=True) sleep(1) else: print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production") app_exit(1) + + async def find(self, search_data: SearchInput) -> Optional[List[Dict[str, Any]]]: + """Wrap the find() method, handling timeouts and return data type. + + :param search_data: Instance of SearchInput. + :return: Optional[List[Dict[str, Any]]] + """ + + data: List[Dict[str, Any]] = [] + cursor = self.collection.find(search_data.search) + + # Sort on timestamp + cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) + + try: + async for document in cursor: + data.append(document) + + if data: + return data + + except OperationFailure as exc: + print(f"DB failed to process: {exc.details}") + raise HTTPException( + status_code=400, + detail="Probably wrong syntax, note the dictionary for find: " + + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for", + ) from exc + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + return None + + async def find_one(self, object_id: ObjectId) -> Optional[Dict[str, Any]]: + """Wrap the find_one() method, handling timeouts and return data type. + + :param object_id: The object id to find. + :return: Optional[Dict[str, Any]] + """ + + try: + document = await self.collection.find_one({"_id": object_id}) + if isinstance(document, Dict): + return document + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def insert_one(self, data: Dict[str, Any]) -> Optional[ObjectId]: + """Wrap the insert_one() method, handling timeouts and return data type. + + :param data: The data to insert into the DB. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.insert_one(data) + if isinstance(result.inserted_id, ObjectId) and len(str(result.inserted_id)) == 24: + return result.inserted_id + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def replace_one(self, object_id: ObjectId, data: Dict[str, Any]) -> Optional[ObjectId]: + """Wrap the replace_one() method, handling timeouts and return data type. + + :param object_id: The object id to replace. + :param data: The data to replace with. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.replace_one({"_id": object_id}, data) + if result.matched_count == 1: + return object_id + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def delete_one(self, object_id: ObjectId) -> Optional[ObjectId]: + """Wrap the delete_one() method, handling timeouts and return data type. + + :param object_id: The object id to delete from the DB. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.delete_one({"_id": object_id}) + if result.deleted_count == 1: + return object_id + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + return None + + async def estimated_document_count(self) -> Optional[int]: + """Wrap the estimated_document_count() method, handling timeouts and return data type. + + :return: Optional[int] + """ + + try: + result = await self.collection.estimated_document_count() + if isinstance(result, int): + return result + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc |