summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVictor Näslund <victor@sunet.se>2022-11-13 17:49:45 +0100
committerVictor Näslund <victor@sunet.se>2022-11-13 17:49:45 +0100
commit563607809d993c9e496423829b1f93def22a4aac (patch)
tree6d7f3b7dc9466fa70750c67a661751f9d1fffa56
parent60029b36e9bdd773be923a3cd7d5d30170c669b3 (diff)
more stuff
-rw-r--r--.gitignore3
-rw-r--r--data/collector_container/Dockerfile4
-rw-r--r--data/init-mongodb.js4
-rwxr-xr-xdata/mongodb_entrypoint.sh2
-rwxr-xr-xdev-run.sh28
-rw-r--r--docker-compose.yml20
-rw-r--r--src/collector/db.py143
-rwxr-xr-xsrc/collector/main.py89
8 files changed, 228 insertions, 65 deletions
diff --git a/.gitignore b/.gitignore
index 97bc330..0069e59 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@ demo/wsgi_demo.db
*~
\#*\#
+# secrets
+.env
+
# Byte-compiled / optimized / DLL files
.idea
__pycache__/
diff --git a/data/collector_container/Dockerfile b/data/collector_container/Dockerfile
index e02a5d2..a9bb4e5 100644
--- a/data/collector_container/Dockerfile
+++ b/data/collector_container/Dockerfile
@@ -29,6 +29,10 @@ WORKDIR /app/
USER collector
+# Add healthcheck
+HEALTHCHECK --interval=30s --timeout=15s --retries=1 --start-period=30s \
+ CMD sh healthcheck.sh || bash -c 'kill -s 15 1 && (sleep 7; kill -s 9 1)'
+
ENTRYPOINT ["uvicorn", "src.collector.main:app", "--host", "0.0.0.0", "--workers", "1", "--header", "server:collector"]
diff --git a/data/init-mongodb.js b/data/init-mongodb.js
index 4b64674..6057d84 100644
--- a/data/init-mongodb.js
+++ b/data/init-mongodb.js
@@ -4,7 +4,7 @@
disableTelemetry()
// Create the DB by inserting some data
-db.v0.insertOne({init_key: "init_data"})
+db.REPLACE_COLLECTION.insertOne({init_key: "init_data"})
// Create user
db.createUser(
@@ -21,7 +21,7 @@ db.createUser(
)
// Delete the init data
-db.v0.deleteOne({init_key: "init_data"})
+db.REPLACE_COLLECTION.deleteOne({init_key: "init_data"})
// Disable the ad about monitoring
db.disableFreeMonitoring()
diff --git a/data/mongodb_entrypoint.sh b/data/mongodb_entrypoint.sh
index 3db507a..7a81abc 100755
--- a/data/mongodb_entrypoint.sh
+++ b/data/mongodb_entrypoint.sh
@@ -8,7 +8,9 @@ then
cp /init-mongodb.js /data/db/init-mongodb.js
sed -i "s/REPLACE_USERNAME/$MONGODB_USERNAME/g" /data/db/init-mongodb.js
sed -i "s/REPLACE_PASSWORD/$MONGODB_PASSWORD/g" /data/db/init-mongodb.js
+ sed -i "s/REPLACE_COLLECTION/$MONGODB_COLLECTION/g" /data/db/init-mongodb.js
+ # Update and shutdown our DB with changes
/usr/bin/mongosh localhost:27015/production /data/db/init-mongodb.js
sleep 1 # Allow DB to shutdown
/usr/bin/touch /data/db/user_exist
diff --git a/dev-run.sh b/dev-run.sh
index df337d8..084e38b 100755
--- a/dev-run.sh
+++ b/dev-run.sh
@@ -12,47 +12,51 @@ docker-compose -f docker-compose.yml build
sudo chown -R 101 data/mongodb_data
docker-compose -f docker-compose.yml up -d
-sleep 2
+sleep 3
echo
echo
-curl --data-binary @data/example_data_3.json http://localhost:8000/sc/v0
+curl -v --data-binary @data/example_data_3.json http://127.0.0.1:8000/sc/v0
echo
echo
-curl -X DELETE http://localhost:8000/sc/v0/63702570e004d2b0b2254d27
+curl -v -X DELETE http://127.0.0.1:8000/sc/v0/63702570e004d2b0b2254d27
echo
echo
-curl -X DELETE http://localhost:8000/sc/v0/63702570e004d2b0b2254d27
+curl -v -X DELETE http://127.0.0.1:8000/sc/v0/63702570e004d2b0b2254d27
echo
echo
-curl -d '{"search": {"port": {"$lt": 4}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"port": {"$lt": 4}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
-curl -d '{"search": {"port": 112}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"port": 112}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
-curl -d '{"search": {"port": {"$gt": 4}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"port": {"$gt": 4}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
-curl -d '{"search": {"port": 111}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"port": 111}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
-curl -d '{"search": {"port": {"sdfsf": 7}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"port": {"sdfsf": 7}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
-curl -d '{"search": {"port": {"$sdfsf": 7}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"port": {"$sdfsf": 7}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
-curl -d '{"search": {"portfdv": {"$asa": 7}}}' -H 'Content-Type: application/json' http://localhost:8000/sc/v0/search
+curl -v -d '{"search": {"portfdv": {"$asa": 7}}}' -H 'Content-Type: application/json' http://127.0.0.1:8000/sc/v0/search
echo
echo
echo
echo
-curl -X PUT --data-binary @data/example_data_3_replace_test.json http://localhost:8000/sc/v0
+curl -v -X PUT --data-binary @data/example_data_3_replace_test.json http://127.0.0.1:8000/sc/v0
+
+echo
+echo
+curl -v http://127.0.0.1:8000/info
# bash quickstart.sh -b || exit 1
diff --git a/docker-compose.yml b/docker-compose.yml
index 729a1ec..47760ef 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,7 +12,16 @@ services:
- MONGODB_COLLECTION
depends_on:
- mongodb
+ restart: always
+ cap_drop: [ALL]
+ security_opt:
+ - no-new-privileges:true
read_only: true
+ mem_limit: "256m"
+ mem_reservation: "128m"
+ cpus: "1.75"
+ tmpfs:
+ - /dev/shm:ro,noexec,nosuid,nodev
mongodb:
build:
@@ -23,7 +32,16 @@ services:
environment:
- MONGODB_USERNAME
- MONGODB_PASSWORD
+ - MONGODB_COLLECTION
volumes:
- ./data/mongodb_data:/data/db
+ restart: always
read_only: true
-
+ cap_drop: [ALL]
+ security_opt:
+ - no-new-privileges:true
+ mem_limit: "1g"
+ mem_reservation: "200m"
+ cpus: "1.75"
+ tmpfs:
+ - /dev/shm:ro,noexec,nosuid,nodev
diff --git a/src/collector/db.py b/src/collector/db.py
index 3b16ef5..2f16e12 100644
--- a/src/collector/db.py
+++ b/src/collector/db.py
@@ -1,8 +1,12 @@
"""Our database module"""
+from typing import List, Dict, Optional, Any
from time import sleep
from sys import exit as app_exit
from dataclasses import dataclass
+from fastapi import HTTPException
+from pydantic import BaseModel
+from pymongo.errors import OperationFailure
from motor.motor_asyncio import (
AsyncIOMotorClient,
AsyncIOMotorCollection,
@@ -10,6 +14,14 @@ from motor.motor_asyncio import (
from bson import ObjectId
+class SearchInput(BaseModel):
+ """Handle search data for HTTP request"""
+
+ search: Optional[Dict[str, Any]]
+ limit: int = 25
+ skip: int = 0
+
+
@dataclass()
class DBClient:
"""Class to hold database connections for us."""
@@ -18,22 +30,147 @@ class DBClient:
collection: AsyncIOMotorCollection
def __init__(self, username: str, password: str, collection: str) -> None:
- self.client = AsyncIOMotorClient(f"mongodb://{username}:{password}@mongodb:27017/production", timeoutMS=2000)
+ self.client = AsyncIOMotorClient(
+ f"mongodb://{username}:{password}@mongodb:27017/production",
+ maxConnecting=4,
+ timeoutMS=3000,
+ serverSelectionTimeoutMS=3000,
+ )
self.collection = self.client["production"][collection]
- async def check_server(self) -> None:
+ async def startup(self) -> None:
"""Try query the DB and exit the program if we fail after 5 times.
:return: None
"""
+
for i in range(5):
try:
await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")})
print("Connection to DB - OK")
break
- except: # pylint: disable=bare-except
+ except Exception: # pylint: disable=braod-except
print(f"WARNING failed to connect to DB - {i} / 4", flush=True)
sleep(1)
else:
print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production")
app_exit(1)
+
+ async def find(self, search_data: SearchInput) -> Optional[List[Dict[str, Any]]]:
+ """Wrap the find() method, handling timeouts and return data type.
+
+ :param search_data: Instance of SearchInput.
+ :return: Optional[List[Dict[str, Any]]]
+ """
+
+ data: List[Dict[str, Any]] = []
+ cursor = self.collection.find(search_data.search)
+
+ # Sort on timestamp
+ cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip)
+
+ try:
+ async for document in cursor:
+ data.append(document)
+
+ if data:
+ return data
+
+ except OperationFailure as exc:
+ print(f"DB failed to process: {exc.details}")
+ raise HTTPException(
+ status_code=400,
+ detail="Probably wrong syntax, note the dictionary for find: "
+ + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for",
+ ) from exc
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ return None
+
+ async def find_one(self, object_id: ObjectId) -> Optional[Dict[str, Any]]:
+ """Wrap the find_one() method, handling timeouts and return data type.
+
+ :param object_id: The object id to find.
+ :return: Optional[Dict[str, Any]]
+ """
+
+ try:
+ document = await self.collection.find_one({"_id": object_id})
+ if isinstance(document, Dict):
+ return document
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def insert_one(self, data: Dict[str, Any]) -> Optional[ObjectId]:
+ """Wrap the insert_one() method, handling timeouts and return data type.
+
+ :param data: The data to insert into the DB.
+ :return: Optional[ObjectId]
+ """
+
+ try:
+ result = await self.collection.insert_one(data)
+ if isinstance(result.inserted_id, ObjectId) and len(str(result.inserted_id)) == 24:
+ return result.inserted_id
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def replace_one(self, object_id: ObjectId, data: Dict[str, Any]) -> Optional[ObjectId]:
+ """Wrap the replace_one() method, handling timeouts and return data type.
+
+ :param object_id: The object id to replace.
+ :param data: The data to replace with.
+ :return: Optional[ObjectId]
+ """
+
+ try:
+ result = await self.collection.replace_one({"_id": object_id}, data)
+ if result.matched_count == 1:
+ return object_id
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def delete_one(self, object_id: ObjectId) -> Optional[ObjectId]:
+ """Wrap the delete_one() method, handling timeouts and return data type.
+
+ :param object_id: The object id to delete from the DB.
+ :return: Optional[ObjectId]
+ """
+
+ try:
+ result = await self.collection.delete_one({"_id": object_id})
+ if result.deleted_count == 1:
+ return object_id
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ return None
+
+ async def estimated_document_count(self) -> Optional[int]:
+ """Wrap the estimated_document_count() method, handling timeouts and return data type.
+
+ :return: Optional[int]
+ """
+
+ try:
+ result = await self.collection.estimated_document_count()
+ if isinstance(result, int):
+ return result
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
diff --git a/src/collector/main.py b/src/collector/main.py
index 096b788..36a8ebd 100755
--- a/src/collector/main.py
+++ b/src/collector/main.py
@@ -1,21 +1,21 @@
"""Our main module"""
-from typing import Dict, Optional, List, Any
from os import environ
-import asyncio
-import sys
+from asyncio import get_running_loop
+from sys import exit as app_exit
from json.decoder import JSONDecodeError
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
-from pydantic import BaseModel
-from pymongo.errors import OperationFailure
from bson import (
ObjectId,
json_util,
)
from dotenv import load_dotenv
-from .db import DBClient
+from .db import (
+ DBClient,
+ SearchInput,
+)
from .schema import valid_schema
@@ -23,14 +23,14 @@ load_dotenv()
# Get credentials
if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ:
print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env")
- sys.exit(1)
+ app_exit(1)
# Create DB object
db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"])
# Check DB
-loop = asyncio.get_running_loop()
-startup_task = loop.create_task(db.check_server())
+loop = get_running_loop()
+startup_task = loop.create_task(db.startup())
app = FastAPI()
@@ -42,14 +42,6 @@ app = FastAPI()
# return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400)
-class SearchInput(BaseModel):
- """Handle search data for HTTP request"""
-
- search: Optional[Dict[str, Any]]
- limit: int = 25
- skip: int = 0
-
-
@app.post("/sc/v0/search")
async def search(search_data: SearchInput) -> JSONResponse:
"""/sc/v0/search, POST method
@@ -57,26 +49,9 @@ async def search(search_data: SearchInput) -> JSONResponse:
:param search_data: The search data.
:return: JSONResponse
"""
- data: List[Dict[str, Any]] = []
-
- cursor = db.collection.find(search_data.search)
- cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip)
+ data = await db.find(search_data)
- try:
- async for document in cursor:
- data.append(document)
- except OperationFailure as exc:
- print(f"DB failed to process: {exc.details}")
- return JSONResponse(
- content={
- "status": "error",
- "message": "Probably wrong syntax, note the dictionary for find: "
- + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for",
- },
- status_code=400,
- )
-
- if not data:
+ if data is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)})
@@ -98,12 +73,16 @@ async def create(request: Request) -> JSONResponse:
if not valid_schema(json_data):
return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)
- result = await db.collection.insert_one(json_data)
- return JSONResponse(content={"status": "success", "key": str(result.inserted_id)})
+ key = await db.insert_one(json_data)
+
+ if key is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400)
+
+ return JSONResponse(content={"status": "success", "key": str(key)})
@app.put("/sc/v0")
-async def update(request: Request) -> JSONResponse:
+async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many-return-statements
"""/sc/v0, PUT method
:param request: The request where we get the json body.
@@ -129,7 +108,8 @@ async def update(request: Request) -> JSONResponse:
return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400)
# Ensure the updating key exist
- document = await db.collection.find_one({"_id": object_id})
+ document = await db.find_one(object_id)
+
if document is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
@@ -140,19 +120,23 @@ async def update(request: Request) -> JSONResponse:
# Replace the data
json_data["_id"] = object_id
- await db.collection.replace_one({"_id": object_id}, json_data)
+ returned_object_id = await db.replace_one(object_id, json_data)
+
+ if returned_object_id is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=400)
+
return JSONResponse(content={"status": "success", "key": str(object_id)})
@app.get("/sc/v0/{key}")
async def get(key: str) -> JSONResponse:
- """/sc/v0, POST method
+ """/sc/v0, GET method
:param key: The document key in the database.
:return: JSONResponse
"""
- document = await db.collection.find_one({"_id": ObjectId(key)})
+ document = await db.find_one(ObjectId(key))
if document is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
@@ -162,14 +146,25 @@ async def get(key: str) -> JSONResponse:
@app.delete("/sc/v0/{key}")
async def delete(key: str) -> JSONResponse:
- """/sc/v0, POST method
+ """/sc/v0, DELETE method
:param key: The document key in the database.
:return: JSONResponse
"""
- result = await db.collection.delete_one({"_id": ObjectId(key)})
+ result = await db.delete_one(ObjectId(key))
- if result.deleted_count == 0:
+ if result is None:
return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
- return JSONResponse(content={"status": "success", "key": key})
+ return JSONResponse(content={"status": "success", "key": str(key)})
+
+
+@app.get("/info")
+async def info() -> JSONResponse:
+ """/info, GET method
+
+ :return: JSONResponse
+ """
+
+ count = await db.estimated_document_count()
+ return JSONResponse(content={"status": "success", "Estimated document count": count})