"""Our main module

Defines a FastAPI application exposing search, create, update, get and delete
endpoints under ``/sc/v0``. All endpoints operate on ``db.collection`` and
answer with a JSON body whose ``status`` field is ``"success"`` or ``"error"``.
"""
from typing import Dict, Optional, List, Any
from os import environ
import asyncio
import sys
from json.decoder import JSONDecodeError

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pymongo.errors import OperationFailure
from bson import (
    ObjectId,
    json_util,
)
from bson.errors import InvalidId
from dotenv import load_dotenv

from .db import DBClient
from .schema import valid_schema

load_dotenv()

# Get credentials
if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ:
    print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env")
    sys.exit(1)

# Create DB object
db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"])

# Check DB
# NOTE: asyncio.get_running_loop() raises RuntimeError when no event loop is
# running, so this module has to be imported from inside a running loop.
loop = asyncio.get_running_loop()
startup_task = loop.create_task(db.check_server())

app = FastAPI()


# @app.exception_handler(RuntimeError)
# def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse:
#     print(exc, flush=True)
#     return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400)
#     return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400)


def _error(message: str) -> JSONResponse:
    """Build the error response shared by every endpoint (HTTP 400).

    :param message: Human readable description of the problem.
    :return: JSONResponse
    """
    return JSONResponse(content={"status": "error", "message": message}, status_code=400)


def _to_object_id(raw: Any) -> Optional[ObjectId]:
    """Convert a client supplied id into an ObjectId.

    Accepts either a plain string or a dictionary of the form
    ``{"$oid": "<string>"}``.

    :param raw: The value received from the client.
    :return: The ObjectId, or None when the value has an unsupported shape or
        is not a valid ObjectId string.
    """
    if isinstance(raw, dict):
        raw = raw.get("$oid")
    if not isinstance(raw, str):
        return None
    try:
        return ObjectId(raw)
    except InvalidId:
        # Malformed id strings are a client error, not a server failure.
        return None


class SearchInput(BaseModel):
    """Handle search data for HTTP request"""

    # Filter handed to ``find``.
    search: Optional[Dict[str, Any]]
    # Maximum number of documents to return.
    limit: int = 25
    # Number of documents to skip.
    skip: int = 0


@app.post("/sc/v0/search")
async def search(search_data: SearchInput) -> JSONResponse:
    """/sc/v0/search, POST method

    Results are sorted by ``timestamp`` in descending order.

    :param search_data: The search data.
    :return: JSONResponse
    """
    cursor = db.collection.find(search_data.search)
    cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip)
    try:
        data: List[Dict[str, Any]] = [document async for document in cursor]
    except OperationFailure as exc:
        print(f"DB failed to process: {exc.details}")
        return _error(
            "Probably wrong syntax, note the dictionary for find: "
            + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for"
        )
    if not data:
        return _error("Document not found")
    return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)})


@app.post("/sc/v0")
async def create(request: Request) -> JSONResponse:
    """/sc/v0, POST method

    :param request: The request where we get the json body.
    :return: JSONResponse
    """
    try:
        json_data = await request.json()
    except JSONDecodeError:
        return _error("Invalid JSON")
    if not valid_schema(json_data):
        return _error("Not our JSON schema")
    result = await db.collection.insert_one(json_data)
    return JSONResponse(content={"status": "success", "key": str(result.inserted_id)})


@app.put("/sc/v0")
async def update(request: Request) -> JSONResponse:
    """/sc/v0, PUT method

    The body must be a JSON object carrying the ``_id`` of an existing
    document; the remaining content replaces that document.

    :param request: The request where we get the json body.
    :return: JSONResponse
    """
    try:
        json_data = await request.json()
    except JSONDecodeError:
        return _error("Invalid JSON")
    # A body that is not a JSON object (number, string, list, ...) cannot carry
    # an "_id" key; reject it before indexing into it.
    if not isinstance(json_data, dict) or "_id" not in json_data:
        return _error("Missing key '_id'")

    # Get the key
    object_id = _to_object_id(json_data["_id"])
    if object_id is None:
        return _error("Missing key '_id' with valid id")

    # Ensure the updating key exist
    document = await db.collection.find_one({"_id": object_id})
    if document is None:
        return _error("Document not found")

    # Ensure valid schema (checked without the "_id" key)
    del json_data["_id"]
    if not valid_schema(json_data):
        return _error("Not our JSON schema")

    # Replace the data
    json_data["_id"] = object_id
    await db.collection.replace_one({"_id": object_id}, json_data)
    return JSONResponse(content={"status": "success", "key": str(object_id)})


@app.get("/sc/v0/{key}")
async def get(key: str) -> JSONResponse:
    """/sc/v0/{key}, GET method

    :param key: The document key in the database.
    :return: JSONResponse
    """
    object_id = _to_object_id(key)
    if object_id is None:
        return _error("Invalid document key")
    document = await db.collection.find_one({"_id": object_id})
    if document is None:
        return _error("Document not found")
    return JSONResponse(content={"status": "success", "docs": json_util.dumps(document)})


@app.delete("/sc/v0/{key}")
async def delete(key: str) -> JSONResponse:
    """/sc/v0/{key}, DELETE method

    :param key: The document key in the database.
    :return: JSONResponse
    """
    object_id = _to_object_id(key)
    if object_id is None:
        return _error("Invalid document key")
    result = await db.collection.delete_one({"_id": object_id})
    if result.deleted_count == 0:
        return _error("Document not found")
    return JSONResponse(content={"status": "success", "key": key})