"""Our schema module""" from typing import Any, Dict, Optional, Union import jsonschema from bson import ObjectId from bson.errors import InvalidId # fmt:off # NOTE: Commented out properties are left intentionally, so it is easier to see # what properties are optional. schema = { "$schema": "http://json-schema.org/schema#", "type": "object", "properties": { "document_version": {"type": "integer"}, "ip": {"type": "string"}, "port": {"type": "integer"}, "whois_description": {"type": "string"}, "asn": {"type": "string"}, "asn_country_code": {"type": "string"}, "ptr": {"type": "string"}, "abuse_mail": {"type": "string"}, "domain": {"type": "string"}, "timestamp": {"type": "string", "format": "date-time"}, "display_name": {"type": "string"}, "description": {"type": "string"}, "custom_data": { "type": "object", "patternProperties": { ".*": { "type": "object", "properties": { "display_name": {"type": "string"}, "data": {"type": ["string", "boolean", "integer"]}, "description": {"type": "string"}, }, "required": [ "display_name", "data", # "description" ] }, }, }, "result": { "type": "object", "patternProperties": { ".*": { "type": "object", "properties": { "display_name": {"type": "string"}, "vulnerable": {"type": "boolean"}, "investigation_needed": {"type": "boolean"}, "reliability": {"type": "integer"}, "description": {"type": "string"}, }, "oneOf": [ { "required": [ "display_name", "vulnerable", # "reliability", # TODO: reliability is required if vulnerable = true # "description", ] }, { "required": [ "display_name", "investigation_needed", # "reliability", # TODO: reliability is required if investigation_needed = true # "description", ] }, ] }, }, }, }, "required": [ "document_version", "ip", "port", "whois_description", "asn", "asn_country_code", "ptr", "abuse_mail", "domain", "timestamp", "display_name", # "description", # "custom_data", "result", ], } def valid_schema(json_data: Dict[str, Any]) -> bool: """Check if json data follows the schema. :param json_data: Json object :return: bool """ try: jsonschema.validate(json_data, schema, format_checker=jsonschema.FormatChecker()) except jsonschema.exceptions.ValidationError as exc: print(f"Validation failed with error: {exc.message}") return False return True def object_id_from_data(data: Union[str, Dict[str, Any]]) -> Optional[ObjectId]: """Get ObjectId from key. None if invalid. :param data: Key. :return: Optional[ObjectId] """ if isinstance(data, str): try: return ObjectId(data) except InvalidId: return None elif isinstance(data, Dict): if "_id" in data and isinstance(data["_id"], str): try: return ObjectId(data["_id"]) except InvalidId: return None return None