diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/collector/db.py | 143 | ||||
-rwxr-xr-x | src/collector/main.py | 89 |
2 files changed, 182 insertions, 50 deletions
diff --git a/src/collector/db.py b/src/collector/db.py index 3b16ef5..2f16e12 100644 --- a/src/collector/db.py +++ b/src/collector/db.py @@ -1,8 +1,12 @@ """Our database module""" +from typing import List, Dict, Optional, Any from time import sleep from sys import exit as app_exit from dataclasses import dataclass +from fastapi import HTTPException +from pydantic import BaseModel +from pymongo.errors import OperationFailure from motor.motor_asyncio import ( AsyncIOMotorClient, AsyncIOMotorCollection, @@ -10,6 +14,14 @@ from motor.motor_asyncio import ( from bson import ObjectId +class SearchInput(BaseModel): + """Handle search data for HTTP request""" + + search: Optional[Dict[str, Any]] + limit: int = 25 + skip: int = 0 + + @dataclass() class DBClient: """Class to hold database connections for us.""" @@ -18,22 +30,147 @@ class DBClient: collection: AsyncIOMotorCollection def __init__(self, username: str, password: str, collection: str) -> None: - self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000) + self.client = AsyncIOMotorClient( + f"mongodb://{username}:{password}@mongodb:27017/production", + maxConnecting=4, + timeoutMS=3000, + serverSelectionTimeoutMS=3000, + ) self.collection = self.client["production"][collection] - async def check_server(self) -> None: + async def startup(self) -> None: """Try query the DB and exit the program if we fail after 5 times. :return: None """ + for i in range(5): try: await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")}) print("Connection to DB - OK") break - except: # pylint: disable=bare-except + except Exception: # pylint: disable=braod-except print(f"WARNING failed to connect to DB - {i} / 4", flush=True) sleep(1) else: print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production") app_exit(1) + + async def find(self, search_data: SearchInput) -> Optional[List[Dict[str, Any]]]: + """Wrap the find() method, handling timeouts and return data type. + + :param search_data: Instance of SearchInput. + :return: Optional[List[Dict[str, Any]]] + """ + + data: List[Dict[str, Any]] = [] + cursor = self.collection.find(search_data.search) + + # Sort on timestamp + cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) + + try: + async for document in cursor: + data.append(document) + + if data: + return data + + except OperationFailure as exc: + print(f"DB failed to process: {exc.details}") + raise HTTPException( + status_code=400, + detail="Probably wrong syntax, note the dictionary for find: " + + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for", + ) from exc + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + return None + + async def find_one(self, object_id: ObjectId) -> Optional[Dict[str, Any]]: + """Wrap the find_one() method, handling timeouts and return data type. + + :param object_id: The object id to find. + :return: Optional[Dict[str, Any]] + """ + + try: + document = await self.collection.find_one({"_id": object_id}) + if isinstance(document, Dict): + return document + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def insert_one(self, data: Dict[str, Any]) -> Optional[ObjectId]: + """Wrap the insert_one() method, handling timeouts and return data type. + + :param data: The data to insert into the DB. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.insert_one(data) + if isinstance(result.inserted_id, ObjectId) and len(str(result.inserted_id)) == 24: + return result.inserted_id + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def replace_one(self, object_id: ObjectId, data: Dict[str, Any]) -> Optional[ObjectId]: + """Wrap the replace_one() method, handling timeouts and return data type. + + :param object_id: The object id to replace. + :param data: The data to replace with. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.replace_one({"_id": object_id}, data) + if result.matched_count == 1: + return object_id + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def delete_one(self, object_id: ObjectId) -> Optional[ObjectId]: + """Wrap the delete_one() method, handling timeouts and return data type. + + :param object_id: The object id to delete from the DB. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.delete_one({"_id": object_id}) + if result.deleted_count == 1: + return object_id + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + return None + + async def estimated_document_count(self) -> Optional[int]: + """Wrap the estimated_document_count() method, handling timeouts and return data type. + + :return: Optional[int] + """ + + try: + result = await self.collection.estimated_document_count() + if isinstance(result, int): + return result + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc diff --git a/src/collector/main.py b/src/collector/main.py index 096b788..36a8ebd 100755 --- a/src/collector/main.py +++ b/src/collector/main.py @@ -1,21 +1,21 @@ """Our main module""" -from typing import Dict, Optional, List, Any from os import environ -import asyncio -import sys +from asyncio import get_running_loop +from sys import exit as app_exit from json.decoder import JSONDecodeError from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from pydantic import BaseModel -from pymongo.errors import OperationFailure from bson import ( ObjectId, json_util, ) from dotenv import load_dotenv -from .db import DBClient +from .db import ( + DBClient, + SearchInput, +) from .schema import valid_schema @@ -23,14 +23,14 @@ load_dotenv() # Get credentials if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ: print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env") - sys.exit(1) + app_exit(1) # Create DB object db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"]) # Check DB -loop = asyncio.get_running_loop() -startup_task = loop.create_task(db.check_server()) +loop = get_running_loop() +startup_task = loop.create_task(db.startup()) app = FastAPI() @@ -42,14 +42,6 @@ app = FastAPI() # return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400) -class SearchInput(BaseModel): - """Handle search data for HTTP request""" - - search: Optional[Dict[str, Any]] - limit: int = 25 - skip: int = 0 - - @app.post("/sc/v0/search") async def search(search_data: SearchInput) -> JSONResponse: """/sc/v0/search, POST method @@ -57,26 +49,9 @@ async def search(search_data: SearchInput) -> JSONResponse: :param search_data: The search data. :return: JSONResponse """ - data: List[Dict[str, Any]] = [] - - cursor = db.collection.find(search_data.search) - cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) + data = await db.find(search_data) - try: - async for document in cursor: - data.append(document) - except OperationFailure as exc: - print(f"DB failed to process: {exc.details}") - return JSONResponse( - content={ - "status": "error", - "message": "Probably wrong syntax, note the dictionary for find: " - + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for", - }, - status_code=400, - ) - - if not data: + if data is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)}) @@ -98,12 +73,16 @@ async def create(request: Request) -> JSONResponse: if not valid_schema(json_data): return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400) - result = await db.collection.insert_one(json_data) - return JSONResponse(content={"status": "success", "key": str(result.inserted_id)}) + key = await db.insert_one(json_data) + + if key is None: + return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400) + + return JSONResponse(content={"status": "success", "key": str(key)}) @app.put("/sc/v0") -async def update(request: Request) -> JSONResponse: +async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many-return-statements """/sc/v0, PUT method :param request: The request where we get the json body. @@ -129,7 +108,8 @@ async def update(request: Request) -> JSONResponse: return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400) # Ensure the updating key exist - document = await db.collection.find_one({"_id": object_id}) + document = await db.find_one(object_id) + if document is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) @@ -140,19 +120,23 @@ async def update(request: Request) -> JSONResponse: # Replace the data json_data["_id"] = object_id - await db.collection.replace_one({"_id": object_id}, json_data) + returned_object_id = await db.replace_one(object_id, json_data) + + if returned_object_id is None: + return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400) + return JSONResponse(content={"status": "success", "key": str(object_id)}) @app.get("/sc/v0/{key}") async def get(key: str) -> JSONResponse: - """/sc/v0, POST method + """/sc/v0, GET method :param key: The document key in the database. :return: JSONResponse """ - document = await db.collection.find_one({"_id": ObjectId(key)}) + document = await db.find_one(ObjectId(key)) if document is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) @@ -162,14 +146,25 @@ async def get(key: str) -> JSONResponse: @app.delete("/sc/v0/{key}") async def delete(key: str) -> JSONResponse: - """/sc/v0, POST method + """/sc/v0, DELETE method :param key: The document key in the database. :return: JSONResponse """ - result = await db.collection.delete_one({"_id": ObjectId(key)}) + result = await db.delete_one(ObjectId(key)) - if result.deleted_count == 0: + if result is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) - return JSONResponse(content={"status": "success", "key": key}) + return JSONResponse(content={"status": "success", "key": str(key)}) + + +@app.get("/info") +async def info() -> JSONResponse: + """/info, GET method + + :return: JSONResponse + """ + + count = await db.estimated_document_count() + return JSONResponse(content={"status": "success", "Estimated document count": count}) |