diff options
author | Victor Näslund <victor@sunet.se> | 2022-11-13 04:12:47 +0100 |
---|---|---|
committer | Victor Näslund <victor@sunet.se> | 2022-11-13 04:12:47 +0100 |
commit | f7a40b9e13d242968db83acaac13660224eb0143 (patch) | |
tree | d8f0cdf5d93cc1aebc83343aea6615bc2ee9bc55 /tests | |
parent | 8baecf339e8061160bee519e87ffe837d1525c18 (diff) |
new direction
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/test_api.py | 232 |
2 files changed, 0 insertions, 232 deletions
diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tests/__init__.py +++ /dev/null diff --git a/tests/test_api.py b/tests/test_api.py deleted file mode 100644 index 371fcf2..0000000 --- a/tests/test_api.py +++ /dev/null @@ -1,232 +0,0 @@ -import os -import time -import pytest -import random -import ipaddress - -from main import app -from fastapi import FastAPI -from fastapi import testclient - -client = testclient.TestClient(app) -JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY0MjE2ODkyMCwianRpIjoiNjM0NGFiNjEtMTIzZC00YWMyLTk3YjMtYmVlYTE2M2JiMWMwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6InVzZXIxIiwibmJmIjoxNjQyMTY4OTIwLCJyZWFkIjpbInN1bmV0LnNlIl0sIndyaXRlIjpbInN1bmV0LnNlIl19._bX9EHI9h0Vjw75UvYvypqaH3AmsgaATFSUSOT-cYLZHrfMlxios3emr7cyKw-OV_BN5h_XNyrMBV1gIoqAk3A' -JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'} - - -def test_001(): - print("*** Adding document.") - - doc_port = random.randint(1, 65536) - doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) - doc_asn = str(doc_ip) + '_' + str(doc_port) - - json_data = { - 'ip': doc_ip, - 'port': doc_port, - 'whois_description': 'unittest', - 'asn': doc_asn, - 'asn_country_code': 'SE', - 'ptr': 'unittest.example.com', - 'abuse_mail': 'unittest@example.com', - 'domain': 'sunet.se', - 'timestamp_in_utc': '2021-06-21T14:06UTC', - 'producer_unique_keys': { - 'subject_cn': 'unittest', - 'subject_o': 'unittest', - 'full_name': 'unittest', - 'end_of_general_support': False, - 'cve_2021_21972': 'unittest', - 'cve_2021_21974': 'unittest', - 'cve_2021_21985': 'unittest' - } - } - - response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['port'] == doc_port) - - response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['asn'] == doc_asn) - - response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['ip'] == doc_ip) - - -def test_002(): - nr_documents = 100 - starttime = time.time() - - for i in range(nr_documents): - doc_port = random.randint(1, 65536) - doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) - doc_asn = str(doc_ip) + '_' + str(doc_port) - - json_data = { - 'ip': doc_ip, - 'port': doc_port, - 'whois_description': 'unittest', - 'asn': doc_asn, - 'asn_country_code': 'SE', - 'ptr': 'unittest.example.com', - 'abuse_mail': 'unittest@example.com', - 'domain': 'sunet.se', - 'timestamp_in_utc': '2021-06-21T14:06UTC', - 'producer_unique_keys': { - 'subject_cn': 'unittest', - 'subject_o': 'unittest', - 'full_name': 'unittest', - 'end_of_general_support': False, - 'cve_2021_21972': 'unittest', - 'cve_2021_21974': 'unittest', - 'cve_2021_21985': 'unittest' - } - } - - response = client.post( - "/sc/v0/add", headers=JWT_HEADER, json=json_data) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - response = client.get( - f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['port'] == doc_port) - - response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['asn'] == doc_asn) - - response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - assert(len(response.json()['docs']) == 1) - assert(response.json()['docs'][0]['ip'] == doc_ip) - - stop_time = str(time.time() - starttime) - print(f"*** Adding {nr_documents} documents took {stop_time} seconds.") - - -def test_003(): - response = client.get("/sc/v0/get", headers=JWT_HEADER) - assert(response.status_code == 200) - - for doc in response.json()['docs']: - doc_id = doc['_id'] - - response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) - assert(response_doc.status_code == 200) - assert(response_doc.json()['status'] == 'success') - assert(type(response_doc.json()['docs']) == type(dict())) - assert(response_doc.json()['docs']['domain'] == 'sunet.se') - - -def test_004(): - response = client.get("/sc/v0/get?limit=1000", headers=JWT_HEADER) - assert(response.status_code == 200) - - nr_documents = len(response.json()['docs']) - starttime = time.time() - - for doc in response.json()['docs']: - doc_id = doc['_id'] - response_doc = client.delete( - f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER) - assert(response_doc.status_code == 200) - assert(response_doc.json()['status'] == 'success') - response_doc = client.get( - f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) - assert(response_doc.status_code == 200) - assert(response_doc.json()['status'] == 'success') - assert(response_doc.json()['docs'] == {}) - - stop_time = str(time.time() - starttime) - print(f"*** Removing {nr_documents} documents took {stop_time} seconds.") - - print("*** Removing document with invalid ID.") - response = client.delete( - "/sc/v0/delete/nonexistent", headers=JWT_HEADER) - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - - -def test_005(): - print("*** Accessing endpoints without JWT token...") - - response = client.get("/sc/v0/get?limit=1000") - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - - response = client.get("/sc/v0/get/unittest") - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - - response = client.post("/sc/v0/add", json={"data": "nothing"}) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - response = client.delete("/sc/v0/delete/unittest") - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - -def test_006(): - print("*** Add doc for unauthorized domain (this is allowed, currently)") - - doc_port = random.randint(1, 65536) - doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) - doc_asn = str(doc_ip) + '_' + str(doc_port) - - json_data = { - 'ip': doc_ip, - 'port': doc_port, - 'whois_description': 'unittest', - 'asn': doc_asn, - 'asn_country_code': 'SE', - 'ptr': 'unittest.example.com', - 'abuse_mail': 'unittest@example.com', - 'domain': 'sunet.se', - 'timestamp_in_utc': '2021-06-21T14:06UTC', - 'producer_unique_keys': { - 'subject_cn': 'unittest', - 'subject_o': 'unittest', - 'full_name': 'unittest', - 'end_of_general_support': False, - 'cve_2021_21972': 'unittest', - 'cve_2021_21974': 'unittest', - 'cve_2021_21985': 'unittest' - } - } - - response = client.post( - "/sc/v0/add", headers=JWT_HEADER, json=dict(json_data, domain="example.com") - ) - assert(response.status_code == 200) - assert(response.json()['status'] == 'success') - - print("*** Get doc for unauthorized domain (not allowed)") - doc_id = response.json()['docs']['_id'] - response = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - assert(response.json()['message'] == 'User not authorized to view this object') - - print("*** Delete doc for unauthorized domain (not allowed)") - response = client.delete(f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER) - assert(response.status_code == 400) - assert(response.json()['status'] == 'error') - assert(response.json()['message'] == 'User not authorized to delete this object') |