diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/data/example_data_1.json | 57 | ||||
-rw-r--r-- | tests/data/example_data_1_replace_test.json | 57 | ||||
-rw-r--r-- | tests/data/example_data_3.json | 51 | ||||
-rw-r--r-- | tests/data/example_data_3_replace_test.json | 52 | ||||
-rw-r--r-- | tests/test_auth.py | 275 | ||||
-rw-r--r-- | tests/test_delete.py | 86 | ||||
-rw-r--r-- | tests/test_get.py | 82 | ||||
-rw-r--r-- | tests/test_info.py | 37 | ||||
-rw-r--r-- | tests/test_insert.py | 110 | ||||
-rw-r--r-- | tests/test_replace.py | 102 | ||||
-rw-r--r-- | tests/test_search.py | 125 | ||||
-rw-r--r-- | tests/test_soc_collector_cli.py | 244 |
13 files changed, 1278 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/data/example_data_1.json b/tests/data/example_data_1.json new file mode 100644 index 0000000..69f5d85 --- /dev/null +++ b/tests/data/example_data_1.json @@ -0,0 +1,57 @@ +{ + "document_version": 1, + "ip": "192.0.2.10", + "port": 443, + "whois_description": "SOMENET", + "asn": "AS65001", + "asn_country_code": "SE", + "ptr": "host10.test.soc.sunet.se", + "abuse_mail": "abuse@test.soc.sunet.se", + "domain": "sunet.se", + "timestamp": "2021-06-21T14:06:00Z", + "display_name": "Apache 2.1.3", + "description": "The Apache HTTP Server is a free and open-source cross-platform web server software, released under the terms of Apache License 2.0.", + "custom_data": { + "subject_cn": { + "data": "Apache", + "display_name": "Subject Common Name" + }, + "end_of_general_support": { + "data": false, + "display_name": "End of general support", + "description": "Is the software currently supported?" + } + }, + "result": { + "cve_2015_0049": { + "display_name": "CVE-2015-0049", + "vulnerable": false, + "description": "Allows remote attackers to execute arbitrary code or cause a denial of service (memory corruption)." + }, + "cve_2015_0050": { + "display_name": "CVE-2015-0050", + "vulnerable": false + }, + "cve_2015_0060": { + "display_name": "CVE-2015-0060", + "vulnerable": true, + "reliability": 2 + }, + "cve_2015_0063": { + "display_name": "CVE-2015-0063", + "vulnerable": false + }, + "insecure_cryptography": { + "display_name": "Insecure cryptography", + "vulnerable": true, + "reliability": 5, + "description": "Uses RSA instead of elliptic curve." + }, + "possible_webshell": { + "display_name": "Webshells (PST)", + "investigation_needed": true, + "reliability": 1, + "description": "A webshell of type PST was confirmed at /test/webshell.php" + } + } +} diff --git a/tests/data/example_data_1_replace_test.json b/tests/data/example_data_1_replace_test.json new file mode 100644 index 0000000..f56d82c --- /dev/null +++ b/tests/data/example_data_1_replace_test.json @@ -0,0 +1,57 @@ +{ + "document_version": 2, + "ip": "192.0.2.10", + "port": 444, + "whois_description": "SOMENET", + "asn": "AS65001", + "asn_country_code": "SE", + "ptr": "host10.test.soc.sunet.se", + "abuse_mail": "abuse@test.soc.sunet.se", + "domain": "sunet.se", + "timestamp": "2021-06-21T14:06:00Z", + "display_name": "Apache 2.1.3", + "description": "The Apache HTTP Server is a free and open-source cross-platform web server software, released under the terms of Apache License 2.0.", + "custom_data": { + "subject_cn": { + "data": "Apache", + "display_name": "Subject Common Name" + }, + "end_of_general_support": { + "data": false, + "display_name": "End of general support", + "description": "Is the software currently supported?" + } + }, + "result": { + "cve_2015_0049": { + "display_name": "CVE-2015-0049", + "vulnerable": false, + "description": "Allows remote attackers to execute arbitrary code or cause a denial of service (memory corruption)." + }, + "cve_2015_0050": { + "display_name": "CVE-2015-0050", + "vulnerable": false + }, + "cve_2015_0060": { + "display_name": "CVE-2015-0060", + "vulnerable": true, + "reliability": 2 + }, + "cve_2015_0063": { + "display_name": "CVE-2015-0063", + "vulnerable": false + }, + "insecure_cryptography": { + "display_name": "Insecure cryptography", + "vulnerable": true, + "reliability": 5, + "description": "Uses RSA instead of elliptic curve." + }, + "possible_webshell": { + "display_name": "Webshells (PST)", + "investigation_needed": true, + "reliability": 1, + "description": "A webshell of type PST was confirmed at /test/webshell.php" + } + } +} diff --git a/tests/data/example_data_3.json b/tests/data/example_data_3.json new file mode 100644 index 0000000..44d483b --- /dev/null +++ b/tests/data/example_data_3.json @@ -0,0 +1,51 @@ +{ + "document_version": 1, + "ip": "192.0.2.28", + "port": 111, + "whois_description": "SOMENET", + "asn": "AS65001", + "asn_country_code": "SE", + "ptr": "host111.test.soc.sunet.se", + "abuse_mail": "abuse@test.soc.sunet.se", + "domain": "sunet.se", + "timestamp": "2021-06-30T15:00:00Z", + "display_name": "VMware ESXi 6.7.0 build-17700523", + "description": "VMware ESXi is an enterprise-class, type-1 hypervisor developed by VMware for deploying and serving virtual computers. As a type-1 hypervisor, ESXi is not a software application that is installed on an operating system; instead, it includes and integrates vital OS components, such as a kernel.", + "custom_data": { + "subject_cn": { + "data": "VMware ESXi", + "display_name": "Subject Common Name" + }, + "end_of_general_support": { + "data": true, + "display_name": "End of general support", + "description": "Is the software currently supported?" + } + }, + "result": { + "cve_2019_0001": { + "display_name": "CVE-2019-0001", + "vulnerable": false + }, + "cve_2015_0002": { + "display_name": "CVE-2015-0002", + "vulnerable": false, + "description": "There is a use of insufficiently random values vulnerability. An unauthenticated, remote attacker can guess information by a large number of attempts. Successful exploitation may cause information leak." + }, + "cve_2015_0003": { + "display_name": "CVE-2015-0003", + "vulnerable": true, + "reliability": 2, + "description": "A carefully crafted request body can cause a read to a random memory area which could cause the process to crash." + }, + "cve_2015_0004": { + "display_name": "CVE-2015-0004", + "vulnerable": false + }, + "cve_2015_0005": { + "display_name": "CVE-2015-0005", + "vulnerable": true, + "reliability": 4 + } + } +} diff --git a/tests/data/example_data_3_replace_test.json b/tests/data/example_data_3_replace_test.json new file mode 100644 index 0000000..31cc64d --- /dev/null +++ b/tests/data/example_data_3_replace_test.json @@ -0,0 +1,52 @@ +{ + "_id": "6370498050845fac09e0fc01", + "document_version": 2, + "ip": "192.0.2.28", + "port": 112, + "whois_description": "SOMENET", + "asn": "AS65001", + "asn_country_code": "SE", + "ptr": "host111.test.soc.sunet.se", + "abuse_mail": "abuse@test.soc.sunet.se", + "domain": "sunet.se", + "timestamp": "2021-06-30T15:00:00Z", + "display_name": "VMware ESXi 6.7.0 build-17700523", + "description": "VMware ESXi is an enterprise-class, type-1 hypervisor developed by VMware for deploying and serving virtual computers. As a type-1 hypervisor, ESXi is not a software application that is installed on an operating system; instead, it includes and integrates vital OS components, such as a kernel.", + "custom_data": { + "subject_cn": { + "data": "VMware ESXi", + "display_name": "Subject Common Name" + }, + "end_of_general_support": { + "data": true, + "display_name": "End of general support", + "description": "Is the software currently supported?" + } + }, + "result": { + "cve_2019_0001": { + "display_name": "CVE-2019-0001", + "vulnerable": false + }, + "cve_2015_0002": { + "display_name": "CVE-2015-0002", + "vulnerable": false, + "description": "There is a use of insufficiently random values vulnerability. An unauthenticated, remote attacker can guess information by a large number of attempts. Successful exploitation may cause information leak." + }, + "cve_2015_0003": { + "display_name": "CVE-2015-0003", + "vulnerable": true, + "reliability": 2, + "description": "A carefully crafted request body can cause a read to a random memory area which could cause the process to crash." + }, + "cve_2015_0004": { + "display_name": "CVE-2015-0004", + "vulnerable": false + }, + "cve_2015_0005": { + "display_name": "CVE-2015-0005", + "vulnerable": true, + "reliability": 4 + } + } +} diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..d3fbf23 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,275 @@ +""" +Test our auth +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + +BASE_URL = "https://localhost:8000" + + +class TestAuth(unittest.TestCase): + """ + Test our auth + """ + + def test_auth_info(self) -> None: + """ + Test auth info + """ + + api_keys = load_api_keys("data/api_keys.txt") + + # Test no key + req = requests.get(f"{BASE_URL}/info", timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.get( + f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.get( + f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_insert(self) -> None: + """ + Test auth insert + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + # Test no key + req = requests.post(f"{BASE_URL}/sc/v0", json=insert_data, timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + key = json.loads(req.text)["_id"] + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_replace(self) -> None: + """ + Test auth replace + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + replace_data = json_load_data("./tests/data/example_data_1_replace_test.json") + + request_headers = {"API-KEY": api_keys[0]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + replace_data["_id"] = json.loads(req.text)["_id"] + + # Test no key + req = requests.put(f"{BASE_URL}/sc/v0", json=replace_data, timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.put( + f"{BASE_URL}/sc/v0", + json=replace_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.put( + f"{BASE_URL}/sc/v0", + json=replace_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{replace_data['_id']}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + def test_auth_get(self) -> None: + """ + Test auth get + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + # Test no key + req = requests.get(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_delete(self) -> None: + """ + Test auth delete + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + # Test no key + req = requests.delete(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[0]} + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_search(self) -> None: + """ + Test auth search + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + insert_data["timestamp"] = "2021-06-21T15:06:00Z" + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + search_data = {"filter": {"timestamp": insert_data["timestamp"]}} + + # Test no key + req = requests.post( + f"{BASE_URL}/sc/v0/search", json=search_data, timeout=5, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[0]} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + request_headers = {"API-KEY": api_keys[0]} + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) diff --git a/tests/test_delete.py b/tests/test_delete.py new file mode 100644 index 0000000..855d987 --- /dev/null +++ b/tests/test_delete.py @@ -0,0 +1,86 @@ +""" +Test our delete +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our delete + """ + + def test_delete(self) -> None: + """ + Test delete + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + req = requests.delete( + f"{BASE_URL}/sc/v0/dummy", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + req = requests.delete( + f"{BASE_URL}/sc/v0/63765238890b48a0c3118f4f", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 404) + + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 404) + + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 404) diff --git a/tests/test_get.py b/tests/test_get.py new file mode 100644 index 0000000..377f35a --- /dev/null +++ b/tests/test_get.py @@ -0,0 +1,82 @@ +""" +Test our get +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our get + """ + + def test_get(self) -> None: + """ + Test get + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + req = requests.get( + f"{BASE_URL}/sc/v0/dummy", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + req = requests.get( + f"{BASE_URL}/sc/v0/63765238890b48a0c3118f4f", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 404) + + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + data1 = json.loads(req.text) + + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + data2 = json.loads(req.text) + self.assertTrue(data1 == data2) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) diff --git a/tests/test_info.py b/tests/test_info.py new file mode 100644 index 0000000..ef775d7 --- /dev/null +++ b/tests/test_info.py @@ -0,0 +1,37 @@ +""" +Test our info +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our info + """ + + def test_info(self) -> None: + """ + Test info + """ + + api_keys = load_api_keys("data/api_keys.txt") + request_headers = {"API-KEY": api_keys[-1]} + + req = requests.get( + f"{BASE_URL}/info", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + data = json.loads(req.text) + self.assertTrue("Estimated document count" in data) + self.assertTrue(isinstance(data["Estimated document count"], int)) + self.assertTrue(data["Estimated document count"] >= 0) diff --git a/tests/test_insert.py b/tests/test_insert.py new file mode 100644 index 0000000..5bee72d --- /dev/null +++ b/tests/test_insert.py @@ -0,0 +1,110 @@ +""" +Test our insert +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our insert + """ + + def test_insert(self) -> None: + """ + Test insert + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + req = requests.post( + f"{BASE_URL}/sc/v0", + json={"dummy_data": "dummy"}, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + bad_data = insert_data.copy() + bad_data["_id"] = "63765238890b49a0c3118f4f" + req = requests.post( + f"{BASE_URL}/sc/v0", + json=bad_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + del bad_data["_id"] + + del bad_data["ip"] + req = requests.post( + f"{BASE_URL}/sc/v0", + json=bad_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + # Allow duplicate data but with different id + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key2 = json.loads(req.text)["_id"] + + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + db_data = json.loads(req.text)["doc"] + del db_data["_id"] + self.assertTrue(insert_data == db_data) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + req = requests.delete( + f"{BASE_URL}/sc/v0/{key2}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) diff --git a/tests/test_replace.py b/tests/test_replace.py new file mode 100644 index 0000000..291207f --- /dev/null +++ b/tests/test_replace.py @@ -0,0 +1,102 @@ +""" +Test our replace +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our replace + """ + + def test_replace(self) -> None: + """ + Test replace + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + replace_data = json_load_data("./tests/data/example_data_1_replace_test.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + replace_data["_id"] = json.loads(req.text)["_id"] + + bad_data = replace_data.copy() + + # ip missing + del bad_data["ip"] + req = requests.put( + f"{BASE_URL}/sc/v0", + json=bad_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + bad_data["ip"] = replace_data["ip"] + del bad_data["_id"] + req = requests.put( + f"{BASE_URL}/sc/v0", + json=bad_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + bad_data["_id"] = "sdvnsvdlac" + req = requests.put( + f"{BASE_URL}/sc/v0", + json=bad_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 400) + + req = requests.put( + f"{BASE_URL}/sc/v0", + json=replace_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + req = requests.get( + f"{BASE_URL}/sc/v0/{replace_data['_id']}", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + db_data = json.loads(req.text)["doc"] + self.assertTrue(replace_data == db_data) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{replace_data['_id']}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) diff --git a/tests/test_search.py b/tests/test_search.py new file mode 100644 index 0000000..8a01332 --- /dev/null +++ b/tests/test_search.py @@ -0,0 +1,125 @@ +""" +Test our search +""" +from typing import Dict, Any +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our search + """ + + def test_search(self) -> None: + """ + Test search + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + insert_data["ip"] = "test_dummy_ip1" + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key1 = json.loads(req.text)["_id"] + + insert_data["port"] = 4123 + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key2 = json.loads(req.text)["_id"] + + req = requests.post( + f"{BASE_URL}/sc/v0/search", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 422) + + search_data: Dict[str, Any] = {"dummy": {"ip": "test_dummy_ip"}} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 422) + + search_data = {"filter": {"ip": "test_dummy_ip"}} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + data = json.loads(req.text)["docs"] + self.assertTrue(data == []) + + search_data = {"filter": {"ip": "test_dummy_ip1"}} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + data = json.loads(req.text)["docs"] + self.assertTrue(len(data) == 2) + del data[0]["_id"] + del data[1]["_id"] + self.assertTrue(data[0] != insert_data) + self.assertTrue(data[1] == insert_data) + + search_data = {"filter": {"ip": "test_dummy_ip1", "port": 4123}} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + data = json.loads(req.text)["docs"] + self.assertTrue(len(data) == 1) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{key1}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + req = requests.delete( + f"{BASE_URL}/sc/v0/{key2}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) diff --git a/tests/test_soc_collector_cli.py b/tests/test_soc_collector_cli.py new file mode 100644 index 0000000..f867d60 --- /dev/null +++ b/tests/test_soc_collector_cli.py @@ -0,0 +1,244 @@ +""" +Test our cli +""" +import unittest +import io +import sys +import json +import os + +from src.soc_collector.auth import load_api_keys + +from src.soc_collector.soc_collector_cli import ( + json_load_data, + info_action, + get_action, + replace_action, + delete_action, + insert_action, + update_local_action, + search_action, +) + +BASE_URL = "https://localhost:8000" + + +class TestAddress(unittest.TestCase): + """ + Test our cli + """ + + def test_json_load_data(self) -> None: + """ + Test cli json_load_data + """ + + insert_data = json_load_data("./tests/data/example_data_1.json") + with open("./json_load_data_test_here11.json", "w", encoding="utf-8") as f_data: + f_data.write(json.dumps(insert_data)) + + data = json_load_data("./json_load_data_test_here11.json") + self.assertTrue(isinstance(data, dict)) + + data = json_load_data('{"filter": {"ip": "123.123.4.123"}}') + self.assertTrue(isinstance(data, dict)) + + # Remove test data + os.remove("./json_load_data_test_here11.json") + + def test_cli_info(self) -> None: + """ + Test cli info + """ + + api_keys = load_api_keys("data/api_keys.txt") + output = io.StringIO() + sys.stdout = output + info_action(api_keys[-1], BASE_URL) + self.assertTrue("Estimated document count: " in output.getvalue()) + sys.stdout = sys.__stdout__ + + def test_cli_insert(self) -> None: + """ + Test cli insert + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + output = io.StringIO() + sys.stdout = output + insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL) + self.assertTrue("Inserted data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key1 = output.getvalue().split(" OK - key: ")[1].strip() + + output = io.StringIO() + sys.stdout = output + insert_action("./tests/data/example_data_1.json", api_keys[-1], BASE_URL) + self.assertTrue("Inserted data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key2 = output.getvalue().split(" OK - key: ")[1].strip() + + # Delete test data + output = io.StringIO() + sys.stdout = output + delete_action(key1, api_keys[-1], BASE_URL) + delete_action(key2, api_keys[-1], BASE_URL) + sys.stdout = sys.__stdout__ + + def test_cli_replace(self) -> None: + """ + Test cli insert + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + replace_vals = {"ip": "replace_test1"} + replace_data = insert_data.copy() + replace_data.update(replace_vals) + + output = io.StringIO() + sys.stdout = output + insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL) + self.assertTrue("Inserted data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key1 = output.getvalue().split(" OK - key: ")[1].strip() + + output = io.StringIO() + sys.stdout = output + replace_data["_id"] = key1 + replace_action(json.dumps(replace_data), api_keys[-1], BASE_URL) + self.assertTrue("Replaced data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key2 = output.getvalue().split(" OK - key: ")[1].strip() + self.assertTrue(key1 == key2) + + replace_data["ip"] = "replace_test2" + with open("./replace_test_here11.json", "w", encoding="utf-8") as f_data: + f_data.write(json.dumps(replace_data)) + + output = io.StringIO() + sys.stdout = output + replace_action("./replace_test_here11.json", api_keys[-1], BASE_URL) + self.assertTrue("Replaced data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key3 = output.getvalue().split(" OK - key: ")[1].strip() + self.assertTrue(key1 == key2 == key3) + + # Delete test data + os.remove("./replace_test_here11.json") + output = io.StringIO() + sys.stdout = output + delete_action(key1, api_keys[-1], BASE_URL) + sys.stdout = sys.__stdout__ + + def test_cli_update_local(self) -> None: + """ + Test cli update_local + """ + + data = json_load_data("./tests/data/example_data_1.json") + update_vals = {"ip": "update_local_test1"} + updated_data = data.copy() + updated_data.update(update_vals) + + output = io.StringIO() + sys.stdout = output + update_local_action(json.dumps(data), json.dumps(update_vals)) + self.assertTrue(json.dumps(updated_data, indent=4) in output.getvalue()) + sys.stdout = sys.__stdout__ + + with open("./update_local_test_here11.json", "w", encoding="utf-8") as f_data: + f_data.write(json.dumps(data)) + + output = io.StringIO() + sys.stdout = output + update_local_action("./update_local_test_here11.json", json.dumps(update_vals)) + os.remove("./update_local_test_here11.json") + self.assertTrue(json.dumps(updated_data, indent=4) in output.getvalue()) + sys.stdout = sys.__stdout__ + + def test_cli_get(self) -> None: + """ + Test cli get + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + output = io.StringIO() + sys.stdout = output + insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL) + self.assertTrue("Inserted data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key1 = output.getvalue().split(" OK - key: ")[1].strip() + + output = io.StringIO() + sys.stdout = output + get_action(key1, api_keys[-1], BASE_URL) + expected_data = {"_id": key1} + expected_data.update(insert_data) + self.assertTrue(json.dumps(expected_data, indent=4) in output.getvalue()) + sys.stdout = sys.__stdout__ + + with open("./get_test_here11.json", "w", encoding="utf-8") as f_data: + f_data.write(json.dumps(expected_data)) + + output = io.StringIO() + sys.stdout = output + get_action("./get_test_here11.json", api_keys[-1], BASE_URL) + expected_data = {"_id": key1} + expected_data.update(insert_data) + self.assertTrue(json.dumps(expected_data, indent=4) in output.getvalue()) + sys.stdout = sys.__stdout__ + + # Delete test data + os.remove("./get_test_here11.json") + output = io.StringIO() + sys.stdout = output + delete_action(key1, api_keys[-1], BASE_URL) + sys.stdout = sys.__stdout__ + + def test_cli_search(self) -> None: + """ + Test cli search + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + insert_data["ip"] = "search_dummy_ip1" + search_data = {"filter": {"ip": "search_dummy_ip1"}} + + output = io.StringIO() + sys.stdout = output + insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL) + self.assertTrue("Inserted data OK - key: " in output.getvalue()) + sys.stdout = sys.__stdout__ + key1 = output.getvalue().split(" OK - key: ")[1].strip() + + output = io.StringIO() + sys.stdout = output + search_action(json.dumps(search_data), api_keys[-1], BASE_URL) + expected_data = {"_id": key1} + expected_data.update(insert_data) + self.assertTrue(json.dumps([expected_data], indent=4) in output.getvalue()[:-1]) + sys.stdout = sys.__stdout__ + + with open("./search_test_here11.json", "w", encoding="utf-8") as f_data: + f_data.write(json.dumps(search_data)) + + output = io.StringIO() + sys.stdout = output + search_action("./search_test_here11.json", api_keys[-1], BASE_URL) + expected_data = {"_id": key1} + expected_data.update(insert_data) + self.assertTrue(json.dumps([expected_data], indent=4) in output.getvalue()[:-1]) + sys.stdout = sys.__stdout__ + + # Delete test data + os.remove("./search_test_here11.json") + output = io.StringIO() + sys.stdout = output + delete_action(key1, api_keys[-1], BASE_URL) + sys.stdout = sys.__stdout__ |