diff options
author | Victor Näslund <victor@sunet.se> | 2022-11-02 15:31:23 +0100 |
---|---|---|
committer | Victor Näslund <victor@sunet.se> | 2022-11-02 15:31:23 +0100 |
commit | 8baecf339e8061160bee519e87ffe837d1525c18 (patch) | |
tree | 22664c10f22382b1d4647b5f2e96bcea4220d879 /tests | |
parent | ffb26f4a81a9ca61c4105df037f7e1beb8dc5fb0 (diff) |
more freshup
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/test_api.py | 232 |
2 files changed, 232 insertions, 0 deletions
diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..371fcf2 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,232 @@ +import os +import time +import pytest +import random +import ipaddress + +from main import app +from fastapi import FastAPI +from fastapi import testclient + +client = testclient.TestClient(app) +JWT_TOKEN = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTY0MjE2ODkyMCwianRpIjoiNjM0NGFiNjEtMTIzZC00YWMyLTk3YjMtYmVlYTE2M2JiMWMwIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6InVzZXIxIiwibmJmIjoxNjQyMTY4OTIwLCJyZWFkIjpbInN1bmV0LnNlIl0sIndyaXRlIjpbInN1bmV0LnNlIl19._bX9EHI9h0Vjw75UvYvypqaH3AmsgaATFSUSOT-cYLZHrfMlxios3emr7cyKw-OV_BN5h_XNyrMBV1gIoqAk3A' +JWT_HEADER = {'Authorization': f'Bearer {JWT_TOKEN}'} + + +def test_001(): + print("*** Adding document.") + + doc_port = random.randint(1, 65536) + doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) + doc_asn = str(doc_ip) + '_' + str(doc_port) + + json_data = { + 'ip': doc_ip, + 'port': doc_port, + 'whois_description': 'unittest', + 'asn': doc_asn, + 'asn_country_code': 'SE', + 'ptr': 'unittest.example.com', + 'abuse_mail': 'unittest@example.com', + 'domain': 'sunet.se', + 'timestamp_in_utc': '2021-06-21T14:06UTC', + 'producer_unique_keys': { + 'subject_cn': 'unittest', + 'subject_o': 'unittest', + 'full_name': 'unittest', + 'end_of_general_support': False, + 'cve_2021_21972': 'unittest', + 'cve_2021_21974': 'unittest', + 'cve_2021_21985': 'unittest' + } + } + + response = client.post("/sc/v0/add", headers=JWT_HEADER, json=json_data) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + + response = client.get(f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + assert(len(response.json()['docs']) == 1) + assert(response.json()['docs'][0]['port'] == doc_port) + + response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + assert(len(response.json()['docs']) == 1) + assert(response.json()['docs'][0]['asn'] == doc_asn) + + response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + assert(len(response.json()['docs']) == 1) + assert(response.json()['docs'][0]['ip'] == doc_ip) + + +def test_002(): + nr_documents = 100 + starttime = time.time() + + for i in range(nr_documents): + doc_port = random.randint(1, 65536) + doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) + doc_asn = str(doc_ip) + '_' + str(doc_port) + + json_data = { + 'ip': doc_ip, + 'port': doc_port, + 'whois_description': 'unittest', + 'asn': doc_asn, + 'asn_country_code': 'SE', + 'ptr': 'unittest.example.com', + 'abuse_mail': 'unittest@example.com', + 'domain': 'sunet.se', + 'timestamp_in_utc': '2021-06-21T14:06UTC', + 'producer_unique_keys': { + 'subject_cn': 'unittest', + 'subject_o': 'unittest', + 'full_name': 'unittest', + 'end_of_general_support': False, + 'cve_2021_21972': 'unittest', + 'cve_2021_21974': 'unittest', + 'cve_2021_21985': 'unittest' + } + } + + response = client.post( + "/sc/v0/add", headers=JWT_HEADER, json=json_data) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + + response = client.get( + f"/sc/v0/get?port={doc_port}", headers=JWT_HEADER) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + assert(len(response.json()['docs']) == 1) + assert(response.json()['docs'][0]['port'] == doc_port) + + response = client.get(f"/sc/v0/get?asn={doc_asn}", headers=JWT_HEADER) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + assert(len(response.json()['docs']) == 1) + assert(response.json()['docs'][0]['asn'] == doc_asn) + + response = client.get(f"/sc/v0/get?ip={doc_ip}", headers=JWT_HEADER) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + assert(len(response.json()['docs']) == 1) + assert(response.json()['docs'][0]['ip'] == doc_ip) + + stop_time = str(time.time() - starttime) + print(f"*** Adding {nr_documents} documents took {stop_time} seconds.") + + +def test_003(): + response = client.get("/sc/v0/get", headers=JWT_HEADER) + assert(response.status_code == 200) + + for doc in response.json()['docs']: + doc_id = doc['_id'] + + response_doc = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) + assert(response_doc.status_code == 200) + assert(response_doc.json()['status'] == 'success') + assert(type(response_doc.json()['docs']) == type(dict())) + assert(response_doc.json()['docs']['domain'] == 'sunet.se') + + +def test_004(): + response = client.get("/sc/v0/get?limit=1000", headers=JWT_HEADER) + assert(response.status_code == 200) + + nr_documents = len(response.json()['docs']) + starttime = time.time() + + for doc in response.json()['docs']: + doc_id = doc['_id'] + response_doc = client.delete( + f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER) + assert(response_doc.status_code == 200) + assert(response_doc.json()['status'] == 'success') + response_doc = client.get( + f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) + assert(response_doc.status_code == 200) + assert(response_doc.json()['status'] == 'success') + assert(response_doc.json()['docs'] == {}) + + stop_time = str(time.time() - starttime) + print(f"*** Removing {nr_documents} documents took {stop_time} seconds.") + + print("*** Removing document with invalid ID.") + response = client.delete( + "/sc/v0/delete/nonexistent", headers=JWT_HEADER) + assert(response.status_code == 400) + assert(response.json()['status'] == 'error') + + +def test_005(): + print("*** Accessing endpoints without JWT token...") + + response = client.get("/sc/v0/get?limit=1000") + assert(response.status_code == 400) + assert(response.json()['status'] == 'error') + + response = client.get("/sc/v0/get/unittest") + assert(response.status_code == 400) + assert(response.json()['status'] == 'error') + + response = client.post("/sc/v0/add", json={"data": "nothing"}) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + + response = client.delete("/sc/v0/delete/unittest") + assert(response.status_code == 400) + assert(response.json()['status'] == 'error') + +def test_006(): + print("*** Add doc for unauthorized domain (this is allowed, currently)") + + doc_port = random.randint(1, 65536) + doc_ip = str(ipaddress.IPv4Address(random.randint(1, 0xffffffff))) + doc_asn = str(doc_ip) + '_' + str(doc_port) + + json_data = { + 'ip': doc_ip, + 'port': doc_port, + 'whois_description': 'unittest', + 'asn': doc_asn, + 'asn_country_code': 'SE', + 'ptr': 'unittest.example.com', + 'abuse_mail': 'unittest@example.com', + 'domain': 'sunet.se', + 'timestamp_in_utc': '2021-06-21T14:06UTC', + 'producer_unique_keys': { + 'subject_cn': 'unittest', + 'subject_o': 'unittest', + 'full_name': 'unittest', + 'end_of_general_support': False, + 'cve_2021_21972': 'unittest', + 'cve_2021_21974': 'unittest', + 'cve_2021_21985': 'unittest' + } + } + + response = client.post( + "/sc/v0/add", headers=JWT_HEADER, json=dict(json_data, domain="example.com") + ) + assert(response.status_code == 200) + assert(response.json()['status'] == 'success') + + print("*** Get doc for unauthorized domain (not allowed)") + doc_id = response.json()['docs']['_id'] + response = client.get(f"/sc/v0/get/{doc_id}", headers=JWT_HEADER) + assert(response.status_code == 400) + assert(response.json()['status'] == 'error') + assert(response.json()['message'] == 'User not authorized to view this object') + + print("*** Delete doc for unauthorized domain (not allowed)") + response = client.delete(f"/sc/v0/delete/{doc_id}", headers=JWT_HEADER) + assert(response.status_code == 400) + assert(response.json()['status'] == 'error') + assert(response.json()['message'] == 'User not authorized to delete this object') |