"""JSON schema for collector data, plus index and validation helpers.

Run as a script with the path to a JSON file to validate that file against
``schema`` and print the result.
"""

import json
import sys
import traceback

import jsonschema

# fmt:off
# NOTE: Commented out properties are left intentionally, so it is easier to see
# what properties are optional.
schema = {
    "$schema": "http://json-schema.org/schema#",
    "type": "object",
    "properties": {
        "document_version": {"type": "integer"},
        "ip": {"type": "string"},
        "port": {"type": "integer"},
        "whois_description": {"type": "string"},
        "asn": {"type": "string"},
        "asn_country_code": {"type": "string"},
        "ptr": {"type": "string"},
        "abuse_mail": {"type": "string"},
        "domain": {"type": "string"},
        "timestamp_in_utc": {"type": "string"},
        "display_name": {"type": "string"},
        "description": {"type": "string"},
        "custom_data": {
            "type": "object",
            "patternProperties": {
                ".*": {
                    "type": "object",
                    "properties": {
                        "display_name": {"type": "string"},
                        "data": {"type": ["string", "boolean", "integer"]},
                        "description": {"type": "string"},
                    },
                    "required": [
                        "display_name",
                        "data",
                        # "description"
                    ]
                },
            },
        },
        "result": {
            "type": "object",
            "patternProperties": {
                ".*": {
                    "type": "object",
                    "properties": {
                        "display_name": {"type": "string"},
                        "vulnerable": {"type": "boolean"},
                        "investigation_needed": {"type": "boolean"},
                        "reliability": {"type": "integer"},
                        "description": {"type": "string"},
                    },
                    "oneOf": [
                        {
                            "required": [
                                "display_name",
                                "vulnerable",
                                # "reliability",  # TODO: reliability is required if vulnerable = true
                                # "description",
                            ]
                        },
                        {
                            "required": [
                                "display_name",
                                "investigation_needed",
                                # "reliability",  # TODO: reliability is required if investigation_needed = true
                                # "description",
                            ]
                        },
                    ]
                },
            },
        },
    },
    "required": [
        "document_version",
        "ip",
        "port",
        "whois_description",
        "asn",
        "asn_country_code",
        "ptr",
        "abuse_mail",
        "domain",
        "timestamp_in_utc",
        "display_name",
        # "description",
        # "custom_data",
        "result",
    ],
}
# fmt:on


def get_index_keys() -> list:
    """Return the names of all top-level properties declared in ``schema``.

    The names are returned in the order they appear in
    ``schema["properties"]``; a new list is built on every call.
    """
    return list(schema["properties"])


def as_index_list() -> list:
    """Return one JSON index definition per top-level schema property.

    Each entry is a dict of the form
    ``{"index": {"fields": [key]}, "name": "<key>-json-index", "type": "json"}``.
    """
    # NOTE(review): the entry layout looks like a CouchDB/Mango-style index
    # definition - confirm against the code that consumes this list.
    return [
        {
            "index": {
                "fields": [
                    key,
                ]
            },
            "name": f"{key}-json-index",
            "type": "json",
        }
        for key in schema["properties"]
    ]


def validate_collector_data(json_blob) -> str:
    """Validate ``json_blob`` against ``schema``.

    Args:
        json_blob: Already-decoded JSON data (e.g. the result of
            ``json.loads``).

    Returns:
        An empty string when validation succeeds, otherwise a message of the
        form ``"Validation failed with error: <reason>"``.

    Only ``jsonschema.exceptions.ValidationError`` is handled here; any other
    exception raised by ``jsonschema.validate`` propagates to the caller.
    """
    try:
        jsonschema.validate(json_blob, schema)
    except jsonschema.exceptions.ValidationError as e:
        return f"Validation failed with error: {e.message}"
    return ""


if __name__ == "__main__":
    # Fail with a usage hint instead of a bare IndexError when the path
    # argument is missing.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <path-to-json-file>")

    # Read the file as UTF-8 explicitly rather than relying on the
    # platform-dependent default encoding.
    with open(sys.argv[1], encoding="utf-8") as fd:
        json_data = json.load(fd)

    print(validate_collector_data(json_data))