summaryrefslogtreecommitdiff
path: root/src/collector/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/collector/main.py')
-rwxr-xr-xsrc/collector/main.py89
1 files changed, 42 insertions, 47 deletions
diff --git a/src/collector/main.py b/src/collector/main.py
index 096b788..36a8ebd 100755
--- a/src/collector/main.py
+++ b/src/collector/main.py
@@ -1,21 +1,21 @@
"""Our main module"""
-from typing import Dict, Optional, List, Any
from os import environ
-import asyncio
-import sys
+from asyncio import get_running_loop
+from sys import exit as app_exit
from json.decoder import JSONDecodeError
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
-from pydantic import BaseModel
-from pymongo.errors import OperationFailure
from bson import (
ObjectId,
json_util,
)
from dotenv import load_dotenv
-from .db import DBClient
+from .db import (
+ DBClient,
+ SearchInput,
+)
from .schema import valid_schema
@@ -23,14 +23,14 @@ load_dotenv()
# Get credentials
if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ:
print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env")
- sys.exit(1)
+ app_exit(1)
# Create DB object
db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"])
# Check DB
-loop = asyncio.get_running_loop()
-startup_task = loop.create_task(db.check_server())
+loop = get_running_loop()
+startup_task = loop.create_task(db.startup())
app = FastAPI()
@@ -42,14 +42,6 @@ app = FastAPI()
# return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400)
-class SearchInput(BaseModel):
- """Handle search data for HTTP request"""
-
- search: Optional[Dict[str, Any]]
- limit: int = 25
- skip: int = 0
-
-
@app.post("/sc/v0/search")
async def search(search_data: SearchInput) -> JSONResponse:
"""/sc/v0/search, POST method
@@ -57,26 +49,9 @@ async def search(search_data: SearchInput) -> JSONResponse:
:param search_data: The search data.
:return: JSONResponse
"""
- data: List[Dict[str, Any]] = []
-
- cursor = db.collection.find(search_data.search)
- cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip)
+ data = await db.find(search_data)
- try:
- async for document in cursor:
- data.append(document)
- except OperationFailure as exc:
- print(f"DB failed to process: {exc.details}")
- return JSONResponse(
- content={
- "status": "error",
- "message": "Probably wrong syntax, note the dictionary for find: "
- + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for",
- },
- status_code=400,
- )
-
- if not data:
+ if data is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)})
@@ -98,12 +73,16 @@ async def create(request: Request) -> JSONResponse:
if not valid_schema(json_data):
return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)
- result = await db.collection.insert_one(json_data)
- return JSONResponse(content={"status": "success", "key": str(result.inserted_id)})
+ key = await db.insert_one(json_data)
+
+ if key is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400)
+
+ return JSONResponse(content={"status": "success", "key": str(key)})
@app.put("/sc/v0")
-async def update(request: Request) -> JSONResponse:
+async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many-return-statements
"""/sc/v0, PUT method
:param request: The request where we get the json body.
@@ -129,7 +108,8 @@ async def update(request: Request) -> JSONResponse:
return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400)
# Ensure the updating key exist
- document = await db.collection.find_one({"_id": object_id})
+ document = await db.find_one(object_id)
+
if document is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
@@ -140,19 +120,23 @@ async def update(request: Request) -> JSONResponse:
# Replace the data
json_data["_id"] = object_id
- await db.collection.replace_one({"_id": object_id}, json_data)
+ returned_object_id = await db.replace_one(object_id, json_data)
+
+ if returned_object_id is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400)
+
return JSONResponse(content={"status": "success", "key": str(object_id)})
@app.get("/sc/v0/{key}")
async def get(key: str) -> JSONResponse:
- """/sc/v0, POST method
+ """/sc/v0, GET method
:param key: The document key in the database.
:return: JSONResponse
"""
- document = await db.collection.find_one({"_id": ObjectId(key)})
+ document = await db.find_one(ObjectId(key))
if document is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
@@ -162,14 +146,25 @@ async def get(key: str) -> JSONResponse:
@app.delete("/sc/v0/{key}")
async def delete(key: str) -> JSONResponse:
- """/sc/v0, POST method
+ """/sc/v0, DELETE method
:param key: The document key in the database.
:return: JSONResponse
"""
- result = await db.collection.delete_one({"_id": ObjectId(key)})
+ result = await db.delete_one(ObjectId(key))
- if result.deleted_count == 0:
+ if result is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
- return JSONResponse(content={"status": "success", "key": key})
+ return JSONResponse(content={"status": "success", "key": str(key)})
+
+
+@app.get("/info")
+async def info() -> JSONResponse:
+ """/info, GET method
+
+ :return: JSONResponse
+ """
+
+ count = await db.estimated_document_count()
+ return JSONResponse(content={"status": "success", "Estimated document count": count})