summaryrefslogtreecommitdiff
path: root/tests/test_search.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_search.py')
-rw-r--r--tests/test_search.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/tests/test_search.py b/tests/test_search.py
new file mode 100644
index 0000000..8a01332
--- /dev/null
+++ b/tests/test_search.py
@@ -0,0 +1,125 @@
+"""
+Test our search
+"""
+from typing import Dict, Any
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our search
+ """
+
+ def test_search(self) -> None:
+ """
+ Test search
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ insert_data["ip"] = "test_dummy_ip1"
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key1 = json.loads(req.text)["_id"]
+
+ insert_data["port"] = 4123
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key2 = json.loads(req.text)["_id"]
+
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 422)
+
+ search_data: Dict[str, Any] = {"dummy": {"ip": "test_dummy_ip"}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 422)
+
+ search_data = {"filter": {"ip": "test_dummy_ip"}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)["docs"]
+ self.assertTrue(data == [])
+
+ search_data = {"filter": {"ip": "test_dummy_ip1"}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)["docs"]
+ self.assertTrue(len(data) == 2)
+ del data[0]["_id"]
+ del data[1]["_id"]
+ self.assertTrue(data[0] != insert_data)
+ self.assertTrue(data[1] == insert_data)
+
+ search_data = {"filter": {"ip": "test_dummy_ip1", "port": 4123}}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ data = json.loads(req.text)["docs"]
+ self.assertTrue(len(data) == 1)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key1}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key2}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)