1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
"""Our main module"""
from typing import Dict, Optional, List, Any
from os import environ
import asyncio
import sys
from json.decoder import JSONDecodeError
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pymongo.errors import OperationFailure
from bson import (
ObjectId,
json_util,
)
from dotenv import load_dotenv
from .db import DBClient
from .schema import valid_schema
# Load dotenv settings before the environment is inspected below.
load_dotenv()

# All three settings are mandatory: report and exit with status 1 if any is absent.
if not all(name in environ for name in ("MONGODB_USERNAME", "MONGODB_PASSWORD", "MONGODB_COLLECTION")):
    print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env")
    sys.exit(1)

# Shared database client used by every route handler in this module.
db = DBClient(
    environ["MONGODB_USERNAME"],
    environ["MONGODB_PASSWORD"],
    environ["MONGODB_COLLECTION"],
)

# Schedule the DB server check on the event loop that is already running.
# NOTE: asyncio.get_running_loop() raises RuntimeError when no loop is running,
# so this module has to be imported from inside a running event loop.
# The task is only scheduled here; nothing in this block awaits its result.
loop = asyncio.get_running_loop()
startup_task = loop.create_task(db.check_server())

# The ASGI application the route decorators below attach to.
app = FastAPI()
# @app.exception_handler(RuntimeError)
# def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse:
# print(exc, flush=True)
# return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400)
# return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400)
class SearchInput(BaseModel):
    """Handle search data for HTTP request

    Request body model for the ``/sc/v0/search`` endpoint.
    """

    # Filter document handed to ``db.collection.find``. The explicit ``None``
    # default keeps the field optional regardless of the pydantic major
    # version (v2 treats a bare ``Optional[...]`` annotation as required).
    search: Optional[Dict[str, Any]] = None
    # Value passed to the cursor's ``limit``.
    limit: int = 25
    # Value passed to the cursor's ``skip``.
    skip: int = 0
@app.post("/sc/v0/search")
async def search(search_data: SearchInput) -> JSONResponse:
    """/sc/v0/search, POST method

    Run ``search_data.search`` as a ``find`` filter, sorted by ``timestamp``
    descending, with ``search_data.limit`` and ``search_data.skip`` applied.

    :param search_data: The search data.
    :return: JSONResponse. On success ``docs`` holds the string produced by
        ``json_util.dumps`` for the matching documents. A negative ``skip``,
        a query the database rejects, or an empty result yields status 400.
    """
    # A negative skip makes the cursor raise ValueError, which would surface
    # as an unhandled 500; reject it up front as a client error instead.
    if search_data.skip < 0:
        return JSONResponse(content={"status": "error", "message": "'skip' must be >= 0"}, status_code=400)

    cursor = db.collection.find(search_data.search)
    cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip)

    try:
        # The query is only sent once iteration starts, so server-side
        # rejections of the filter show up here.
        data: List[Dict[str, Any]] = [document async for document in cursor]
    except OperationFailure as exc:
        print(f"DB failed to process: {exc.details}")
        return JSONResponse(
            content={
                "status": "error",
                "message": "Probably wrong syntax, note the dictionary for find: "
                + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for",
            },
            status_code=400,
        )

    if not data:
        return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
    return JSONResponse(content={"status": "success", "docs": json_util.dumps(data)})
@app.post("/sc/v0")
async def create(request: Request) -> JSONResponse:
    """/sc/v0, POST method

    Parse the request body as JSON, check it with ``valid_schema`` and insert
    it into the collection.

    :param request: The request where we get the json body.
    :return: JSONResponse carrying the inserted id as ``key`` on success, or a
        status 400 error when the body is not JSON or fails the schema check.
    """
    try:
        payload = await request.json()
    except JSONDecodeError:
        return JSONResponse(status_code=400, content={"status": "error", "message": "Invalid JSON"})

    if valid_schema(payload):
        inserted = await db.collection.insert_one(payload)
        return JSONResponse(content={"status": "success", "key": str(inserted.inserted_id)})

    return JSONResponse(status_code=400, content={"status": "error", "message": "Not our JSON schema"})
@app.put("/sc/v0")
async def update(request: Request) -> JSONResponse:
    """/sc/v0, PUT method

    Replace an existing document. The JSON body must carry the target id under
    ``_id``, either as a string or as ``{"$oid": "<id>"}``; the remaining keys
    must pass ``valid_schema``.

    :param request: The request where we get the json body.
    :return: JSONResponse with the document id as ``key`` on success, or a
        status 400 error for invalid JSON, a missing or malformed ``_id``, an
        unknown document, or a body that fails the schema check.
    """
    try:
        json_data = await request.json()
    except JSONDecodeError:
        return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400)

    # Valid JSON is not necessarily an object (it may be a number, string, ...);
    # only a dict can carry an "_id" key.
    if not isinstance(json_data, dict) or "_id" not in json_data:
        return JSONResponse(content={"status": "error", "message": "Missing key '_id'"}, status_code=400)

    # Get the key: accept a plain string or the {"$oid": "..."} form.
    raw_id = json_data["_id"]
    if isinstance(raw_id, dict):
        raw_id = raw_id.get("$oid")
    # ObjectId() raises on malformed strings, so validate before constructing.
    if not isinstance(raw_id, str) or not ObjectId.is_valid(raw_id):
        return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400)
    object_id = ObjectId(raw_id)

    # Ensure the updating key exist
    document = await db.collection.find_one({"_id": object_id})
    if document is None:
        return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)

    # Ensure valid schema; "_id" is removed first so only the payload is checked.
    del json_data["_id"]
    if not valid_schema(json_data):
        return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)

    # Replace the data, restoring the id as an ObjectId.
    json_data["_id"] = object_id
    await db.collection.replace_one({"_id": object_id}, json_data)
    return JSONResponse(content={"status": "success", "key": str(object_id)})
@app.get("/sc/v0/{key}")
async def get(key: str) -> JSONResponse:
    """/sc/v0/{key}, GET method

    Fetch a single document by its id.

    :param key: The document key in the database.
    :return: JSONResponse. On success ``docs`` holds the string produced by
        ``json_util.dumps`` for the document; status 400 when no document
        matches ``key``.
    """
    # ObjectId() raises on malformed input, which would surface as a 500.
    # A malformed key cannot match any document, so answer as "not found".
    if not ObjectId.is_valid(key):
        return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)

    document = await db.collection.find_one({"_id": ObjectId(key)})
    if document is None:
        return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
    return JSONResponse(content={"status": "success", "docs": json_util.dumps(document)})
@app.delete("/sc/v0/{key}")
async def delete(key: str) -> JSONResponse:
    """/sc/v0/{key}, DELETE method

    Delete a single document by its id.

    :param key: The document key in the database.
    :return: JSONResponse echoing ``key`` on success; status 400 when no
        document was deleted.
    """
    # ObjectId() raises on malformed input, which would surface as a 500.
    # A malformed key cannot match any document, so answer as "not found".
    if not ObjectId.is_valid(key):
        return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)

    result = await db.collection.delete_one({"_id": ObjectId(key)})
    if result.deleted_count == 0:
        return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=400)
    return JSONResponse(content={"status": "success", "key": key})
|