summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorVictor Näslund <victor@sunet.se>2022-11-13 04:12:47 +0100
committerVictor Näslund <victor@sunet.se>2022-11-13 04:12:47 +0100
commitf7a40b9e13d242968db83acaac13660224eb0143 (patch)
treed8f0cdf5d93cc1aebc83343aea6615bc2ee9bc55 /tests
parent8baecf339e8061160bee519e87ffe837d1525c18 (diff)
new direction
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/test_api.py232
2 files changed, 0 insertions, 232 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/tests/__init__.py
+++ /dev/null
diff --git a/tests/test_api.py b/tests/test_api.py
deleted file mode 100644
index 371fcf2..0000000
--- a/tests/test_api.py
+++ /dev/null
@@ -1,232 +0,0 @@
-import os
-import time
-import pytest
-import random
-import ipaddress
-
-from main import app
-from fastapi import FastAPI
-from fastapi import testclient
-
-client = testclient.TestClient(app)
-JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY0MjE2ODkyMCwianRpIjoiNjM0NGFiNjEtMTIzZC00YWMyLTk3YjMtYmVlYTE2M2JiMWMwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6InVzZXIxIiwibmJmIjoxNjQyMTY4OTIwLCJyZWFkIjpbInN1bmV0LnNlIl0sIndyaXRlIjpbInN1bmV0LnNlIl19._bX9EHI9h0Vjw75UvYvypqaH3AmsgaATFSUSOT-cYLZHrfMlxios3emr7cyKw-OV_BN5h_XNyrMBV1gIoqAk3A'
-JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'}
-
-
-def test_001():
- print("*** Adding document.")
-
- doc_port = random.randint(1, 65536)
- doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
- doc_asn = str(doc_ip) + '_' + str(doc_port)
-
- json_data = {
- 'ip': doc_ip,
- 'port': doc_port,
- 'whois_description': 'unittest',
- 'asn': doc_asn,
- 'asn_country_code': 'SE',
- 'ptr': 'unittest.example.com',
- 'abuse_mail': 'unittest@example.com',
- 'domain': 'sunet.se',
- 'timestamp_in_utc': '2021-06-21T14:06UTC',
- 'producer_unique_keys': {
- 'subject_cn': 'unittest',
- 'subject_o': 'unittest',
- 'full_name': 'unittest',
- 'end_of_general_support': False,
- 'cve_2021_21972': 'unittest',
- 'cve_2021_21974': 'unittest',
- 'cve_2021_21985': 'unittest'
- }
- }
-
- response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
-
- response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
- assert(len(response.json()['docs']) == 1)
- assert(response.json()['docs'][0]['port'] == doc_port)
-
- response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
- assert(len(response.json()['docs']) == 1)
- assert(response.json()['docs'][0]['asn'] == doc_asn)
-
- response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
- assert(len(response.json()['docs']) == 1)
- assert(response.json()['docs'][0]['ip'] == doc_ip)
-
-
-def test_002():
- nr_documents = 100
- starttime = time.time()
-
- for i in range(nr_documents):
- doc_port = random.randint(1, 65536)
- doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
- doc_asn = str(doc_ip) + '_' + str(doc_port)
-
- json_data = {
- 'ip': doc_ip,
- 'port': doc_port,
- 'whois_description': 'unittest',
- 'asn': doc_asn,
- 'asn_country_code': 'SE',
- 'ptr': 'unittest.example.com',
- 'abuse_mail': 'unittest@example.com',
- 'domain': 'sunet.se',
- 'timestamp_in_utc': '2021-06-21T14:06UTC',
- 'producer_unique_keys': {
- 'subject_cn': 'unittest',
- 'subject_o': 'unittest',
- 'full_name': 'unittest',
- 'end_of_general_support': False,
- 'cve_2021_21972': 'unittest',
- 'cve_2021_21974': 'unittest',
- 'cve_2021_21985': 'unittest'
- }
- }
-
- response = client.post(
- "/sc/v0/add", headers=JWT_HEADER, json=json_data)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
-
- response = client.get(
- f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
- assert(len(response.json()['docs']) == 1)
- assert(response.json()['docs'][0]['port'] == doc_port)
-
- response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
- assert(len(response.json()['docs']) == 1)
- assert(response.json()['docs'][0]['asn'] == doc_asn)
-
- response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER)
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
- assert(len(response.json()['docs']) == 1)
- assert(response.json()['docs'][0]['ip'] == doc_ip)
-
- stop_time = str(time.time() - starttime)
- print(f"*** Adding {nr_documents} documents took {stop_time} seconds.")
-
-
-def test_003():
- response = client.get("/sc/v0/get", headers=JWT_HEADER)
- assert(response.status_code == 200)
-
- for doc in response.json()['docs']:
- doc_id = doc['_id']
-
- response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
- assert(response_doc.status_code == 200)
- assert(response_doc.json()['status'] == 'success')
- assert(type(response_doc.json()['docs']) == type(dict()))
- assert(response_doc.json()['docs']['domain'] == 'sunet.se')
-
-
-def test_004():
- response = client.get("/sc/v0/get?limit=1000", headers=JWT_HEADER)
- assert(response.status_code == 200)
-
- nr_documents = len(response.json()['docs'])
- starttime = time.time()
-
- for doc in response.json()['docs']:
- doc_id = doc['_id']
- response_doc = client.delete(
- f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER)
- assert(response_doc.status_code == 200)
- assert(response_doc.json()['status'] == 'success')
- response_doc = client.get(
- f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
- assert(response_doc.status_code == 200)
- assert(response_doc.json()['status'] == 'success')
- assert(response_doc.json()['docs'] == {})
-
- stop_time = str(time.time() - starttime)
- print(f"*** Removing {nr_documents} documents took {stop_time} seconds.")
-
- print("*** Removing document with invalid ID.")
- response = client.delete(
- "/sc/v0/delete/nonexistent", headers=JWT_HEADER)
- assert(response.status_code == 400)
- assert(response.json()['status'] == 'error')
-
-
-def test_005():
- print("*** Accessing endpoints without JWT token...")
-
- response = client.get("/sc/v0/get?limit=1000")
- assert(response.status_code == 400)
- assert(response.json()['status'] == 'error')
-
- response = client.get("/sc/v0/get/unittest")
- assert(response.status_code == 400)
- assert(response.json()['status'] == 'error')
-
- response = client.post("/sc/v0/add", json={"data": "nothing"})
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
-
- response = client.delete("/sc/v0/delete/unittest")
- assert(response.status_code == 400)
- assert(response.json()['status'] == 'error')
-
-def test_006():
- print("*** Add doc for unauthorized domain (this is allowed, currently)")
-
- doc_port = random.randint(1, 65536)
- doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff)))
- doc_asn = str(doc_ip) + '_' + str(doc_port)
-
- json_data = {
- 'ip': doc_ip,
- 'port': doc_port,
- 'whois_description': 'unittest',
- 'asn': doc_asn,
- 'asn_country_code': 'SE',
- 'ptr': 'unittest.example.com',
- 'abuse_mail': 'unittest@example.com',
- 'domain': 'sunet.se',
- 'timestamp_in_utc': '2021-06-21T14:06UTC',
- 'producer_unique_keys': {
- 'subject_cn': 'unittest',
- 'subject_o': 'unittest',
- 'full_name': 'unittest',
- 'end_of_general_support': False,
- 'cve_2021_21972': 'unittest',
- 'cve_2021_21974': 'unittest',
- 'cve_2021_21985': 'unittest'
- }
- }
-
- response = client.post(
- "/sc/v0/add", headers=JWT_HEADER, json=dict(json_data, domain="example.com")
- )
- assert(response.status_code == 200)
- assert(response.json()['status'] == 'success')
-
- print("*** Get doc for unauthorized domain (not allowed)")
- doc_id = response.json()['docs']['_id']
- response = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER)
- assert(response.status_code == 400)
- assert(response.json()['status'] == 'error')
- assert(response.json()['message'] == 'User not authorized to view this object')
-
- print("*** Delete doc for unauthorized domain (not allowed)")
- response = client.delete(f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER)
- assert(response.status_code == 400)
- assert(response.json()['status'] == 'error')
- assert(response.json()['message'] == 'User not authorized to delete this object')