summaryrefslogtreecommitdiff
path: root/tests/test_auth.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_auth.py')
-rw-r--r--tests/test_auth.py275
1 files changed, 275 insertions, 0 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py
new file mode 100644
index 0000000..d3fbf23
--- /dev/null
+++ b/tests/test_auth.py
@@ -0,0 +1,275 @@
+"""
+Test our auth
+"""
+import unittest
+import json
+
+import requests
+
+from src.soc_collector.auth import load_api_keys
+from src.soc_collector.soc_collector_cli import json_load_data
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAuth(unittest.TestCase):
+ """
+ Test our auth
+ """
+
+ def test_auth_info(self) -> None:
+ """
+ Test auth info
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+
+ # Test no key
+ req = requests.get(f"{BASE_URL}/info", timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.get(
+ f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.get(
+ f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_insert(self) -> None:
+ """
+ Test auth insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ # Test no key
+ req = requests.post(f"{BASE_URL}/sc/v0", json=insert_data, timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ key = json.loads(req.text)["_id"]
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_replace(self) -> None:
+ """
+ Test auth replace
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ replace_data = json_load_data("./tests/data/example_data_1_replace_test.json")
+
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ replace_data["_id"] = json.loads(req.text)["_id"]
+
+ # Test no key
+ req = requests.put(f"{BASE_URL}/sc/v0", json=replace_data, timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=replace_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.put(
+ f"{BASE_URL}/sc/v0",
+ json=replace_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{replace_data['_id']}",
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_get(self) -> None:
+ """
+ Test auth get
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ # Test no key
+ req = requests.get(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.get(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_delete(self) -> None:
+ """
+ Test auth delete
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+
+ # Test no key
+ req = requests.delete(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt")
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)
+
+ def test_auth_search(self) -> None:
+ """
+ Test auth search
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ insert_data["timestamp"] = "2021-06-21T15:06:00Z"
+
+ request_headers = {"API-KEY": api_keys[-1]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0",
+ json=insert_data,
+ headers=request_headers,
+ timeout=5,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+ key = json.loads(req.text)["_id"]
+ search_data = {"filter": {"timestamp": insert_data["timestamp"]}}
+
+ # Test no key
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search", json=search_data, timeout=5, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # Test wrong api key
+ request_headers = {"API-KEY": "dummy"}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 401)
+
+ # OK api key
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.post(
+ f"{BASE_URL}/sc/v0/search",
+ json=search_data,
+ headers=request_headers,
+ timeout=4,
+ verify="./data/collector_root_ca.crt",
+ )
+ self.assertTrue(req.status_code == 200)
+
+ # Delete test data
+ request_headers = {"API-KEY": api_keys[0]}
+ req = requests.delete(
+ f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt"
+ )
+ self.assertTrue(req.status_code == 200)