"""JWT-protected FastAPI service exposing search and insert endpoints over DictDB.

Routes:
    GET  /sc/v0/get        -- search documents, optionally filtered by ip/port/asn.
    GET  /sc/v0/get/{key}  -- search documents (see the review note in ``get_data``).
    POST /sc/v0/add        -- store the JSON request body via ``db.add``.

Every route requires a valid JWT, verified with the ES256 public key below.
"""
from typing import Optional

import uvicorn
from fastapi import FastAPI, Depends, Request
from fastapi.responses import JSONResponse
from fastapi_jwt_auth import AuthJWT
from fastapi_jwt_auth.exceptions import AuthJWTException
from pydantic import BaseModel

from index import CouchIindex
from db import DictDB

app = FastAPI()
db = DictDB()

# Public key handed to fastapi_jwt_auth for verifying token signatures.
public_key = """-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEPW8bkkVIq4BX8eWwlUOUYbJhiGDv
K/6xY5T0BsvV6pbMoIUfgeThVOq5I3CmXxLt+qyPska6ol9fTN7woZLsCg==
-----END PUBLIC KEY-----"""


def get_data(key=None, limit=25, skip=0, ip=None, port=None, asn=None):
    """Search the database and wrap the result in a JSON success response.

    Args:
        key: Accepted for the ``/sc/v0/get/{key}`` route but not used below.
        limit: Forwarded to ``db.search`` as ``limit``.
        skip: Forwarded to ``db.search`` as ``skip``.
        ip: Optional filter; applied only if truthy and ``'ip'`` is indexed.
        port: Optional filter; applied only if truthy and ``'port'`` is indexed.
        asn: Optional filter; applied only if truthy and ``'asn'`` is indexed.

    Returns:
        JSONResponse with ``{"status": "success", "data": <search result>}``.
    """
    # NOTE(review): `key` never reaches the selectors, so /sc/v0/get/{key}
    # currently returns the same result as an unfiltered /sc/v0/get.
    # Confirm the intended DictDB lookup before wiring it in.
    indexes = CouchIindex().dict()

    # The domain is hard-coded; every search is scoped to it.
    selectors = {'domain': 'sunet.se'}

    # A filter is only applied when a value was supplied AND the field is
    # present in the index dict; otherwise it is silently ignored.
    for field, value in (('ip', ip), ('port', port), ('asn', asn)):
        if value and field in indexes:
            selectors[field] = value

    data = db.search(**selectors, limit=limit, skip=skip)

    return JSONResponse(content={"status": "success", "data": data})


class JWTConfig(BaseModel):
    """Settings object returned to fastapi_jwt_auth via ``AuthJWT.load_config``."""

    # Signature algorithm expected on incoming tokens.
    authjwt_algorithm: str = "ES256"
    # Key used to verify token signatures.
    authjwt_public_key: str = public_key


@AuthJWT.load_config
def jwt_config():
    """Return the JWT settings for fastapi_jwt_auth."""
    return JWTConfig()


@app.exception_handler(AuthJWTException)
def authjwt_exception_handler(request: Request, exc: AuthJWTException):
    """Turn any AuthJWTException into a JSON error body with HTTP 400."""
    return JSONResponse(
        content={"status": "error", "message": exc.message},
        status_code=400,
    )


@app.exception_handler(RuntimeError)
def app_exception_handler(request: Request, exc: RuntimeError):
    """Turn a RuntimeError into a JSON error body with HTTP 400."""
    return JSONResponse(
        content={"status": "error", "message": str(exc)},
        status_code=400,
    )


@app.get('/sc/v0/get')
async def get(
    key: Optional[str] = None,
    limit: int = 25,
    skip: int = 0,
    ip: Optional[str] = None,
    port: Optional[str] = None,
    asn: Optional[str] = None,
    Authorize: AuthJWT = Depends(),
):
    """Search documents; requires a valid JWT.

    ``limit`` and ``skip`` are annotated as ``int`` so FastAPI validates and
    converts the query-string values instead of passing raw strings through.
    """
    Authorize.jwt_required()

    return get_data(key, limit, skip, ip, port, asn)


@app.get('/sc/v0/get/{key}')
async def get_key(key: Optional[str] = None, Authorize: AuthJWT = Depends()):
    """Search documents for the ``key`` path segment; requires a valid JWT."""
    Authorize.jwt_required()

    return get_data(key)


@app.post('/sc/v0/add')
async def add(data: Request, Authorize: AuthJWT = Depends()):
    """Store the JSON request body via ``db.add``; requires a valid JWT.

    Returns:
        JSONResponse with ``{"status": "success", "docs": <db.add result>}``.
    """
    Authorize.jwt_required()

    # NOTE(review): `orgs` is hard-coded, so the guard below cannot trigger
    # today; it looks like a placeholder for a real organization lookup.
    orgs = ['sunet.se']
    if not orgs:
        return JSONResponse(
            content={
                "status": "error",
                "message": "Could not find an organization",
            },
            status_code=400,
        )

    json_data = await data.json()
    key = db.add(json_data)

    return JSONResponse(content={"status": "success", "docs": key})


def main(standalone=False):
    """Return the app, or serve it with uvicorn when ``standalone`` is true.

    Args:
        standalone: When false (default) the FastAPI app is returned.
            When true, uvicorn serves it on 0.0.0.0:8000 with debug logging.
    """
    if not standalone:
        return app

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")


if __name__ == '__main__':
    main(standalone=True)
else:
    app = main()