summaryrefslogtreecommitdiff
path: root/src/soc_collector
diff options
context:
space:
mode:
authorVictor Näslund <victor@sunet.se>2022-11-16 19:09:46 +0100
committerVictor Näslund <victor@sunet.se>2022-11-16 19:09:46 +0100
commit43e87d84b15d12d52a4dcde6e80426cbd17e3d6f (patch)
tree18cef29b77973053522a67677121789ebe032285 /src/soc_collector
parent4a56b3aae4114db731eff725e2c6292371a9b8ae (diff)
auth and CLI done
Diffstat (limited to 'src/soc_collector')
-rw-r--r--src/soc_collector/__init__.py4
-rw-r--r--src/soc_collector/auth.py46
-rw-r--r--src/soc_collector/db.py178
-rw-r--r--src/soc_collector/healthcheck.py38
-rwxr-xr-xsrc/soc_collector/healthcheck.sh11
-rwxr-xr-xsrc/soc_collector/main.py183
-rw-r--r--src/soc_collector/py.typed0
-rw-r--r--src/soc_collector/schema.py107
-rw-r--r--src/soc_collector/soc_collector_cli.py259
9 files changed, 826 insertions, 0 deletions
diff --git a/src/soc_collector/__init__.py b/src/soc_collector/__init__.py
new file mode 100644
index 0000000..6530fdd
--- /dev/null
+++ b/src/soc_collector/__init__.py
@@ -0,0 +1,4 @@
+"""Collector
+"""
+
+__version__ = "1.03"
diff --git a/src/soc_collector/auth.py b/src/soc_collector/auth.py
new file mode 100644
index 0000000..4aacb52
--- /dev/null
+++ b/src/soc_collector/auth.py
@@ -0,0 +1,46 @@
+"""Auth module"""
+from typing import List
+from fastapi import Request, HTTPException
+
+
+def load_api_keys(path: str) -> List[str]:
+ """Load API keys from file
+
+ :param path: Path to API keys file.
+ :return: List[str]
+ """
+ keys: List[str] = []
+
+ with open(path, encoding="utf-8") as f_data:
+ key_file = f_data.readlines()
+
+ for line in key_file:
+ if line[0] == "#" or len(line) < 3:
+ continue
+
+ key = line.split(";")[0]
+ keys.append(key)
+
+ return keys
+
+
+def authorize_client(request: Request, api_keys: List[str]) -> None:
+ """Authorize a client request.
+
+ Parameters:
+ request (Request): The HTTP request.
+ api_keys (List[str]): List of accepted api_keys.
+
+ Raises HTTPException with status_code=401 if authorize fail.
+
+ Returns:
+ None
+ """
+
+ if "API-KEY" not in request.headers:
+ raise HTTPException(status_code=401, detail="API-KEY header missing")
+
+ request_key = request.headers["API-KEY"].strip()
+
+ if request_key not in api_keys:
+ raise HTTPException(status_code=401, detail="API-KEY header invalid")
diff --git a/src/soc_collector/db.py b/src/soc_collector/db.py
new file mode 100644
index 0000000..d601a82
--- /dev/null
+++ b/src/soc_collector/db.py
@@ -0,0 +1,178 @@
+"""Our database module"""
+from typing import List, Dict, Optional, Any
+from time import sleep
+from sys import exit as app_exit
+from dataclasses import dataclass
+
+from fastapi import HTTPException
+from pydantic import BaseModel
+from pymongo.errors import OperationFailure
+from pymongo import (
+ ASCENDING,
+ DESCENDING,
+)
+from motor.motor_asyncio import (
+ AsyncIOMotorClient,
+ AsyncIOMotorCollection,
+)
+from bson import ObjectId
+
+
+class SearchInput(BaseModel):
+ """Handle search data for HTTP request"""
+
+ search: Optional[Dict[str, Any]]
+ limit: int = 25
+ skip: int = 0
+
+
+@dataclass()
+class DBClient:
+ """Class to hold database connections for us."""
+
+ client: AsyncIOMotorClient
+ collection: AsyncIOMotorCollection
+
+ def __init__(self, username: str, password: str, collection: str) -> None:
+ self.client = AsyncIOMotorClient(
+ f"mongodb://{username}:{password}@mongodb:27017/production",
+ maxConnecting=4,
+ timeoutMS=3000,
+ serverSelectionTimeoutMS=3000,
+ )
+ self.collection = self.client["production"][collection]
+
+ async def startup(self) -> None:
+ """Try query the DB and exit the program if we fail after 5 times.
+
+ :return: None
+ """
+
+ for i in range(5):
+ try:
+ await self.collection.find_one({"_id": ObjectId("507f1f77bcf86cd799439011")})
+ print("Connection to DB - OK")
+ break
+ except Exception: # pylint: disable=broad-except
+ print(f"WARNING failed to connect to DB - {i} / 4", flush=True)
+ sleep(1)
+ else:
+ print("Could not connect to DB - mongodb://REDACTED_USERNAME:REDACTED_PASSWORD@mongodb:27017/production")
+ app_exit(1)
+
+ async def find(self, search_data: SearchInput) -> List[Dict[str, Any]]:
+ """Wrap the find() method, handling timeouts and return data type.
+
+ :param search_data: Instance of SearchInput.
+ :return: Optional[List[Dict[str, Any]]]
+ """
+
+ data: List[Dict[str, Any]] = []
+ cursor = self.collection.find(search_data.search)
+
+ cursor.sort({"ip": ASCENDING, "timestamp": DESCENDING}).limit(search_data.limit).skip(search_data.skip)
+
+ try:
+ async for document in cursor:
+ if document is not None:
+ document["_id"] = str(document["_id"])
+ data.append(document)
+
+ return data
+
+ except OperationFailure as exc:
+ print(f"DB failed to process: {exc.details}")
+ raise HTTPException(
+ status_code=400,
+ detail="Probably wrong syntax, note the dictionary for find: "
+ + "https://motor.readthedocs.io/en/stable/tutorial-asyncio.html#async-for",
+ ) from exc
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def find_one(self, object_id: ObjectId) -> Optional[Dict[str, Any]]:
+ """Wrap the find_one() method, handling timeouts and return data type.
+
+ :param object_id: The object id to find.
+ :return: Optional[Dict[str, Any]]
+ """
+
+ try:
+ document = await self.collection.find_one({"_id": object_id})
+ if isinstance(document, Dict):
+ document["_id"] = str(document["_id"])
+ return document
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def insert_one(self, data: Dict[str, Any]) -> Optional[ObjectId]:
+ """Wrap the insert_one() method, handling timeouts and return data type.
+
+ :param data: The data to insert into the DB.
+ :return: Optional[ObjectId]
+ """
+
+ try:
+ result = await self.collection.insert_one(data)
+ if isinstance(result.inserted_id, ObjectId) and len(str(result.inserted_id)) == 24:
+ return result.inserted_id
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def replace_one(self, object_id: ObjectId, data: Dict[str, Any]) -> Optional[ObjectId]:
+ """Wrap the replace_one() method, handling timeouts and return data type.
+
+ :param object_id: The object id to replace.
+ :param data: The data to replace with.
+ :return: Optional[ObjectId]
+ """
+
+ try:
+ result = await self.collection.replace_one({"_id": object_id}, data)
+ if result.matched_count == 1:
+ return object_id
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def delete_one(self, object_id: ObjectId) -> Optional[ObjectId]:
+ """Wrap the delete_one() method, handling timeouts and return data type.
+
+ :param object_id: The object id to delete from the DB.
+ :return: Optional[ObjectId]
+ """
+
+ try:
+ result = await self.collection.delete_one({"_id": object_id})
+ if isinstance(result.deleted_count, int) and result.deleted_count == 1:
+ return object_id
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
+
+ async def estimated_document_count(self) -> Optional[int]:
+ """Wrap the estimated_document_count() method, handling timeouts and return data type.
+
+ :return: Optional[int]
+ """
+
+ try:
+ result = await self.collection.estimated_document_count()
+ if isinstance(result, int):
+ return result
+ return None
+
+ except BaseException as exc:
+ print(f"DB connection failed: {exc}")
+ raise HTTPException(status_code=500, detail="DB connection failed") from exc
diff --git a/src/soc_collector/healthcheck.py b/src/soc_collector/healthcheck.py
new file mode 100644
index 0000000..e561fa0
--- /dev/null
+++ b/src/soc_collector/healthcheck.py
@@ -0,0 +1,38 @@
+"""
+Send a healthcheck request
+"""
+import sys
+import time
+import json
+
+import requests
+
+
+def check_collector() -> bool:
+ """Check our collector using /info
+
+ :return: bool
+ """
+ time.sleep(2) # Prevent race condition with redis container healthcheck
+
+ req = requests.get(
+ "https://localhost:8000/info",
+ timeout=3,
+ verify="./collector_root_ca.crt",
+ )
+
+ if req.status_code != 200:
+ return False
+
+ data = json.loads(req.text)
+ if isinstance(data["Estimated document count"], int) and data["Estimated document count"] >= 0:
+ return req.status_code == 200
+
+ return False
+
+
+if __name__ == "__main__":
+ if sys.argv[1] == "COLLECTOR":
+ if check_collector():
+ sys.exit(0)
+ sys.exit(1)
diff --git a/src/soc_collector/healthcheck.sh b/src/soc_collector/healthcheck.sh
new file mode 100755
index 0000000..10d599c
--- /dev/null
+++ b/src/soc_collector/healthcheck.sh
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+# If mongodb container
+if [ "$1" = "MONGODB" ]
+then
+ /usr/bin/mongosh --eval 'disableTelemetry()' -u "$MONGODB_USERNAME" -p "$MONGODB_PASSWORD" localhost:27017/production /healthcheck-mongodb.js
+ exit $?
+fi
+
+# If collector
+/usr/bin/python3 ./src/soc_collector/healthcheck.py "$1" || exit 1
diff --git a/src/soc_collector/main.py b/src/soc_collector/main.py
new file mode 100755
index 0000000..eb6041f
--- /dev/null
+++ b/src/soc_collector/main.py
@@ -0,0 +1,183 @@
+"""Our main module"""
+from os import environ
+from asyncio import get_running_loop
+from sys import exit as app_exit
+from json.decoder import JSONDecodeError
+
+from fastapi import FastAPI, Request
+from fastapi.responses import JSONResponse
+from bson import ObjectId
+from .db import (
+ DBClient,
+ SearchInput,
+)
+from .schema import valid_schema
+from .auth import authorize_client, load_api_keys
+
+# Get credentials
+if "MONGODB_USERNAME" not in environ or "MONGODB_PASSWORD" not in environ or "MONGODB_COLLECTION" not in environ:
+ print("Missing MONGODB_USERNAME or MONGODB_PASSWORD or MONGODB_COLLECTION in env")
+ app_exit(1)
+
+# Create DB object
+db = DBClient(environ["MONGODB_USERNAME"], environ["MONGODB_PASSWORD"], environ["MONGODB_COLLECTION"])
+
+# Check DB
+loop = get_running_loop()
+startup_task = loop.create_task(db.startup())
+
+# Load API keys
+API_KEYS = load_api_keys("./api_keys.txt")
+
+# Disable redoc and swagger endpoints
+app = FastAPI(docs_url=None, redoc_url=None)
+
+
+@app.post("/sc/v0/search")
+async def search(request: Request, search_data: SearchInput) -> JSONResponse:
+ """/sc/v0/search, POST method
+
+ :param request: The client request.
+ :param search_data: The search data.
+ :return: JSONResponse
+ """
+
+ # Ensure authorization
+ authorize_client(request, API_KEYS)
+
+ data = await db.find(search_data)
+
+ return JSONResponse(content={"status": "success", "docs": data})
+
+
+@app.post("/sc/v0")
+async def create(request: Request) -> JSONResponse:
+ """/sc/v0, POST method
+
+ :param request: The request where we get the json body.
+ :return: JSONResponse
+ """
+
+ # Ensure authorization
+ authorize_client(request, API_KEYS)
+
+ try:
+ json_data = await request.json()
+ except JSONDecodeError:
+ return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400)
+
+ if not valid_schema(json_data):
+ return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)
+
+ key = await db.insert_one(json_data)
+
+ if key is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=500)
+
+ return JSONResponse(content={"status": "success", "key": str(key)})
+
+
+@app.put("/sc/v0")
+async def replace(request: Request) -> JSONResponse: # pylint: disable=too-many-return-statements
+ """/sc/v0, PUT method
+
+ :param request: The request where we get the json body.
+ :return: JSONResponse
+ """
+
+ # Ensure authorization
+ authorize_client(request, API_KEYS)
+
+ try:
+ json_data = await request.json()
+ except JSONDecodeError:
+ return JSONResponse(content={"status": "error", "message": "Invalid JSON"}, status_code=400)
+
+ if "_id" not in json_data:
+ return JSONResponse(content={"status": "error", "message": "Missing key '_id'"}, status_code=400)
+
+ # Get the key
+ if isinstance(json_data["_id"], str):
+ object_id = ObjectId(json_data["_id"])
+ elif (
+ isinstance(json_data["_id"], dict) and "$oid" in json_data["_id"] and isinstance(json_data["_id"]["$oid"], str)
+ ):
+ object_id = ObjectId(json_data["_id"]["$oid"])
+ else:
+ return JSONResponse(content={"status": "error", "message": "Missing key '_id' with valid id"}, status_code=400)
+
+ # Ensure the updating key exist
+ document = await db.find_one(object_id)
+
+ if document is None:
+ return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=404)
+
+ # Ensure valid schema
+ del json_data["_id"]
+ if not valid_schema(json_data):
+ return JSONResponse(content={"status": "error", "message": "Not our JSON schema"}, status_code=400)
+
+ # Replace the data
+ json_data["_id"] = object_id
+ returned_object_id = await db.replace_one(object_id, json_data)
+
+ if returned_object_id is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=500)
+
+ return JSONResponse(content={"status": "success", "key": str(object_id)})
+
+
+@app.get("/sc/v0/{key}")
+async def get(request: Request, key: str) -> JSONResponse:
+ """/sc/v0/{key}, GET method
+
+ :param key: The document key in the database.
+ :return: JSONResponse
+ """
+
+ # Ensure authorization
+ authorize_client(request, API_KEYS)
+
+ document = await db.find_one(ObjectId(key))
+
+ if document is None:
+ return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=404)
+
+ return JSONResponse(content={"status": "success", "doc": document})
+
+
+@app.delete("/sc/v0/{key}")
+async def delete(request: Request, key: str) -> JSONResponse:
+ """/sc/v0/{key}, DELETE method
+
+ :param key: The document key in the database.
+ :return: JSONResponse
+ """
+
+ # Ensure authorization
+ authorize_client(request, API_KEYS)
+
+ result = await db.delete_one(ObjectId(key))
+
+ if result is None:
+ return JSONResponse(content={"status": "error", "message": "Document not found"}, status_code=404)
+
+ return JSONResponse(content={"status": "success", "key": str(key)})
+
+
+@app.get("/info")
+async def info(request: Request) -> JSONResponse:
+ """/info, GET method
+
+ :return: JSONResponse
+ """
+
+ # Ensure authorization
+ authorize_client(request, API_KEYS)
+
+ count = await db.estimated_document_count()
+
+ if count is None:
+ return JSONResponse(content={"status": "error", "message": "DB error"}, status_code=500)
+
+ return JSONResponse(content={"status": "success", "Estimated document count": count})
diff --git a/src/soc_collector/py.typed b/src/soc_collector/py.typed
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/soc_collector/py.typed
diff --git a/src/soc_collector/schema.py b/src/soc_collector/schema.py
new file mode 100644
index 0000000..221990a
--- /dev/null
+++ b/src/soc_collector/schema.py
@@ -0,0 +1,107 @@
+"""Our schema module"""
+from typing import Any, Dict
+import jsonschema
+
+# fmt:off
+# NOTE: Commented out properties are left intentionally, so it is easier to see
+# what properties are optional.
+schema = {
+ "$schema": "http://json-schema.org/schema#",
+ "type": "object",
+ "properties": {
+ "document_version": {"type": "integer"},
+ "ip": {"type": "string"},
+ "port": {"type": "integer"},
+ "whois_description": {"type": "string"},
+ "asn": {"type": "string"},
+ "asn_country_code": {"type": "string"},
+ "ptr": {"type": "string"},
+ "abuse_mail": {"type": "string"},
+ "domain": {"type": "string"},
+ "timestamp": {"type": "string", "format": "date-time"},
+ "display_name": {"type": "string"},
+ "description": {"type": "string"},
+ "custom_data": {
+ "type": "object",
+ "patternProperties": {
+ ".*": {
+ "type": "object",
+ "properties": {
+ "display_name": {"type": "string"},
+ "data": {"type": ["string", "boolean", "integer"]},
+ "description": {"type": "string"},
+ },
+ "required": [
+ "display_name",
+ "data",
+ # "description"
+ ]
+ },
+ },
+ },
+ "result": {
+ "type": "object",
+ "patternProperties": {
+ ".*": {
+ "type": "object",
+ "properties": {
+ "display_name": {"type": "string"},
+ "vulnerable": {"type": "boolean"},
+ "investigation_needed": {"type": "boolean"},
+ "reliability": {"type": "integer"},
+ "description": {"type": "string"},
+ },
+ "oneOf": [
+ {
+ "required": [
+ "display_name",
+ "vulnerable",
+ # "reliability", # TODO: reliability is required if vulnerable = true
+ # "description",
+ ]
+ },
+ {
+ "required":
+ [
+ "display_name",
+ "investigation_needed",
+ # "reliability", # TODO: reliability is required if investigation_needed = true
+ # "description",
+ ]
+ },
+ ]
+ },
+ },
+ },
+ },
+ "required": [
+ "document_version",
+ "ip",
+ "port",
+ "whois_description",
+ "asn",
+ "asn_country_code",
+ "ptr",
+ "abuse_mail",
+ "domain",
+ "timestamp",
+ "display_name",
+ # "description",
+ # "custom_data",
+ "result",
+ ],
+}
+
+
+def valid_schema(json_data: Dict[str, Any]) -> bool:
+ """Check if json data follows the schema.
+
+ :param json_data: Json object
+ :return: bool
+ """
+ try:
+ jsonschema.validate(json_data, schema, format_checker=jsonschema.FormatChecker())
+ except jsonschema.exceptions.ValidationError as exc:
+ print(f"Validation failed with error: {exc.message}")
+ return False
+ return True
diff --git a/src/soc_collector/soc_collector_cli.py b/src/soc_collector/soc_collector_cli.py
new file mode 100644
index 0000000..e32b5ad
--- /dev/null
+++ b/src/soc_collector/soc_collector_cli.py
@@ -0,0 +1,259 @@
+#!/usr/bin/env python3
+"""Our database module"""
+from os.path import isfile
+from os import environ
+from typing import Dict, Any
+from argparse import ArgumentParser, RawTextHelpFormatter
+from sys import exit as app_exit
+import json
+import requests
+
+if "COLLECTOR_API_KEY" not in environ:
+ print("Missing 'COLLECTOR_API_KEY' in environment")
+ app_exit(1)
+API_KEY = environ["COLLECTOR_API_KEY"]
+
+API_URL = "https://collector-dev.soc.sunet.se:8000"
+ROOT_CA_FILE = __file__.replace("soc_collector_cli.py", "data/collector_root_ca.crt")
+
+
+def json_load_data(data: str) -> Dict[str, Any]:
+ """Load json from argument, json data or path to json file
+
+ :param data: String with either json or path to a json file.
+ :return: json dict as a Dict[str, Any].
+ """
+
+ ret: Dict[str, Any]
+ try:
+ if isfile(data):
+ with open(data, encoding="utf-8") as f_data:
+ ret = json.loads(f_data.read())
+ else:
+ ret = json.loads(data)
+ return ret
+
+ except Exception: # pylint: disable=broad-except
+ print(f"No such file or invalid json data: {data}")
+ app_exit(1)
+
+
+def info_action() -> None:
+ """Get database info, currently number of documents."""
+
+ req = requests.get(f"{API_URL}/info", headers={"API-KEY": API_KEY}, timeout=5)
+
+ # Ensure ok status
+ req.raise_for_status()
+
+ # Print data
+ json_data = json.loads(req.text)
+ print(f"Estimated document count: {json_data['Estimated document count']}")
+
+
+def search_action(data: str) -> None:
+ """Search for documents in the database.
+
+ :param data: String with either json or path to a json file.
+ """
+
+ search_data = json_load_data(data)
+
+ req = requests.post(f"{API_URL}/sc/v0/search", headers={"API-KEY": API_KEY}, json=search_data, timeout=5)
+
+ # Ensure ok status
+ req.raise_for_status()
+
+ # Print data
+ json_data = json.loads(req.text)
+ print(json.dumps(json_data["docs"], indent=4))
+
+
+def delete_action(data: str) -> None:
+ """Delete a document in the DB.
+
+ :param data: key or path to a json file containing "_id".
+ """
+
+ if isfile(data):
+ data = json_load_data(data)["_id"]
+ req = requests.delete(f"{API_URL}/sc/v0/{data}", headers={"API-KEY": API_KEY}, timeout=5)
+
+ # Check status
+ if req.status_code == 404:
+ print("ERROR: Document not found")
+ app_exit(1)
+
+ # Ensure ok status
+ req.raise_for_status()
+
+ print(f"Deleted data OK - key: {data}")
+
+
+def update_local_action(data: str, update_data: str) -> None:
+ """Update keys and their data in this document. Does not modify the database.
+
+ :param data: json blob or path to json file.
+ :param update_data: json blob or path to json file.
+ """
+
+ json_data = json_load_data(data)
+ json_update_data = json_load_data(update_data)
+
+ json_data.update(json_update_data)
+
+ # Print data
+ print(json.dumps(json_data, indent=4))
+
+
+def replace_action(data: str) -> None:
+ """Replace the entire document in the database with this document, "_id" must exist as a key.
+
+ :param data: json blob or path to json file, "_id" key must exist.
+ """
+
+ json_data = json_load_data(data)
+ req = requests.put(f"{API_URL}/sc/v0", json=json_data, headers={"API-KEY": API_KEY}, timeout=5)
+
+ # Check status
+ if req.status_code == 404:
+ print("ERROR: Document not found")
+ app_exit(1)
+
+ # Ensure ok status
+ req.raise_for_status()
+
+ json_data = json.loads(req.text)
+ print(f'Inserted data OK - key: {json_data["_id"]}')
+
+
+def insert_action(data: str) -> None:
+ """Insert a new document into the database, "_id" must not exist in the document.
+
+ :param data: json blob or path to json file, "_id" key must not exist.
+ """
+
+ json_data = json_load_data(data)
+ req = requests.post(f"{API_URL}/sc/v0", json=json_data, headers={"API-KEY": API_KEY}, timeout=5)
+
+ # Ensure ok status
+ req.raise_for_status()
+
+ data = json.loads(req.text)
+ print(f'Inserted data OK - key: {json_data["_id"]}')
+
+
+def get_action(data: str) -> None:
+ """Get a document from the database.
+
+ :param data: key or path to a json file containing "_id".
+ """
+ if isfile(data):
+ data = json_load_data(data)["_id"]
+ req = requests.get(f"{API_URL}/sc/v0/{data}", headers={"API-KEY": API_KEY}, timeout=5)
+
+ # Check status
+ if req.status_code == 404:
+ print("ERROR: Document not found")
+ app_exit(1)
+
+ # Ensure ok status
+ req.raise_for_status()
+
+ # Print data
+ json_data = json.loads(req.text)
+ print(json.dumps(json_data["doc"], indent=4))
+
+
+def main() -> None:
+ """Main function."""
+ parser = ArgumentParser(formatter_class=RawTextHelpFormatter, description="SOC Collector CLI")
+ parser.add_argument(
+ "action",
+ choices=["info", "search", "get", "insert", "replace", "update_local", "delete"],
+ help="""Action to take
+
+ info: Show info about the database, currently number of documents.
+
+ search: json blog OR path to file. skip defaults to 0 and limit to 25.
+ '{"search": {"port": 111, "ip": "192.0.2.28"}, "skip": 0, "limit": 25}' OR ./search_data.json
+ '{"search": {"asn_country_code": "SE", "result": {"$exists": "cve_2015_0002"}}}'
+
+ get: key OR path to document using its "_id".
+ 637162378c92893fff92bf7e OR ./data.json
+
+ insert: json blob OR path to file. Document MUST NOT contain "_id".
+ '{json_blob_here...}' OR ./data.json
+
+ replace: json blob OR path to file. Document MUST contain "_id".
+ '{json_blob_here...}' OR ./updated_document.json
+
+ update_local: json blob OR path to file json blob or path to file with json to update with.
+ This does NOT send data to the database, use replace for that.
+ 1st ARG: '{json_blob_here...}' OR ./data.json 2th ARG:'{"port": 555, "some_key": "some_data"}' OR ./data.json
+
+ delete: key OR path to file using its "_id".
+ 637162378c92893fff92bf7e OR ./data.json
+
+
+ Pro tip, in ~/.bashrc:
+
+ _soc_collector_cli()
+ {
+ local cur prev words cword
+ _init_completion || return
+
+ local commands command
+
+ commands='info search get insert replace update_local delete'
+
+ if ((cword == 1)); then
+ COMPREPLY=($(compgen -W "$commands" -- "$cur"))
+ else
+
+ case $prev in
+ search)
+ COMPREPLY="'{\\"search\\": {\\"asn_country_code\\": \\"SE\\", \\"ip\\": \\"8.8.8.8\\"}}'"
+ return
+ ;;
+ esac
+ command=${words[1]}
+ _filedir
+ return
+ fi
+ }
+ complete -F _soc_collector_cli -o default soc_collector_cli
+
+ """,
+ )
+ parser.add_argument("data", default="info", help="json blob or path to file")
+ parser.add_argument(
+ "extra_data",
+ nargs="?",
+ default=None,
+ help="json blob or path to file, only used for local_edit",
+ )
+
+ args = parser.parse_args()
+
+ if args.action == "get":
+ get_action(args.data)
+ elif args.action == "insert":
+ insert_action(args.data)
+ elif args.action == "replace":
+ replace_action(args.data)
+ elif args.action == "update_local" and args.extra_data is not None:
+ update_local_action(args.data, args.extra_data)
+ elif args.action == "delete":
+ delete_action(args.data)
+ elif args.action == "search":
+ search_action(args.data)
+ elif args.action == "info":
+ info_action()
+ else:
+ print("ERROR: Wrong action")
+ app_exit(1)
+
+
+if __name__ == "__main__":
+ main()