summaryrefslogtreecommitdiff
path: root/src/main.py
diff options
context:
space:
mode:
authorVictor Näslund <victor@sunet.se>2022-11-01 01:55:25 +0100
committerVictor Näslund <victor@sunet.se>2022-11-01 01:55:25 +0100
commitffb26f4a81a9ca61c4105df037f7e1beb8dc5fb0 (patch)
tree41094f051edbf300a6cd2c2de8dfb8435bfc18a4 /src/main.py
parent1b836e78db2737ba5d1ae43da9828601a5a5c114 (diff)
initial fresh up
Diffstat (limited to 'src/main.py')
-rwxr-xr-xsrc/main.py45
1 files changed, 29 insertions, 16 deletions
diff --git a/src/main.py b/src/main.py
index 9de8eb8..2730b83 100755
--- a/src/main.py
+++ b/src/main.py
@@ -1,3 +1,4 @@
+from typing import Dict, Union, List, Any
import json
import os
import sys
@@ -48,7 +49,7 @@ else:
sys.exit(-1)
-def get_pubkey():
+def get_pubkey() -> str:
try:
if 'JWT_PUBKEY_PATH' in os.environ:
keypath = os.environ['JWT_PUBKEY_PATH']
@@ -64,12 +65,18 @@ def get_pubkey():
return pubkey
-def get_data(key=None, limit=25, skip=0, ip=None,
- port=None, asn=None, domain=None):
+
+def get_data(key: Union[int, None] = None,
+ limit: int = 25,
+ skip: int = 0,
+ ip: Union[str, None] = None,
+ port: Union[int, None] = None,
+ asn: Union[str, None] = None,
+ domain: Union[str, None] = None) -> List[Dict[str, Any]]:
if key:
- return db.get(key)
+ return [db.get(key)]
- selectors = dict()
+ selectors: Dict[str, Any] = {}
indexes = get_index_keys()
selectors['domain'] = domain
@@ -80,7 +87,7 @@ def get_data(key=None, limit=25, skip=0, ip=None,
if asn and 'asn' in indexes:
selectors['asn'] = asn
- data = db.search(**selectors, limit=limit, skip=skip)
+ data: List[Dict[str, Any]] = db.search(**selectors, limit=limit, skip=skip)
return data
@@ -96,21 +103,20 @@ def jwt_config():
@app.exception_handler(AuthJWTException)
-def authjwt_exception_handler(request: Request, exc: AuthJWTException):
+def authjwt_exception_handler(request: Request, exc: AuthJWTException) -> JSONResponse:
return JSONResponse(content={"status": "error", "message":
exc.message}, status_code=400)
@app.exception_handler(RuntimeError)
-def app_exception_handler(request: Request, exc: RuntimeError):
+def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse:
return JSONResponse(content={"status": "error", "message":
str(exc.with_traceback(None))},
status_code=400)
@app.get('/sc/v0/get')
-async def get(key=None, limit=25, skip=0, ip=None, port=None,
- asn=None, Authorize: AuthJWT = Depends()):
+async def get(key: Union[int, None] = None, limit: int = 25, skip: int = 0, ip: Union[str, None] = None, port: Union[int, None] = None, asn: Union[str, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse:
Authorize.jwt_required()
@@ -135,7 +141,7 @@ async def get(key=None, limit=25, skip=0, ip=None, port=None,
@app.get('/sc/v0/get/{key}')
-async def get_key(key=None, Authorize: AuthJWT = Depends()):
+async def get_key(key: Union[int, None] = None, Authorize: AuthJWT = Depends()) -> JSONResponse:
Authorize.jwt_required()
@@ -152,7 +158,10 @@ async def get_key(key=None, Authorize: AuthJWT = Depends()):
else:
allowed_domains = raw_jwt["read"]
- data = get_data(key)
+ data_list = get_data(key)
+
+ # Handle if missing
+ data = data_list[0]
if data and data["domain"] not in allowed_domains:
return JSONResponse(
@@ -166,8 +175,9 @@ async def get_key(key=None, Authorize: AuthJWT = Depends()):
return JSONResponse(content={"status": "success", "docs": data})
+# WHY IS AUTH OUTCOMMENTED???
@app.post('/sc/v0/add')
-async def add(data: Request, Authorize: AuthJWT = Depends()):
+async def add(data: Request, Authorize: AuthJWT = Depends()) -> JSONResponse:
# Authorize.jwt_required()
try:
@@ -196,7 +206,7 @@ async def add(data: Request, Authorize: AuthJWT = Depends()):
@app.delete('/sc/v0/delete/{key}')
-async def delete(key, Authorize: AuthJWT = Depends()):
+async def delete(key: int, Authorize: AuthJWT = Depends()) -> JSONResponse:
Authorize.jwt_required()
@@ -213,7 +223,10 @@ async def delete(key, Authorize: AuthJWT = Depends()):
else:
allowed_domains = raw_jwt["write"]
- data = get_data(key)
+ data_list = get_data(key)
+
+ # Handle if missing
+ data = data_list[0]
if data and data["domain"] not in allowed_domains:
return JSONResponse(
@@ -232,7 +245,7 @@ async def delete(key, Authorize: AuthJWT = Depends()):
return JSONResponse(content={"status": "success", "docs": data})
-def main(standalone=False):
+def main(standalone: bool = False):
if not standalone:
return app