diff options
Diffstat (limited to 'tests/test_auth.py')
-rw-r--r-- | tests/test_auth.py | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..d3fbf23 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,275 @@ +""" +Test our auth +""" +import unittest +import json + +import requests + +from src.soc_collector.auth import load_api_keys +from src.soc_collector.soc_collector_cli import json_load_data + +BASE_URL = "https://localhost:8000" + + +class TestAuth(unittest.TestCase): + """ + Test our auth + """ + + def test_auth_info(self) -> None: + """ + Test auth info + """ + + api_keys = load_api_keys("data/api_keys.txt") + + # Test no key + req = requests.get(f"{BASE_URL}/info", timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.get( + f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.get( + f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_insert(self) -> None: + """ + Test auth insert + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + # Test no key + req = requests.post(f"{BASE_URL}/sc/v0", json=insert_data, timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + key = json.loads(req.text)["_id"] + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_replace(self) -> None: + """ + Test auth replace + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + replace_data = json_load_data("./tests/data/example_data_1_replace_test.json") + + request_headers = {"API-KEY": api_keys[0]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + replace_data["_id"] = json.loads(req.text)["_id"] + + # Test no key + req = requests.put(f"{BASE_URL}/sc/v0", json=replace_data, timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.put( + f"{BASE_URL}/sc/v0", + json=replace_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.put( + f"{BASE_URL}/sc/v0", + json=replace_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{replace_data['_id']}", + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + def test_auth_get(self) -> None: + """ + Test auth get + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + # Test no key + req = requests.get(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[-1]} + req = requests.get( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_delete(self) -> None: + """ + Test auth delete + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + + # Test no key + req = requests.delete(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt") + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[0]} + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) + + def test_auth_search(self) -> None: + """ + Test auth search + """ + + api_keys = load_api_keys("data/api_keys.txt") + insert_data = json_load_data("./tests/data/example_data_1.json") + insert_data["timestamp"] = "2021-06-21T15:06:00Z" + + request_headers = {"API-KEY": api_keys[-1]} + req = requests.post( + f"{BASE_URL}/sc/v0", + json=insert_data, + headers=request_headers, + timeout=5, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + key = json.loads(req.text)["_id"] + search_data = {"filter": {"timestamp": insert_data["timestamp"]}} + + # Test no key + req = requests.post( + f"{BASE_URL}/sc/v0/search", json=search_data, timeout=5, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 401) + + # Test wrong api key + request_headers = {"API-KEY": "dummy"} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 401) + + # OK api key + request_headers = {"API-KEY": api_keys[0]} + req = requests.post( + f"{BASE_URL}/sc/v0/search", + json=search_data, + headers=request_headers, + timeout=4, + verify="./data/collector_root_ca.crt", + ) + self.assertTrue(req.status_code == 200) + + # Delete test data + request_headers = {"API-KEY": api_keys[0]} + req = requests.delete( + f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" + ) + self.assertTrue(req.status_code == 200) |