summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorVictor Näslund <victor@sunet.se>2022-11-02 15:31:23 +0100
committerVictor Näslund <victor@sunet.se>2022-11-02 15:31:23 +0100
commit8baecf339e8061160bee519e87ffe837d1525c18 (patch)
tree22664c10f22382b1d4647b5f2e96bcea4220d879 /tests
parentffb26f4a81a9ca61c4105df037f7e1beb8dc5fb0 (diff)
more freshup
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/test_api.py232
2 files changed, 232 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/test_api.py b/tests/test_api.py
new file mode 100644
index 0000000..371fcf2
--- /dev/null
+++ b/tests/test_api.py
@@ -0,0 +1,232 @@
+import os
+import time
+import pytest
+import random
+import ipaddress
+
+from main import app
+from fastapi import FastAPI
+from fastapi import testclient
+
+client = testclient.TestClient(app)
+JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY0MjE2ODkyMCwianRpIjoiNjM0NGFiNjEtMTIzZC00YWMyLTk3YjMtYmVlYTE2M2JiMWMwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6InVzZXIxIiwibmJmIjoxNjQyMTY4OTIwLCJyZWFkIjpbInN1bmV0LnNlIl0sIndyaXRlIjpbInN1bmV0LnNlIl19._bX9EHI9h0Vjw75UvYvypqaH3AmsgaATFSUSOT-cYLZHrfMlxios3emr7cyKw-OV_BN5h_XNyrMBV1gIoqAk3A'
+JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'}
+
+
+def test_001():
+ print("*** Adding document.")
+
+ doc_port = random.randint(1, 65536)
+ doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
+ doc_asn = str(doc_ip) + '_' + str(doc_port)
+
+ json_data = {
+ 'ip': doc_ip,
+ 'port': doc_port,
+ 'whois_description': 'unittest',
+ 'asn': doc_asn,
+ 'asn_country_code': 'SE',
+ 'ptr': 'unittest.example.com',
+ 'abuse_mail': 'unittest@example.com',
+ 'domain': 'sunet.se',
+ 'timestamp_in_utc': '2021-06-21T14:06UTC',
+ 'producer_unique_keys': {
+ 'subject_cn': 'unittest',
+ 'subject_o': 'unittest',
+ 'full_name': 'unittest',
+ 'end_of_general_support': False,
+ 'cve_2021_21972': 'unittest',
+ 'cve_2021_21974': 'unittest',
+ 'cve_2021_21985': 'unittest'
+ }
+ }
+
+ response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+
+ response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+ assert(len(response.json()['docs']) == 1)
+ assert(response.json()['docs'][0]['port'] == doc_port)
+
+ response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+ assert(len(response.json()['docs']) == 1)
+ assert(response.json()['docs'][0]['asn'] == doc_asn)
+
+ response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+ assert(len(response.json()['docs']) == 1)
+ assert(response.json()['docs'][0]['ip'] == doc_ip)
+
+
+def test_002():
+ nr_documents = 100
+ starttime = time.time()
+
+ for i in range(nr_documents):
+ doc_port = random.randint(1, 65536)
+ doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
+ doc_asn = str(doc_ip) + '_' + str(doc_port)
+
+ json_data = {
+ 'ip': doc_ip,
+ 'port': doc_port,
+ 'whois_description': 'unittest',
+ 'asn': doc_asn,
+ 'asn_country_code': 'SE',
+ 'ptr': 'unittest.example.com',
+ 'abuse_mail': 'unittest@example.com',
+ 'domain': 'sunet.se',
+ 'timestamp_in_utc': '2021-06-21T14:06UTC',
+ 'producer_unique_keys': {
+ 'subject_cn': 'unittest',
+ 'subject_o': 'unittest',
+ 'full_name': 'unittest',
+ 'end_of_general_support': False,
+ 'cve_2021_21972': 'unittest',
+ 'cve_2021_21974': 'unittest',
+ 'cve_2021_21985': 'unittest'
+ }
+ }
+
+ response = client.post(
+ "/sc/v0/add", headers=JWT_HEADER, json=json_data)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+
+ response = client.get(
+ f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+ assert(len(response.json()['docs']) == 1)
+ assert(response.json()['docs'][0]['port'] == doc_port)
+
+ response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+ assert(len(response.json()['docs']) == 1)
+ assert(response.json()['docs'][0]['asn'] == doc_asn)
+
+ response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+ assert(len(response.json()['docs']) == 1)
+ assert(response.json()['docs'][0]['ip'] == doc_ip)
+
+ stop_time = str(time.time() - starttime)
+ print(f"*** Adding {nr_documents} documents took {stop_time} seconds.")
+
+
+def test_003():
+ response = client.get("/sc/v0/get", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+
+ for doc in response.json()['docs']:
+ doc_id = doc['_id']
+
+ response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
+ assert(response_doc.status_code == 200)
+ assert(response_doc.json()['status'] == 'success')
+ assert(type(response_doc.json()['docs']) == type(dict()))
+ assert(response_doc.json()['docs']['domain'] == 'sunet.se')
+
+
+def test_004():
+ response = client.get("/sc/v0/get?limit=1000", headers=JWT_HEADER)
+ assert(response.status_code == 200)
+
+ nr_documents = len(response.json()['docs'])
+ starttime = time.time()
+
+ for doc in response.json()['docs']:
+ doc_id = doc['_id']
+ response_doc = client.delete(
+ f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER)
+ assert(response_doc.status_code == 200)
+ assert(response_doc.json()['status'] == 'success')
+ response_doc = client.get(
+ f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
+ assert(response_doc.status_code == 200)
+ assert(response_doc.json()['status'] == 'success')
+ assert(response_doc.json()['docs'] == {})
+
+ stop_time = str(time.time() - starttime)
+ print(f"*** Removing {nr_documents} documents took {stop_time} seconds.")
+
+ print("*** Removing document with invalid ID.")
+ response = client.delete(
+ "/sc/v0/delete/nonexistent", headers=JWT_HEADER)
+ assert(response.status_code == 400)
+ assert(response.json()['status'] == 'error')
+
+
+def test_005():
+ print("*** Accessing endpoints without JWT token...")
+
+ response = client.get("/sc/v0/get?limit=1000")
+ assert(response.status_code == 400)
+ assert(response.json()['status'] == 'error')
+
+ response = client.get("/sc/v0/get/unittest")
+ assert(response.status_code == 400)
+ assert(response.json()['status'] == 'error')
+
+ response = client.post("/sc/v0/add", json={"data": "nothing"})
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+
+ response = client.delete("/sc/v0/delete/unittest")
+ assert(response.status_code == 400)
+ assert(response.json()['status'] == 'error')
+
+def test_006():
+ print("*** Add doc for unauthorized domain (this is allowed, currently)")
+
+ doc_port = random.randint(1, 65536)
+ doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
+ doc_asn = str(doc_ip) + '_' + str(doc_port)
+
+ json_data = {
+ 'ip': doc_ip,
+ 'port': doc_port,
+ 'whois_description': 'unittest',
+ 'asn': doc_asn,
+ 'asn_country_code': 'SE',
+ 'ptr': 'unittest.example.com',
+ 'abuse_mail': 'unittest@example.com',
+ 'domain': 'sunet.se',
+ 'timestamp_in_utc': '2021-06-21T14:06UTC',
+ 'producer_unique_keys': {
+ 'subject_cn': 'unittest',
+ 'subject_o': 'unittest',
+ 'full_name': 'unittest',
+ 'end_of_general_support': False,
+ 'cve_2021_21972': 'unittest',
+ 'cve_2021_21974': 'unittest',
+ 'cve_2021_21985': 'unittest'
+ }
+ }
+
+ response = client.post(
+ "/sc/v0/add", headers=JWT_HEADER, json=dict(json_data, domain="example.com")
+ )
+ assert(response.status_code == 200)
+ assert(response.json()['status'] == 'success')
+
+ print("*** Get doc for unauthorized domain (not allowed)")
+ doc_id = response.json()['docs']['_id']
+ response = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
+ assert(response.status_code == 400)
+ assert(response.json()['status'] == 'error')
+ assert(response.json()['message'] == 'User not authorized to view this object')
+
+ print("*** Delete doc for unauthorized domain (not allowed)")
+ response = client.delete(f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER)
+ assert(response.status_code == 400)
+ assert(response.json()['status'] == 'error')
+ assert(response.json()['message'] == 'User not authorized to delete this object')