diff options
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | data/collector_container/Dockerfile | 4 | ||||
-rw-r--r-- | data/init-mongodb.js | 4 | ||||
-rwxr-xr-x | data/mongodb_entrypoint.sh | 2 | ||||
-rwxr-xr-x | dev-run.sh | 28 | ||||
-rw-r--r-- | docker-compose.yml | 20 | ||||
-rw-r--r-- | src/collector/db.py | 143 | ||||
-rwxr-xr-x | src/collector/main.py | 89 |
8 files changed, 228 insertions, 65 deletions
@@ -4,6 +4,9 @@ demo/wsgi_demo.db *~ \#*\# +# secrets +.env + # Byte-compiled / optimized / DLL files .idea __pycache__/ diff --git a/data/collector_container/Dockerfile b/data/collector_container/Dockerfile index e02a5d2..a9bb4e5 100644 --- a/data/collector_container/Dockerfile +++ b/data/collector_container/Dockerfile @@ -29,6 +29,10 @@ WORKDIR /app/ USER collector +# Add healthcheck +HEALTHCHECK --interval=30s --timeout=15s --retries=1 --start-period=30s \ + CMD sh healthcheck.sh || bash -c 'kill -s 15 1 && (sleep 7; kill -s 9 1)' + ENTRYPOINT ["uvicorn", "src.collector.main:app", "--host", "0.0.0.0", "--workers", "1", "--header", "server:collector"] diff --git a/data/init-mongodb.js b/data/init-mongodb.js index 4b64674..6057d84 100644 --- a/data/init-mongodb.js +++ b/data/init-mongodb.js @@ -4,7 +4,7 @@ disableTelemetry() // Create the DB by inserting some data -db.v0.insertOne({init_key: "init_data"}) +db.REPLACE_COLLECTION.insertOne({init_key: "init_data"}) // Create user db.createUser( @@ -21,7 +21,7 @@ db.createUser( ) // Delete the init data -db.v0.deleteOne({init_key: "init_data"}) +db.REPLACE_COLLECTION.deleteOne({init_key: "init_data"}) // Disable the ad about monitoring db.disableFreeMonitoring() diff --git a/data/mongodb_entrypoint.sh b/data/mongodb_entrypoint.sh index 3db507a..7a81abc 100755 --- a/data/mongodb_entrypoint.sh +++ b/data/mongodb_entrypoint.sh @@ -8,7 +8,9 @@ then cp /init-mongodb.js /data/db/init-mongodb.js sed -i "s/REPLACE_USERNAME/$MONGODB_USERNAME/g" /data/db/init-mongodb.js sed -i "s/REPLACE_PASSWORD/$MONGODB_PASSWORD/g" /data/db/init-mongodb.js + sed -i "s/REPLACE_COLLECTION/$MONGODB_COLLECTION/g" /data/db/init-mongodb.js + # Update and shutdown our DB with changes /usr/bin/mongosh localhost:27015/production /data/db/init-mongodb.js sleep 1 # Allow DB to shutdown /usr/bin/touch /data/db/user_exist @@ -12,47 +12,51 @@ docker-compose -f docker-compose.yml build sudo chown -R 101 data/mongodb_data docker-compose -f docker-compose.yml up -d -sleep 2 +sleep 3 echo echo -curl --data-binary @data/example_data_3.json http://localhost:8000/sc/v0 +curl -v --data-binary @data/example_data_3.json http://127.0.0.1:8000/sc/v0 echo echo -curl -X DELETE http://localhost:8000/sc/v0/63702570e004d2b0b2254d27 +curl -v -X DELETE http://127.0.0.1:8000/sc/v0/63702570e004d2b0b2254d27 echo echo -curl -X DELETE http://localhost:8000/sc/v0/63702570e004d2b0b2254d27 +curl -v -X DELETE http://127.0.0.1:8000/sc/v0/63702570e004d2b0b2254d27 echo echo -curl -d '{"search": {"port": {"$lt": 4}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"port": {"$lt": 4}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo -curl -d '{"search": {"port": 112}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"port": 112}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo -curl -d '{"search": {"port": {"$gt": 4}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"port": {"$gt": 4}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo -curl -d '{"search": {"port": 111}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"port": 111}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo -curl -d '{"search": {"port": {"sdfsf": 7}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"port": {"sdfsf": 7}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo -curl -d '{"search": {"port": {"$sdfsf": 7}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"port": {"$sdfsf": 7}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo -curl -d '{"search": {"portfdv": {"$asa": 7}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search +curl -v -d '{"search": {"portfdv": {"$asa": 7}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search echo echo echo echo -curl -X PUT --data-binary @data/example_data_3_replace_test.json http://localhost:8000/sc/v0 +curl -v -X PUT --data-binary @data/example_data_3_replace_test.json http://127.0.0.1:8000/sc/v0 + +echo +echo +curl -v http://127.0.0.1:8000/info # bash quickstart.sh -b || exit 1 diff --git a/docker-compose.yml b/docker-compose.yml index 729a1ec..47760ef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,7 +12,16 @@ services: - MONGODB_COLLECTION depends_on: - mongodb + restart: always + cap_drop: [ALL] + security_opt: + - no-new-privileges:true read_only: true + mem_limit: "256m" + mem_reservation: "128m" + cpus: "1.75" + tmpfs: + - /dev/shm:ro,noexec,nosuid,nodev mongodb: build: @@ -23,7 +32,16 @@ services: environment: - MONGODB_USERNAME - MONGODB_PASSWORD + - MONGODB_COLLECTION volumes: - ./data/mongodb_data:/data/db + restart: always read_only: true - + cap_drop: [ALL] + security_opt: + - no-new-privileges:true + mem_limit: "1g" + mem_reservation: "200m" + cpus: "1.75" + tmpfs: + - /dev/shm:ro,noexec,nosuid,nodev diff --git a/src/collector/db.py b/src/collector/db.py index 3b16ef5..2f16e12 100644 --- a/src/collector/db.py +++ b/src/collector/db.py @@ -1,8 +1,12 @@ """Our database module""" +from typing import List, Dict, Optional, Any from time import sleep from sys import exit as app_exit from dataclasses import dataclass +from fastapi import HTTPException +from pydantic import BaseModel +from pymongo.errors import OperationFailure from motor.motor_asyncio import ( AsyncIOMotorClient, AsyncIOMotorCollection, @@ -10,6 +14,14 @@ from motor.motor_asyncio import ( from bson import ObjectId +class SearchInput(BaseModel): + """Handle search data for HTTP request""" + + search: Optional[Dict[str, Any]] + limit: int = 25 + skip: int = 0 + + @dataclass() class DBClient: """Class to hold database connections for us.""" @@ -18,22 +30,147 @@ class DBClient: collection: AsyncIOMotorCollection def __init__(self, username: str, password: str, collection: str) -> None: - self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000) + self.client = AsyncIOMotorClient( + f"mongodb://{username}:{password}@mongodb:27017/production", + maxConnecting=4, + timeoutMS=3000, + serverSelectionTimeoutMS=3000, + ) self.collection = self.client["production"][collection] - async def check_server(self) -> None: + async def startup(self) -> None: """Try query the DB and exit the program if we fail after 5 times. :return: None """ + for i in range(5): try: await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")}) print("Connection to DB - OK") break - except: # pylint: disable=bare-except + except Exception: # pylint: disable=braod-except print(f"WARNING failed to connect to DB - {i} / 4", flush=True) sleep(1) else: print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production") app_exit(1) + + async def find(self, search_data: SearchInput) -> Optional[List[Dict[str, Any]]]: + """Wrap the find() method, handling timeouts and return data type. + + :param search_data: Instance of SearchInput. + :return: Optional[List[Dict[str, Any]]] + """ + + data: List[Dict[str, Any]] = [] + cursor = self.collection.find(search_data.search) + + # Sort on timestamp + cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) + + try: + async for document in cursor: + data.append(document) + + if data: + return data + + except OperationFailure as exc: + print(f"DB failed to process: {exc.details}") + raise HTTPException( + status_code=400, + detail="Probably wrong syntax, note the dictionary for find: " + + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for", + ) from exc + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + return None + + async def find_one(self, object_id: ObjectId) -> Optional[Dict[str, Any]]: + """Wrap the find_one() method, handling timeouts and return data type. + + :param object_id: The object id to find. + :return: Optional[Dict[str, Any]] + """ + + try: + document = await self.collection.find_one({"_id": object_id}) + if isinstance(document, Dict): + return document + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def insert_one(self, data: Dict[str, Any]) -> Optional[ObjectId]: + """Wrap the insert_one() method, handling timeouts and return data type. + + :param data: The data to insert into the DB. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.insert_one(data) + if isinstance(result.inserted_id, ObjectId) and len(str(result.inserted_id)) == 24: + return result.inserted_id + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def replace_one(self, object_id: ObjectId, data: Dict[str, Any]) -> Optional[ObjectId]: + """Wrap the replace_one() method, handling timeouts and return data type. + + :param object_id: The object id to replace. + :param data: The data to replace with. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.replace_one({"_id": object_id}, data) + if result.matched_count == 1: + return object_id + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + async def delete_one(self, object_id: ObjectId) -> Optional[ObjectId]: + """Wrap the delete_one() method, handling timeouts and return data type. + + :param object_id: The object id to delete from the DB. + :return: Optional[ObjectId] + """ + + try: + result = await self.collection.delete_one({"_id": object_id}) + if result.deleted_count == 1: + return object_id + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc + + return None + + async def estimated_document_count(self) -> Optional[int]: + """Wrap the estimated_document_count() method, handling timeouts and return data type. + + :return: Optional[int] + """ + + try: + result = await self.collection.estimated_document_count() + if isinstance(result, int): + return result + return None + + except BaseException as exc: + print(f"DB connection failed: {exc}") + raise HTTPException(status_code=500, detail="DB connection failed") from exc diff --git a/src/collector/main.py b/src/collector/main.py index 096b788..36a8ebd 100755 --- a/src/collector/main.py +++ b/src/collector/main.py @@ -1,21 +1,21 @@ """Our main module""" -from typing import Dict, Optional, List, Any from os import environ -import asyncio -import sys +from asyncio import get_running_loop +from sys import exit as app_exit from json.decoder import JSONDecodeError from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from pydantic import BaseModel -from pymongo.errors import OperationFailure from bson import ( ObjectId, json_util, ) from dotenv import load_dotenv -from .db import DBClient +from .db import ( + DBClient, + SearchInput, +) from .schema import valid_schema @@ -23,14 +23,14 @@ load_dotenv() # Get credentials if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ: print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env") - sys.exit(1) + app_exit(1) # Create DB object db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"]) # Check DB -loop = asyncio.get_running_loop() -startup_task = loop.create_task(db.check_server()) +loop = get_running_loop() +startup_task = loop.create_task(db.startup()) app = FastAPI() @@ -42,14 +42,6 @@ app = FastAPI() # return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400) -class SearchInput(BaseModel): - """Handle search data for HTTP request""" - - search: Optional[Dict[str, Any]] - limit: int = 25 - skip: int = 0 - - @app.post("/sc/v0/search") async def search(search_data: SearchInput) -> JSONResponse: """/sc/v0/search, POST method @@ -57,26 +49,9 @@ async def search(search_data: SearchInput) -> JSONResponse: :param search_data: The search data. :return: JSONResponse """ - data: List[Dict[str, Any]] = [] - - cursor = db.collection.find(search_data.search) - cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) + data = await db.find(search_data) - try: - async for document in cursor: - data.append(document) - except OperationFailure as exc: - print(f"DB failed to process: {exc.details}") - return JSONResponse( - content={ - "status": "error", - "message": "Probably wrong syntax, note the dictionary for find: " - + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for", - }, - status_code=400, - ) - - if not data: + if data is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)}) @@ -98,12 +73,16 @@ async def create(request: Request) -> JSONResponse: if not valid_schema(json_data): return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400) - result = await db.collection.insert_one(json_data) - return JSONResponse(content={"status": "success", "key": str(result.inserted_id)}) + key = await db.insert_one(json_data) + + if key is None: + return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400) + + return JSONResponse(content={"status": "success", "key": str(key)}) @app.put("/sc/v0") -async def update(request: Request) -> JSONResponse: +async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many-return-statements """/sc/v0, PUT method :param request: The request where we get the json body. @@ -129,7 +108,8 @@ async def update(request: Request) -> JSONResponse: return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400) # Ensure the updating key exist - document = await db.collection.find_one({"_id": object_id}) + document = await db.find_one(object_id) + if document is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) @@ -140,19 +120,23 @@ async def update(request: Request) -> JSONResponse: # Replace the data json_data["_id"] = object_id - await db.collection.replace_one({"_id": object_id}, json_data) + returned_object_id = await db.replace_one(object_id, json_data) + + if returned_object_id is None: + return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400) + return JSONResponse(content={"status": "success", "key": str(object_id)}) @app.get("/sc/v0/{key}") async def get(key: str) -> JSONResponse: - """/sc/v0, POST method + """/sc/v0, GET method :param key: The document key in the database. :return: JSONResponse """ - document = await db.collection.find_one({"_id": ObjectId(key)}) + document = await db.find_one(ObjectId(key)) if document is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) @@ -162,14 +146,25 @@ async def get(key: str) -> JSONResponse: @app.delete("/sc/v0/{key}") async def delete(key: str) -> JSONResponse: - """/sc/v0, POST method + """/sc/v0, DELETE method :param key: The document key in the database. :return: JSONResponse """ - result = await db.collection.delete_one({"_id": ObjectId(key)}) + result = await db.delete_one(ObjectId(key)) - if result.deleted_count == 0: + if result is None: return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400) - return JSONResponse(content={"status": "success", "key": key}) + return JSONResponse(content={"status": "success", "key": str(key)}) + + +@app.get("/info") +async def info() -> JSONResponse: + """/info, GET method + + :return: JSONResponse + """ + + count = await db.estimated_document_count() + return JSONResponse(content={"status": "success", "Estimated document count": count}) |