1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
"""
Test our search
"""
from typing import Dict, Any
import unittest
import json
import requests
from src.soc_collector.auth import load_api_keys
from src.soc_collector.soc_collector_cli import json_load_data
# Root URL prepended to every request path issued by the tests in this module.
BASE_URL = "https://localhost:8000"
class TestAddress(unittest.TestCase):
    """
    Integration test for the ``/sc/v0/search`` endpoint.

    The test issues real HTTPS requests against ``BASE_URL``: it inserts two
    documents, runs several searches and finally removes the documents again.
    Removal is registered with ``addCleanup`` so it also happens when an
    assertion in the middle of the test fails.
    """

    # Settings shared by every request issued by this test case.
    _CA_BUNDLE = "./data/collector_root_ca.crt"
    _TIMEOUT = 4  # seconds

    def _post(
        self, path: str, headers: Dict[str, Any], payload: Any = None
    ) -> "requests.Response":
        """
        POST ``payload`` as JSON to ``path`` (relative to ``BASE_URL``).

        ``payload=None`` sends the request without a JSON body, which is what
        ``requests.post`` does when ``json`` is left at its default.
        """
        return requests.post(
            f"{BASE_URL}{path}",
            json=payload,
            headers=headers,
            timeout=self._TIMEOUT,
            verify=self._CA_BUNDLE,
        )

    def _delete_document(self, key: Any, headers: Dict[str, Any]) -> None:
        """
        Delete the document stored under ``key`` and require a 200 response.
        """
        req = requests.delete(
            f"{BASE_URL}/sc/v0/{key}",
            headers=headers,
            timeout=self._TIMEOUT,
            verify=self._CA_BUNDLE,
        )
        self.assertEqual(req.status_code, 200)

    def _insert_document(self, document: Any, headers: Dict[str, Any]) -> Any:
        """
        Insert ``document``, schedule its removal and return its ``_id``.
        """
        req = self._post("/sc/v0", headers, document)
        self.assertEqual(req.status_code, 200)
        key = json.loads(req.text)["_id"]
        # Register the delete right away so a later failure cannot leave the
        # document behind and break the next test run.
        self.addCleanup(self._delete_document, key, headers)
        return key

    def _search_docs(self, search_data: Dict[str, Any], headers: Dict[str, Any]) -> Any:
        """
        Run a search that must succeed and return the ``docs`` of the reply.
        """
        req = self._post("/sc/v0/search", headers, search_data)
        self.assertEqual(req.status_code, 200)
        return json.loads(req.text)["docs"]

    def test_search(self) -> None:
        """
        Test search: request validation, empty result, and filtered results.
        """
        api_keys = load_api_keys("data/api_keys.txt")
        request_headers = {"API-KEY": api_keys[-1]}

        # Insert the same document twice; only the second copy gets the
        # overridden port, so the two can be told apart by a filter.
        insert_data = json_load_data("./tests/data/example_data_1.json")
        insert_data["ip"] = "test_dummy_ip1"
        self._insert_document(insert_data, request_headers)

        insert_data["port"] = 4123
        self._insert_document(insert_data, request_headers)

        # A search without any body is expected to be rejected.
        req = self._post("/sc/v0/search", request_headers)
        self.assertEqual(req.status_code, 422)

        # A body that lacks the "filter" key is expected to be rejected too.
        search_data: Dict[str, Any] = {"dummy": {"ip": "test_dummy_ip"}}
        req = self._post("/sc/v0/search", request_headers, search_data)
        self.assertEqual(req.status_code, 422)

        # A filter value that was never inserted yields no documents.
        search_data = {"filter": {"ip": "test_dummy_ip"}}
        data = self._search_docs(search_data, request_headers)
        self.assertEqual(data, [])

        # Filtering on the shared ip returns both inserted documents.
        search_data = {"filter": {"ip": "test_dummy_ip1"}}
        data = self._search_docs(search_data, request_headers)
        self.assertEqual(len(data), 2)
        for doc in data:
            # The server-assigned id is not part of the inserted payload.
            del doc["_id"]
        # NOTE(review): these two checks assume the first returned document is
        # the one inserted before the port was changed - confirm the server
        # guarantees that ordering.
        self.assertNotEqual(data[0], insert_data)
        self.assertEqual(data[1], insert_data)

        # Adding the port to the filter narrows the result to one document.
        search_data = {"filter": {"ip": "test_dummy_ip1", "port": 4123}}
        data = self._search_docs(search_data, request_headers)
        self.assertEqual(len(data), 1)

        # The inserted test data is deleted by the cleanups registered in
        # _insert_document, each of which still asserts a 200 response.
|