diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/soc_collector/__init__.py (renamed from src/collector/__init__.py) | 0 | ||||
-rw-r--r-- | src/soc_collector/auth.py | 46 | ||||
-rw-r--r-- | src/soc_collector/db.py (renamed from src/collector/db.py) | 8 | ||||
-rw-r--r-- | src/soc_collector/healthcheck.py (renamed from src/collector/healthcheck.py) | 4 | ||||
-rwxr-xr-x | src/soc_collector/healthcheck.sh (renamed from src/collector/healthcheck.sh) | 2 | ||||
-rwxr-xr-x | src/soc_collector/main.py (renamed from src/collector/main.py) | 47 | ||||
-rw-r--r-- | src/soc_collector/py.typed (renamed from src/collector/py.typed) | 0 | ||||
-rw-r--r-- | src/soc_collector/schema.py (renamed from src/collector/schema.py) | 0 | ||||
-rw-r--r-- | src/soc_collector/soc_collector_cli.py | 259 |
9 files changed, 344 insertions, 22 deletions
diff --git a/src/collector/__init__.py b/src/soc_collector/__init__.py index 6530fdd..6530fdd 100644 --- a/src/collector/__init__.py +++ b/src/soc_collector/__init__.py diff --git a/src/soc_collector/auth.py b/src/soc_collector/auth.py new file mode 100644 index 0000000..4aacb52 --- /dev/null +++ b/src/soc_collector/auth.py @@ -0,0 +1,46 @@ +"""Auth module""" +from typing import List +from fastapi import Request, HTTPException + + +def load_api_keys(path: str) -> List[str]: + """Load API keys from file + + :param path: Path to API keys file. + :return: List[str] + """ + keys: List[str] = [] + + with open(path, encoding="utf-8") as f_data: + key_file = f_data.readlines() + + for line in key_file: + if line[0] == "#" or len(line) < 3: + continue + + key = line.split(";")[0] + keys.append(key) + + return keys + + +def authorize_client(request: Request, api_keys: List[str]) -> None: + """Authorize a client request. + + Parameters: + request (Request): The HTTP request. + api_keys (List[str]): List of accepted api_keys. + + Raises HTTPException with status_code=401 if authorize fail. + + Returns: + None + """ + + if "API-KEY" not in request.headers: + raise HTTPException(status_code=401, detail="API-KEY header missing") + + request_key = request.headers["API-KEY"].strip() + + if request_key not in api_keys: + raise HTTPException(status_code=401, detail="API-KEY header invalid") diff --git a/src/collector/db.py b/src/soc_collector/db.py index 641e5da..d601a82 100644 --- a/src/collector/db.py +++ b/src/soc_collector/db.py @@ -7,6 +7,10 @@ from dataclasses import dataclass from fastapi import HTTPException from pydantic import BaseModel from pymongo.errors import OperationFailure +from pymongo import ( + ASCENDING, + DESCENDING, +) from motor.motor_asyncio import ( AsyncIOMotorClient, AsyncIOMotorCollection, @@ -66,9 +70,7 @@ class DBClient: data: List[Dict[str, Any]] = [] cursor = self.collection.find(search_data.search) - # Sort on timestamp - # TODO: Also sort on IP as well - cursor.sort("timestamp", -1).limit(search_data.limit).skip(search_data.skip) + cursor.sort({"ip": ASCENDING, "timestamp": DESCENDING}).limit(search_data.limit).skip(search_data.skip) try: async for document in cursor: diff --git a/src/collector/healthcheck.py b/src/soc_collector/healthcheck.py index 3091d98..e561fa0 100644 --- a/src/collector/healthcheck.py +++ b/src/soc_collector/healthcheck.py @@ -16,9 +16,9 @@ def check_collector() -> bool: time.sleep(2) # Prevent race condition with redis container healthcheck req = requests.get( - "http://localhost:8000/info", + "https://localhost:8000/info", timeout=3, - # TODO: verify="./rootCA.crt", + verify="./collector_root_ca.crt", ) if req.status_code != 200: diff --git a/src/collector/healthcheck.sh b/src/soc_collector/healthcheck.sh index aacc906..10d599c 100755 --- a/src/collector/healthcheck.sh +++ b/src/soc_collector/healthcheck.sh @@ -8,4 +8,4 @@ then fi # If collector -/usr/bin/python3 ./src/collector/healthcheck.py "$1" || exit 1 +/usr/bin/python3 ./src/soc_collector/healthcheck.py "$1" || exit 1 diff --git a/src/collector/main.py b/src/soc_collector/main.py index a2d55c7..eb6041f 100755 --- a/src/collector/main.py +++ b/src/soc_collector/main.py @@ -6,16 +6,13 @@ from json.decoder import JSONDecodeError from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from bson import ( - ObjectId, - json_util, -) +from bson import ObjectId from .db import ( DBClient, SearchInput, ) from .schema import valid_schema - +from .auth import authorize_client, load_api_keys # Get credentials if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ: @@ -29,23 +26,25 @@ db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ[ loop = get_running_loop() startup_task = loop.create_task(db.startup()) -app = FastAPI() - +# Load API keys +API_KEYS = load_api_keys("./api_keys.txt") -# @app.exception_handler(RuntimeError) -# def app_exception_handler(request: Request, exc: RuntimeError) -> JSONResponse: -# print(exc, flush=True) -# return JSONResponse(content={"status": "error", "message": str(exc.with_traceback(None))}, status_code=400) -# return JSONResponse(content={"status": "error", "message": "Error during processing"}, status_code=400) +# Disable redoc and swagger endpoints +app = FastAPI(docs_url=None, redoc_url=None) @app.post("/sc/v0/search") -async def search(search_data: SearchInput) -> JSONResponse: +async def search(request: Request, search_data: SearchInput) -> JSONResponse: """/sc/v0/search, POST method + :param request: The client request. :param search_data: The search data. :return: JSONResponse """ + + # Ensure authorization + authorize_client(request, API_KEYS) + data = await db.find(search_data) return JSONResponse(content={"status": "success", "docs": data}) @@ -59,6 +58,9 @@ async def create(request: Request) -> JSONResponse: :return: JSONResponse """ + # Ensure authorization + authorize_client(request, API_KEYS) + try: json_data = await request.json() except JSONDecodeError: @@ -83,6 +85,9 @@ async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many :return: JSONResponse """ + # Ensure authorization + authorize_client(request, API_KEYS) + try: json_data = await request.json() except JSONDecodeError: @@ -123,13 +128,16 @@ async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many @app.get("/sc/v0/{key}") -async def get(key: str) -> JSONResponse: +async def get(request: Request, key: str) -> JSONResponse: """/sc/v0/{key}, GET method :param key: The document key in the database. :return: JSONResponse """ + # Ensure authorization + authorize_client(request, API_KEYS) + document = await db.find_one(ObjectId(key)) if document is None: @@ -139,12 +147,16 @@ async def get(key: str) -> JSONResponse: @app.delete("/sc/v0/{key}") -async def delete(key: str) -> JSONResponse: +async def delete(request: Request, key: str) -> JSONResponse: """/sc/v0/{key}, DELETE method :param key: The document key in the database. :return: JSONResponse """ + + # Ensure authorization + authorize_client(request, API_KEYS) + result = await db.delete_one(ObjectId(key)) if result is None: @@ -154,12 +166,15 @@ async def delete(key: str) -> JSONResponse: @app.get("/info") -async def info() -> JSONResponse: +async def info(request: Request) -> JSONResponse: """/info, GET method :return: JSONResponse """ + # Ensure authorization + authorize_client(request, API_KEYS) + count = await db.estimated_document_count() if count is None: diff --git a/src/collector/py.typed b/src/soc_collector/py.typed index e69de29..e69de29 100644 --- a/src/collector/py.typed +++ b/src/soc_collector/py.typed diff --git a/src/collector/schema.py b/src/soc_collector/schema.py index 221990a..221990a 100644 --- a/src/collector/schema.py +++ b/src/soc_collector/schema.py diff --git a/src/soc_collector/soc_collector_cli.py b/src/soc_collector/soc_collector_cli.py new file mode 100644 index 0000000..e32b5ad --- /dev/null +++ b/src/soc_collector/soc_collector_cli.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +"""Our database module""" +from os.path import isfile +from os import environ +from typing import Dict, Any +from argparse import ArgumentParser, RawTextHelpFormatter +from sys import exit as app_exit +import json +import requests + +if "COLLECTOR_API_KEY" not in environ: + print("Missing 'COLLECTOR_API_KEY' in environment") + app_exit(1) +API_KEY = environ["COLLECTOR_API_KEY"] + +API_URL = "https://collector-dev.soc.sunet.se:8000" +ROOT_CA_FILE = __file__.replace("soc_collector_cli.py", "data/collector_root_ca.crt") + + +def json_load_data(data: str) -> Dict[str, Any]: + """Load json from argument, json data or path to json file + + :param data: String with either json or path to a json file. + :return: json dict as a Dict[str, Any]. + """ + + ret: Dict[str, Any] + try: + if isfile(data): + with open(data, encoding="utf-8") as f_data: + ret = json.loads(f_data.read()) + else: + ret = json.loads(data) + return ret + + except Exception: # pylint: disable=broad-except + print(f"No such file or invalid json data: {data}") + app_exit(1) + + +def info_action() -> None: + """Get database info, currently number of documents.""" + + req = requests.get(f"{API_URL}/info", headers={"API-KEY": API_KEY}, timeout=5) + + # Ensure ok status + req.raise_for_status() + + # Print data + json_data = json.loads(req.text) + print(f"Estimated document count: {json_data['Estimated document count']}") + + +def search_action(data: str) -> None: + """Search for documents in the database. + + :param data: String with either json or path to a json file. + """ + + search_data = json_load_data(data) + + req = requests.post(f"{API_URL}/sc/v0/search", headers={"API-KEY": API_KEY}, json=search_data, timeout=5) + + # Ensure ok status + req.raise_for_status() + + # Print data + json_data = json.loads(req.text) + print(json.dumps(json_data["docs"], indent=4)) + + +def delete_action(data: str) -> None: + """Delete a document in the DB. + + :param data: key or path to a json file containing "_id". + """ + + if isfile(data): + data = json_load_data(data)["_id"] + req = requests.delete(f"{API_URL}/sc/v0/{data}", headers={"API-KEY": API_KEY}, timeout=5) + + # Check status + if req.status_code == 404: + print("ERROR: Document not found") + app_exit(1) + + # Ensure ok status + req.raise_for_status() + + print(f"Deleted data OK - key: {data}") + + +def update_local_action(data: str, update_data: str) -> None: + """Update keys and their data in this document. Does not modify the database. + + :param data: json blob or path to json file. + :param update_data: json blob or path to json file. + """ + + json_data = json_load_data(data) + json_update_data = json_load_data(update_data) + + json_data.update(json_update_data) + + # Print data + print(json.dumps(json_data, indent=4)) + + +def replace_action(data: str) -> None: + """Replace the entire document in the database with this document, "_id" must exist as a key. + + :param data: json blob or path to json file, "_id" key must exist. + """ + + json_data = json_load_data(data) + req = requests.put(f"{API_URL}/sc/v0", json=json_data, headers={"API-KEY": API_KEY}, timeout=5) + + # Check status + if req.status_code == 404: + print("ERROR: Document not found") + app_exit(1) + + # Ensure ok status + req.raise_for_status() + + json_data = json.loads(req.text) + print(f'Inserted data OK - key: {json_data["_id"]}') + + +def insert_action(data: str) -> None: + """Insert a new document into the database, "_id" must not exist in the document. + + :param data: json blob or path to json file, "_id" key must not exist. + """ + + json_data = json_load_data(data) + req = requests.post(f"{API_URL}/sc/v0", json=json_data, headers={"API-KEY": API_KEY}, timeout=5) + + # Ensure ok status + req.raise_for_status() + + data = json.loads(req.text) + print(f'Inserted data OK - key: {json_data["_id"]}') + + +def get_action(data: str) -> None: + """Get a document from the database. + + :param data: key or path to a json file containing "_id". + """ + if isfile(data): + data = json_load_data(data)["_id"] + req = requests.get(f"{API_URL}/sc/v0/{data}", headers={"API-KEY": API_KEY}, timeout=5) + + # Check status + if req.status_code == 404: + print("ERROR: Document not found") + app_exit(1) + + # Ensure ok status + req.raise_for_status() + + # Print data + json_data = json.loads(req.text) + print(json.dumps(json_data["doc"], indent=4)) + + +def main() -> None: + """Main function.""" + parser = ArgumentParser(formatter_class=RawTextHelpFormatter, description="SOC Collector CLI") + parser.add_argument( + "action", + choices=["info", "search", "get", "insert", "replace", "update_local", "delete"], + help="""Action to take + + info: Show info about the database, currently number of documents. + + search: json blog OR path to file. skip defaults to 0 and limit to 25. + '{"search": {"port": 111, "ip": "192.0.2.28"}, "skip": 0, "limit": 25}' OR ./search_data.json + '{"search": {"asn_country_code": "SE", "result": {"$exists": "cve_2015_0002"}}}' + + get: key OR path to document using its "_id". + 637162378c92893fff92bf7e OR ./data.json + + insert: json blob OR path to file. Document MUST NOT contain "_id". + '{json_blob_here...}' OR ./data.json + + replace: json blob OR path to file. Document MUST contain "_id". + '{json_blob_here...}' OR ./updated_document.json + + update_local: json blob OR path to file json blob or path to file with json to update with. + This does NOT send data to the database, use replace for that. + 1st ARG: '{json_blob_here...}' OR ./data.json 2th ARG:'{"port": 555, "some_key": "some_data"}' OR ./data.json + + delete: key OR path to file using its "_id". + 637162378c92893fff92bf7e OR ./data.json + + + Pro tip, in ~/.bashrc: + + _soc_collector_cli() + { + local cur prev words cword + _init_completion || return + + local commands command + + commands='info search get insert replace update_local delete' + + if ((cword == 1)); then + COMPREPLY=($(compgen -W "$commands" -- "$cur")) + else + + case $prev in + search) + COMPREPLY="'{\\"search\\": {\\"asn_country_code\\": \\"SE\\", \\"ip\\": \\"8.8.8.8\\"}}'" + return + ;; + esac + command=${words[1]} + _filedir + return + fi + } + complete -F _soc_collector_cli -o default soc_collector_cli + + """, + ) + parser.add_argument("data", default="info", help="json blob or path to file") + parser.add_argument( + "extra_data", + nargs="?", + default=None, + help="json blob or path to file, only used for local_edit", + ) + + args = parser.parse_args() + + if args.action == "get": + get_action(args.data) + elif args.action == "insert": + insert_action(args.data) + elif args.action == "replace": + replace_action(args.data) + elif args.action == "update_local" and args.extra_data is not None: + update_local_action(args.data, args.extra_data) + elif args.action == "delete": + delete_action(args.data) + elif args.action == "search": + search_action(args.data) + elif args.action == "info": + info_action() + else: + print("ERROR: Wrong action") + app_exit(1) + + +if __name__ == "__main__": + main() |