summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorVictor Näslund <victor@sunet.se>2022-11-17 22:04:24 +0100
committerVictor Näslund <victor@sunet.se>2022-11-17 22:04:24 +0100
commit03735d4c6fc17193e5019d3bd595bad2ce41c61f (patch)
tree889e5b6615f62930ef2ebd1e36616a177fca539e /tests
parenta276c55e8f1f7f2c5872a43485425dd85f1dfa9f (diff)
added tests
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/data/example_data_1.json57
-rw-r--r--tests/data/example_data_1_replace_test.json57
-rw-r--r--tests/data/example_data_3.json51
-rw-r--r--tests/data/example_data_3_replace_test.json52
-rw-r--r--tests/test_auth.py275
-rw-r--r--tests/test_delete.py86
-rw-r--r--tests/test_get.py82
-rw-r--r--tests/test_info.py37
-rw-r--r--tests/test_insert.py110
-rw-r--r--tests/test_replace.py102
-rw-r--r--tests/test_search.py125
-rw-r--r--tests/test_soc_collector_cli.py244
13 files changed, 1278 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/data/example_data_1.json b/tests/data/example_data_1.json
new file mode 100644
index 0000000..69f5d85
--- /dev/null
+++ b/tests/data/example_data_1.json
@@ -0,0 +1,57 @@
+{
+ "document_version": 1,
+ "ip": "192.0.2.10",
+ "port": 443,
+ "whois_description": "SOMENET",
+ "asn": "AS65001",
+ "asn_country_code": "SE",
+ "ptr": "host10.test.soc.sunet.se",
+ "abuse_mail": "abuse@test.soc.sunet.se",
+ "domain": "sunet.se",
+ "timestamp": "2021-06-21T14:06:00Z",
+ "display_name": "Apache 2.1.3",
+ "description": "The Apache HTTP Server is a free and open-source cross-platform web server software, released under the terms of Apache License 2.0.",
+ "custom_data": {
+ "subject_cn": {
+ "data": "Apache",
+ "display_name": "Subject Common Name"
+ },
+ "end_of_general_support": {
+ "data": false,
+ "display_name": "End of general support",
+ "description": "Is the software currently supported?"
+ }
+ },
+ "result": {
+ "cve_2015_0049": {
+ "display_name": "CVE-2015-0049",
+ "vulnerable": false,
+ "description": "Allows remote attackers to execute arbitrary code or cause a denial of service (memory corruption)."
+ },
+ "cve_2015_0050": {
+ "display_name": "CVE-2015-0050",
+ "vulnerable": false
+ },
+ "cve_2015_0060": {
+ "display_name": "CVE-2015-0060",
+ "vulnerable": true,
+ "reliability": 2
+ },
+ "cve_2015_0063": {
+ "display_name": "CVE-2015-0063",
+ "vulnerable": false
+ },
+ "insecure_cryptography": {
+ "display_name": "Insecure cryptography",
+ "vulnerable": true,
+ "reliability": 5,
+ "description": "Uses RSA instead of elliptic curve."
+ },
+ "possible_webshell": {
+ "display_name": "Webshells (PST)",
+ "investigation_needed": true,
+ "reliability": 1,
+ "description": "A webshell of type PST was confirmed at /test/webshell.php"
+ }
+ }
+}
diff --git a/tests/data/example_data_1_replace_test.json b/tests/data/example_data_1_replace_test.json
new file mode 100644
index 0000000..f56d82c
--- /dev/null
+++ b/tests/data/example_data_1_replace_test.json
@@ -0,0 +1,57 @@
+{
+ "document_version": 2,
+ "ip": "192.0.2.10",
+ "port": 444,
+ "whois_description": "SOMENET",
+ "asn": "AS65001",
+ "asn_country_code": "SE",
+ "ptr": "host10.test.soc.sunet.se",
+ "abuse_mail": "abuse@test.soc.sunet.se",
+ "domain": "sunet.se",
+ "timestamp": "2021-06-21T14:06:00Z",
+ "display_name": "Apache 2.1.3",
+ "description": "The Apache HTTP Server is a free and open-source cross-platform web server software, released under the terms of Apache License 2.0.",
+ "custom_data": {
+ "subject_cn": {
+ "data": "Apache",
+ "display_name": "Subject Common Name"
+ },
+ "end_of_general_support": {
+ "data": false,
+ "display_name": "End of general support",
+ "description": "Is the software currently supported?"
+ }
+ },
+ "result": {
+ "cve_2015_0049": {
+ "display_name": "CVE-2015-0049",
+ "vulnerable": false,
+ "description": "Allows remote attackers to execute arbitrary code or cause a denial of service (memory corruption)."
+ },
+ "cve_2015_0050": {
+ "display_name": "CVE-2015-0050",
+ "vulnerable": false
+ },
+ "cve_2015_0060": {
+ "display_name": "CVE-2015-0060",
+ "vulnerable": true,
+ "reliability": 2
+ },
+ "cve_2015_0063": {
+ "display_name": "CVE-2015-0063",
+ "vulnerable": false
+ },
+ "insecure_cryptography": {
+ "display_name": "Insecure cryptography",
+ "vulnerable": true,
+ "reliability": 5,
+ "description": "Uses RSA instead of elliptic curve."
+ },
+ "possible_webshell": {
+ "display_name": "Webshells (PST)",
+ "investigation_needed": true,
+ "reliability": 1,
+ "description": "A webshell of type PST was confirmed at /test/webshell.php"
+ }
+ }
+}
diff --git a/tests/data/example_data_3.json b/tests/data/example_data_3.json
new file mode 100644
index 0000000..44d483b
--- /dev/null
+++ b/tests/data/example_data_3.json
@@ -0,0 +1,51 @@
+{
+ "document_version": 1,
+ "ip": "192.0.2.28",
+ "port": 111,
+ "whois_description": "SOMENET",
+ "asn": "AS65001",
+ "asn_country_code": "SE",
+ "ptr": "host111.test.soc.sunet.se",
+ "abuse_mail": "abuse@test.soc.sunet.se",
+ "domain": "sunet.se",
+ "timestamp": "2021-06-30T15:00:00Z",
+ "display_name": "VMware ESXi 6.7.0 build-17700523",
+ "description": "VMware ESXi is an enterprise-class, type-1 hypervisor developed by VMware for deploying and serving virtual computers. As a type-1 hypervisor, ESXi is not a software application that is installed on an operating system; instead, it includes and integrates vital OS components, such as a kernel.",
+ "custom_data": {
+ "subject_cn": {
+ "data": "VMware ESXi",
+ "display_name": "Subject Common Name"
+ },
+ "end_of_general_support": {
+ "data": true,
+ "display_name": "End of general support",
+ "description": "Is the software currently supported?"
+ }
+ },
+ "result": {
+ "cve_2019_0001": {
+ "display_name": "CVE-2019-0001",
+ "vulnerable": false
+ },
+ "cve_2015_0002": {
+ "display_name": "CVE-2015-0002",
+ "vulnerable": false,
+ "description": "There is a use of insufficiently random values vulnerability. An unauthenticated, remote attacker can guess information by a large number of attempts. Successful exploitation may cause information leak."
+ },
+ "cve_2015_0003": {
+ "display_name": "CVE-2015-0003",
+ "vulnerable": true,
+ "reliability": 2,
+ "description": "A carefully crafted request body can cause a read to a random memory area which could cause the process to crash."
+ },
+ "cve_2015_0004": {
+ "display_name": "CVE-2015-0004",
+ "vulnerable": false
+ },
+ "cve_2015_0005": {
+ "display_name": "CVE-2015-0005",
+ "vulnerable": true,
+ "reliability": 4
+ }
+ }
+}
diff --git a/tests/data/example_data_3_replace_test.json b/tests/data/example_data_3_replace_test.json
new file mode 100644
index 0000000..31cc64d
--- /dev/null
+++ b/tests/data/example_data_3_replace_test.json
@@ -0,0 +1,52 @@
+{
+ "_id": "6370498050845fac09e0fc01",
+ "document_version": 2,
+ "ip": "192.0.2.28",
+ "port": 112,
+ "whois_description": "SOMENET",
+ "asn": "AS65001",
+ "asn_country_code": "SE",
+ "ptr": "host111.test.soc.sunet.se",
+ "abuse_mail": "abuse@test.soc.sunet.se",
+ "domain": "sunet.se",
+ "timestamp": "2021-06-30T15:00:00Z",
+ "display_name": "VMware ESXi 6.7.0 build-17700523",
+ "description": "VMware ESXi is an enterprise-class, type-1 hypervisor developed by VMware for deploying and serving virtual computers. As a type-1 hypervisor, ESXi is not a software application that is installed on an operating system; instead, it includes and integrates vital OS components, such as a kernel.",
+ "custom_data": {
+ "subject_cn": {
+ "data": "VMware ESXi",
+ "display_name": "Subject Common Name"
+ },
+ "end_of_general_support": {
+ "data": true,
+ "display_name": "End of general support",
+ "description": "Is the software currently supported?"
+ }
+ },
+ "result": {
+ "cve_2019_0001": {
+ "display_name": "CVE-2019-0001",
+ "vulnerable": false
+ },
+ "cve_2015_0002": {
+ "display_name": "CVE-2015-0002",
+ "vulnerable": false,
+ "description": "There is a use of insufficiently random values vulnerability. An unauthenticated, remote attacker can guess information by a large number of attempts. Successful exploitation may cause information leak."
+ },
+ "cve_2015_0003": {
+ "display_name": "CVE-2015-0003",
+ "vulnerable": true,
+ "reliability": 2,
+ "description": "A carefully crafted request body can cause a read to a random memory area which could cause the process to crash."
+ },
+ "cve_2015_0004": {
+ "display_name": "CVE-2015-0004",
+ "vulnerable": false
+ },
+ "cve_2015_0005": {
+ "display_name": "CVE-2015-0005",
+ "vulnerable": true,
+ "reliability": 4
+ }
+ }
+}
diff --git a/tests/test_auth.py b/tests/test_auth.py
new file mode 100644
index 0000000..d3fbf23
--- /dev/null
+++ b/tests/test_auth.py
@@ -0,0 +1,275 @@
+"""
+Test our auth
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAuth(unittest.TestCase):
+ """
+ Test our auth
+ """
+
+ def test_auth_info(self) -> None:
+ """
+ Test auth info
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+
+ # Test no key
+ req = requests.get(f"{BASE_URL}/info", timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.get(
+ f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.get(
+ f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_insert(self) -> None:
+ """
+ Test auth insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ # Test no key
+ req = requests.post(f"{BASE_URL}/sc/v0", json=insert_data, timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ key = json.loads(req.text)["_id"]
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_replace(self) -> None:
+ """
+ Test auth replace
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ replace_data = json_load_data("./tests/data/example_data_1_replace_test.json")
+
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ replace_data["_id"] = json.loads(req.text)["_id"]
+
+ # Test no key
+ req = requests.put(f"{BASE_URL}/sc/v0", json=replace_data, timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=replace_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=replace_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{replace_data['_id']}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_get(self) -> None:
+ """
+ Test auth get
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ # Test no key
+ req = requests.get(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_delete(self) -> None:
+ """
+ Test auth delete
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ # Test no key
+ req = requests.delete(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_search(self) -> None:
+ """
+ Test auth search
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ insert_data["timestamp"] = "2021-06-21T15:06:00Z"
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+ search_data = {"filter": {"timestamp": insert_data["timestamp"]}}
+
+ # Test no key
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search", json=search_data, timeout=5, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
diff --git a/tests/test_delete.py b/tests/test_delete.py
new file mode 100644
index 0000000..855d987
--- /dev/null
+++ b/tests/test_delete.py
@@ -0,0 +1,86 @@
+"""
+Test our delete
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our delete
+ """
+
+ def test_delete(self) -> None:
+ """
+ Test delete
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/dummy",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/63765238890b48a0c3118f4f",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 404)
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 404)
+
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 404)
diff --git a/tests/test_get.py b/tests/test_get.py
new file mode 100644
index 0000000..377f35a
--- /dev/null
+++ b/tests/test_get.py
@@ -0,0 +1,82 @@
+"""
+Test our get
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our get
+ """
+
+ def test_get(self) -> None:
+ """
+ Test get
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/dummy",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/63765238890b48a0c3118f4f",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 404)
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data1 = json.loads(req.text)
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data2 = json.loads(req.text)
+ self.assertTrue(data1 == data2)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
diff --git a/tests/test_info.py b/tests/test_info.py
new file mode 100644
index 0000000..ef775d7
--- /dev/null
+++ b/tests/test_info.py
@@ -0,0 +1,37 @@
+"""
+Test our info
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our info
+ """
+
+ def test_info(self) -> None:
+ """
+ Test info
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ request_headers = {"API-KEY": api_keys[-1]}
+
+ req = requests.get(
+ f"{BASE_URL}/info",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)
+ self.assertTrue("Estimated document count" in data)
+ self.assertTrue(isinstance(data["Estimated document count"], int))
+ self.assertTrue(data["Estimated document count"] >= 0)
diff --git a/tests/test_insert.py b/tests/test_insert.py
new file mode 100644
index 0000000..5bee72d
--- /dev/null
+++ b/tests/test_insert.py
@@ -0,0 +1,110 @@
+"""
+Test our insert
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our insert
+ """
+
+ def test_insert(self) -> None:
+ """
+ Test insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json={"dummy_data": "dummy"},
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ bad_data = insert_data.copy()
+ bad_data["_id"] = "63765238890b49a0c3118f4f"
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=bad_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+ del bad_data["_id"]
+
+ del bad_data["ip"]
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=bad_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ # Allow duplicate data but with different id
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key2 = json.loads(req.text)["_id"]
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ db_data = json.loads(req.text)["doc"]
+ del db_data["_id"]
+ self.assertTrue(insert_data == db_data)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key2}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
diff --git a/tests/test_replace.py b/tests/test_replace.py
new file mode 100644
index 0000000..291207f
--- /dev/null
+++ b/tests/test_replace.py
@@ -0,0 +1,102 @@
+"""
+Test our replace
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our replace
+ """
+
+ def test_replace(self) -> None:
+ """
+ Test replace
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ replace_data = json_load_data("./tests/data/example_data_1_replace_test.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ replace_data["_id"] = json.loads(req.text)["_id"]
+
+ bad_data = replace_data.copy()
+
+ # ip missing
+ del bad_data["ip"]
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=bad_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ bad_data["ip"] = replace_data["ip"]
+ del bad_data["_id"]
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=bad_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ bad_data["_id"] = "sdvnsvdlac"
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=bad_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 400)
+
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=replace_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{replace_data['_id']}",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ db_data = json.loads(req.text)["doc"]
+ self.assertTrue(replace_data == db_data)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{replace_data['_id']}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
diff --git a/tests/test_search.py b/tests/test_search.py
new file mode 100644
index 0000000..8a01332
--- /dev/null
+++ b/tests/test_search.py
@@ -0,0 +1,125 @@
+"""
+Test our search
+"""
+from typing import Dict, Any
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our search
+ """
+
+ def test_search(self) -> None:
+ """
+ Test search
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ insert_data["ip"] = "test_dummy_ip1"
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key1 = json.loads(req.text)["_id"]
+
+ insert_data["port"] = 4123
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key2 = json.loads(req.text)["_id"]
+
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 422)
+
+ search_data: Dict[str, Any] = {"dummy": {"ip": "test_dummy_ip"}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 422)
+
+ search_data = {"filter": {"ip": "test_dummy_ip"}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)["docs"]
+ self.assertTrue(data == [])
+
+ search_data = {"filter": {"ip": "test_dummy_ip1"}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)["docs"]
+ self.assertTrue(len(data) == 2)
+ del data[0]["_id"]
+ del data[1]["_id"]
+ self.assertTrue(data[0] != insert_data)
+ self.assertTrue(data[1] == insert_data)
+
+ search_data = {"filter": {"ip": "test_dummy_ip1", "port": 4123}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)["docs"]
+ self.assertTrue(len(data) == 1)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key1}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key2}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
diff --git a/tests/test_soc_collector_cli.py b/tests/test_soc_collector_cli.py
new file mode 100644
index 0000000..f867d60
--- /dev/null
+++ b/tests/test_soc_collector_cli.py
@@ -0,0 +1,244 @@
+"""
+Test our cli
+"""
+import unittest
+import io
+import sys
+import json
+import os
+
+from src.soc_collector.auth import load_api_keys
+
+from src.soc_collector.soc_collector_cli import (
+ json_load_data,
+ info_action,
+ get_action,
+ replace_action,
+ delete_action,
+ insert_action,
+ update_local_action,
+ search_action,
+)
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our cli
+ """
+
+ def test_json_load_data(self) -> None:
+ """
+ Test cli json_load_data
+ """
+
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ with open("./json_load_data_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(insert_data))
+
+ data = json_load_data("./json_load_data_test_here11.json")
+ self.assertTrue(isinstance(data, dict))
+
+ data = json_load_data('{"filter": {"ip": "123.123.4.123"}}')
+ self.assertTrue(isinstance(data, dict))
+
+ # Remove test data
+ os.remove("./json_load_data_test_here11.json")
+
+ def test_cli_info(self) -> None:
+ """
+ Test cli info
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ output = io.StringIO()
+ sys.stdout = output
+ info_action(api_keys[-1], BASE_URL)
+ self.assertTrue("Estimated document count: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ def test_cli_insert(self) -> None:
+ """
+ Test cli insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action("./tests/data/example_data_1.json", api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key2 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ # Delete test data
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ delete_action(key2, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__
+
+ def test_cli_replace(self) -> None:
+ """
+ Test cli insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ replace_vals = {"ip": "replace_test1"}
+ replace_data = insert_data.copy()
+ replace_data.update(replace_vals)
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ replace_data["_id"] = key1
+ replace_action(json.dumps(replace_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Replaced data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key2 = output.getvalue().split(" OK - key: ")[1].strip()
+ self.assertTrue(key1 == key2)
+
+ replace_data["ip"] = "replace_test2"
+ with open("./replace_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(replace_data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ replace_action("./replace_test_here11.json", api_keys[-1], BASE_URL)
+ self.assertTrue("Replaced data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key3 = output.getvalue().split(" OK - key: ")[1].strip()
+ self.assertTrue(key1 == key2 == key3)
+
+ # Delete test data
+ os.remove("./replace_test_here11.json")
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__
+
+ def test_cli_update_local(self) -> None:
+ """
+ Test cli update_local
+ """
+
+ data = json_load_data("./tests/data/example_data_1.json")
+ update_vals = {"ip": "update_local_test1"}
+ updated_data = data.copy()
+ updated_data.update(update_vals)
+
+ output = io.StringIO()
+ sys.stdout = output
+ update_local_action(json.dumps(data), json.dumps(update_vals))
+ self.assertTrue(json.dumps(updated_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ with open("./update_local_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ update_local_action("./update_local_test_here11.json", json.dumps(update_vals))
+ os.remove("./update_local_test_here11.json")
+ self.assertTrue(json.dumps(updated_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ def test_cli_get(self) -> None:
+ """
+ Test cli get
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ get_action(key1, api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps(expected_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ with open("./get_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(expected_data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ get_action("./get_test_here11.json", api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps(expected_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ # Delete test data
+ os.remove("./get_test_here11.json")
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__
+
+ def test_cli_search(self) -> None:
+ """
+ Test cli search
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ insert_data["ip"] = "search_dummy_ip1"
+ search_data = {"filter": {"ip": "search_dummy_ip1"}}
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ search_action(json.dumps(search_data), api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps([expected_data], indent=4) in output.getvalue()[:-1])
+ sys.stdout = sys.__stdout__
+
+ with open("./search_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(search_data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ search_action("./search_test_here11.json", api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps([expected_data], indent=4) in output.getvalue()[:-1])
+ sys.stdout = sys.__stdout__
+
+ # Delete test data
+ os.remove("./search_test_here11.json")
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__