summaryrefslogtreecommitdiff
path: root/tests/test_soc_collector_cli.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_soc_collector_cli.py')
-rw-r--r--tests/test_soc_collector_cli.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/tests/test_soc_collector_cli.py b/tests/test_soc_collector_cli.py
new file mode 100644
index 0000000..f867d60
--- /dev/null
+++ b/tests/test_soc_collector_cli.py
@@ -0,0 +1,244 @@
+"""
+Test our cli
+"""
+import unittest
+import io
+import sys
+import json
+import os
+
+from src.soc_collector.auth import load_api_keys
+
+from src.soc_collector.soc_collector_cli import (
+ json_load_data,
+ info_action,
+ get_action,
+ replace_action,
+ delete_action,
+ insert_action,
+ update_local_action,
+ search_action,
+)
+
+BASE_URL = "https://localhost:8000"
+
+
+class TestAddress(unittest.TestCase):
+ """
+ Test our cli
+ """
+
+ def test_json_load_data(self) -> None:
+ """
+ Test cli json_load_data
+ """
+
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ with open("./json_load_data_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(insert_data))
+
+ data = json_load_data("./json_load_data_test_here11.json")
+ self.assertTrue(isinstance(data, dict))
+
+ data = json_load_data('{"filter": {"ip": "123.123.4.123"}}')
+ self.assertTrue(isinstance(data, dict))
+
+ # Remove test data
+ os.remove("./json_load_data_test_here11.json")
+
+ def test_cli_info(self) -> None:
+ """
+ Test cli info
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ output = io.StringIO()
+ sys.stdout = output
+ info_action(api_keys[-1], BASE_URL)
+ self.assertTrue("Estimated document count: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ def test_cli_insert(self) -> None:
+ """
+ Test cli insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action("./tests/data/example_data_1.json", api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key2 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ # Delete test data
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ delete_action(key2, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__
+
+ def test_cli_replace(self) -> None:
+ """
+ Test cli insert
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ replace_vals = {"ip": "replace_test1"}
+ replace_data = insert_data.copy()
+ replace_data.update(replace_vals)
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ replace_data["_id"] = key1
+ replace_action(json.dumps(replace_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Replaced data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key2 = output.getvalue().split(" OK - key: ")[1].strip()
+ self.assertTrue(key1 == key2)
+
+ replace_data["ip"] = "replace_test2"
+ with open("./replace_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(replace_data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ replace_action("./replace_test_here11.json", api_keys[-1], BASE_URL)
+ self.assertTrue("Replaced data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key3 = output.getvalue().split(" OK - key: ")[1].strip()
+ self.assertTrue(key1 == key2 == key3)
+
+ # Delete test data
+ os.remove("./replace_test_here11.json")
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__
+
+ def test_cli_update_local(self) -> None:
+ """
+ Test cli update_local
+ """
+
+ data = json_load_data("./tests/data/example_data_1.json")
+ update_vals = {"ip": "update_local_test1"}
+ updated_data = data.copy()
+ updated_data.update(update_vals)
+
+ output = io.StringIO()
+ sys.stdout = output
+ update_local_action(json.dumps(data), json.dumps(update_vals))
+ self.assertTrue(json.dumps(updated_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ with open("./update_local_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ update_local_action("./update_local_test_here11.json", json.dumps(update_vals))
+ os.remove("./update_local_test_here11.json")
+ self.assertTrue(json.dumps(updated_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ def test_cli_get(self) -> None:
+ """
+ Test cli get
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ get_action(key1, api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps(expected_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ with open("./get_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(expected_data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ get_action("./get_test_here11.json", api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps(expected_data, indent=4) in output.getvalue())
+ sys.stdout = sys.__stdout__
+
+ # Delete test data
+ os.remove("./get_test_here11.json")
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__
+
+ def test_cli_search(self) -> None:
+ """
+ Test cli search
+ """
+
+ api_keys = load_api_keys("data/api_keys.txt")
+ insert_data = json_load_data("./tests/data/example_data_1.json")
+ insert_data["ip"] = "search_dummy_ip1"
+ search_data = {"filter": {"ip": "search_dummy_ip1"}}
+
+ output = io.StringIO()
+ sys.stdout = output
+ insert_action(json.dumps(insert_data), api_keys[-1], BASE_URL)
+ self.assertTrue("Inserted data OK - key: " in output.getvalue())
+ sys.stdout = sys.__stdout__
+ key1 = output.getvalue().split(" OK - key: ")[1].strip()
+
+ output = io.StringIO()
+ sys.stdout = output
+ search_action(json.dumps(search_data), api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps([expected_data], indent=4) in output.getvalue()[:-1])
+ sys.stdout = sys.__stdout__
+
+ with open("./search_test_here11.json", "w", encoding="utf-8") as f_data:
+ f_data.write(json.dumps(search_data))
+
+ output = io.StringIO()
+ sys.stdout = output
+ search_action("./search_test_here11.json", api_keys[-1], BASE_URL)
+ expected_data = {"_id": key1}
+ expected_data.update(insert_data)
+ self.assertTrue(json.dumps([expected_data], indent=4) in output.getvalue()[:-1])
+ sys.stdout = sys.__stdout__
+
+ # Delete test data
+ os.remove("./search_test_here11.json")
+ output = io.StringIO()
+ sys.stdout = output
+ delete_action(key1, api_keys[-1], BASE_URL)
+ sys.stdout = sys.__stdout__