""" Test our auth """ import unittest import json import requests from src.soc_collector.auth import load_api_keys from src.soc_collector.soc_collector_cli import json_load_data BASE_URL = "https://localhost:8000" class TestAuth(unittest.TestCase): """ Test our auth """ def test_auth_info(self) -> None: """ Test auth info """ api_keys = load_api_keys("data/api_keys.txt") # Test no key req = requests.get(f"{BASE_URL}/info", timeout=5, verify="./data/collector_root_ca.crt") self.assertTrue(req.status_code == 401) # Test wrong api key request_headers = {"API-KEY": "dummy"} req = requests.get( f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 401) # OK api key request_headers = {"API-KEY": api_keys[-1]} req = requests.get( f"{BASE_URL}/info", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 200) def test_auth_insert(self) -> None: """ Test auth insert """ api_keys = load_api_keys("data/api_keys.txt") insert_data = json_load_data("./tests/data/example_data_1.json") # Test no key req = requests.post(f"{BASE_URL}/sc/v0", json=insert_data, timeout=5, verify="./data/collector_root_ca.crt") self.assertTrue(req.status_code == 401) # Test wrong api key request_headers = {"API-KEY": "dummy"} req = requests.post( f"{BASE_URL}/sc/v0", json=insert_data, headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 401) # OK api key request_headers = {"API-KEY": api_keys[-1]} req = requests.post( f"{BASE_URL}/sc/v0", json=insert_data, headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) # Delete test data key = json.loads(req.text)["_id"] req = requests.delete( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 200) def test_auth_replace(self) -> None: """ Test auth replace """ api_keys = load_api_keys("data/api_keys.txt") insert_data = json_load_data("./tests/data/example_data_1.json") replace_data = json_load_data("./tests/data/example_data_1_replace_test.json") request_headers = {"API-KEY": api_keys[0]} req = requests.post( f"{BASE_URL}/sc/v0", json=insert_data, headers=request_headers, timeout=5, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) replace_data["_id"] = json.loads(req.text)["_id"] # Test no key req = requests.put(f"{BASE_URL}/sc/v0", json=replace_data, timeout=5, verify="./data/collector_root_ca.crt") self.assertTrue(req.status_code == 401) # Test wrong api key request_headers = {"API-KEY": "dummy"} req = requests.put( f"{BASE_URL}/sc/v0", json=replace_data, headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 401) # OK api key request_headers = {"API-KEY": api_keys[-1]} req = requests.put( f"{BASE_URL}/sc/v0", json=replace_data, headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) # Delete test data req = requests.delete( f"{BASE_URL}/sc/v0/{replace_data['_id']}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) def test_auth_get(self) -> None: """ Test auth get """ api_keys = load_api_keys("data/api_keys.txt") insert_data = json_load_data("./tests/data/example_data_1.json") request_headers = {"API-KEY": api_keys[-1]} req = requests.post( f"{BASE_URL}/sc/v0", json=insert_data, headers=request_headers, timeout=5, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) key = json.loads(req.text)["_id"] # Test no key req = requests.get(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt") self.assertTrue(req.status_code == 401) # Test wrong api key request_headers = {"API-KEY": "dummy"} req = requests.get( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 401) # OK api key request_headers = {"API-KEY": api_keys[-1]} req = requests.get( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 200) # Delete test data req = requests.delete( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 200) def test_auth_delete(self) -> None: """ Test auth delete """ api_keys = load_api_keys("data/api_keys.txt") insert_data = json_load_data("./tests/data/example_data_1.json") request_headers = {"API-KEY": api_keys[-1]} req = requests.post( f"{BASE_URL}/sc/v0", json=insert_data, headers=request_headers, timeout=5, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) key = json.loads(req.text)["_id"] # Test no key req = requests.delete(f"{BASE_URL}/sc/v0/{key}", timeout=5, verify="./data/collector_root_ca.crt") self.assertTrue(req.status_code == 401) # Test wrong api key request_headers = {"API-KEY": "dummy"} req = requests.delete( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 401) # OK api key request_headers = {"API-KEY": api_keys[0]} req = requests.delete( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 200) def test_auth_search(self) -> None: """ Test auth search """ api_keys = load_api_keys("data/api_keys.txt") insert_data = json_load_data("./tests/data/example_data_1.json") insert_data["timestamp"] = "2021-06-21T15:06:00Z" request_headers = {"API-KEY": api_keys[-1]} req = requests.post( f"{BASE_URL}/sc/v0", json=insert_data, headers=request_headers, timeout=5, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) key = json.loads(req.text)["_id"] search_data = {"filter": {"timestamp": insert_data["timestamp"]}} # Test no key req = requests.post( f"{BASE_URL}/sc/v0/search", json=search_data, timeout=5, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 401) # Test wrong api key request_headers = {"API-KEY": "dummy"} req = requests.post( f"{BASE_URL}/sc/v0/search", json=search_data, headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 401) # OK api key request_headers = {"API-KEY": api_keys[0]} req = requests.post( f"{BASE_URL}/sc/v0/search", json=search_data, headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt", ) self.assertTrue(req.status_code == 200) # Delete test data request_headers = {"API-KEY": api_keys[0]} req = requests.delete( f"{BASE_URL}/sc/v0/{key}", headers=request_headers, timeout=4, verify="./data/collector_root_ca.crt" ) self.assertTrue(req.status_code == 200)