diff options
Diffstat (limited to 'monitor/josef_lib.py')
-rw-r--r-- | monitor/josef_lib.py | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/monitor/josef_lib.py b/monitor/josef_lib.py index 922636c..f401b2c 100644 --- a/monitor/josef_lib.py +++ b/monitor/josef_lib.py @@ -84,6 +84,21 @@ def check_domain(raw_entry, log=None): except IndexError: return None +def check_domain_extended(raw_entry, log=None): + orig_entry = extract_original_entry(raw_entry) + try: + cert_info = my_get_more_cert_info(orig_entry[0][0]) + # print len(orig_entry[0]) + cert_info["chain_length"] = str(len(orig_entry[0])) + cert_info["validation"] = get_validation_type(cert_info["policy"]) + cert_info["in_mozilla"] = validate_cert(orig_entry[0][-1]) + # print my_get_all_cert_info(orig_entry[0][-1]) + if log: + cert_info["log"] = log[8:-1] # strip generic URL stuff + return cert_info + except IndexError: + return None + def check_domain_all(raw_entry, log=None): orig_entry = extract_original_entry(raw_entry) try: @@ -138,6 +153,26 @@ def append_file(fn, content): except: pass +def validate_cert(s): + p = subprocess.Popen( + ["openssl", "x509", "-inform", "DER"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + converted = p.communicate(s) + # print converted + p = subprocess.Popen( + ["openssl", "verify", "-x509_strict", "-CAfile", "certdata.txt"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + parsed = p.communicate(converted[0]) + # print parsed + res = parsed[0][7:-1] + if res == "OK": + return "OK" + else: + return res.split("\n")[-2] + + def get_cert_info(s): p = subprocess.Popen( ["openssl", "x509", "-noout", "-subject", "-issuer", "-inform", "der"], @@ -181,6 +216,95 @@ def my_get_cert_info(s): prev = line return result +def my_get_more_cert_info(s): + p = subprocess.Popen( + ["openssl", "x509", "-fingerprint", "-text", "-noout", "-inform", "der"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + parsed = p.communicate(s) + if parsed[1]: + print "ERROR:", parsed[1] + # sys.exit(1) + raise Exception + result = {} + result["policy"] = [] + prev = "" + for line in parsed[0].split("\n"): + if "Subject:" in line: + result["subject"] = line.split("Subject: ")[1] + if "Issuer:" in line: + result["issuer"] = line.split("Issuer: ")[1] + if "Signature Algorithm:" in line: + result["sig_algorithm"] = line.split("Signature Algorithm: ")[1] + if "Public Key Algorithm:" in line: + result["pubkey_algorithm"] = line.split("Public Key Algorithm: ")[1] + if "Subject Alternative Name" in prev: + result["SAN"] = line.lstrip() + if "Not After" in line: + result["not_after"] = line.split(": ")[1] + if "Not Before" in line: + result["not_before"] = line.split(": ")[1] + if "Policy:" in line: + result["policy"].append(line.split("Policy: ")[1]) + prev = line + return result + +def get_validation_type(policy_list): + DV_list = ["2.23.140.1.2.1"] + OV_list = ["2.23.140.1.2.2"] + EV_list = [ + "1.3.159.1.17.1", + "1.3.6.1.4.1.34697.2.1", + "1.3.6.1.4.1.34697.2.2", + "1.3.6.1.4.1.34697.2.3", + "1.3.6.1.4.1.34697.2.4", + "1.2.40.0.17.1.22", + "2.16.578.1.26.1.3.3", + "1.3.6.1.4.1.17326.10.14.2.1.2", + "1.3.6.1.4.1.17326.10.8.12.1.2", + "1.3.6.1.4.1.6449.1.2.1.5.1", + "2.16.840.1.114412.2.1", + "2.16.840.1.114412.1.3.0.2", + "2.16.528.1.1001.1.1.1.12.6.1.1.1", + "2.16.792.3.0.4.1.1.4", + "2.16.840.1.114028.10.1.2", + "0.4.0.2042.1.4", + "0.4.0.2042.1.5", + "1.3.6.1.4.1.13177.10.1.3.10", + "1.3.6.1.4.1.14370.1.6", + "1.3.6.1.4.1.4146.1.1", + "2.16.840.1.114413.1.7.23.3", + "1.3.6.1.4.1.14777.6.1.1", + "2.16.792.1.2.1.1.5.7.1.9", + "1.3.6.1.4.1.22234.2.5.2.3.1", + "1.3.6.1.4.1.782.1.2.1.8.1", + "1.3.6.1.4.1.8024.0.2.100.1.2", + "1.2.392.200091.100.721.1", + "2.16.840.1.114414.1.7.23.3", + "1.3.6.1.4.1.23223.2", + "1.3.6.1.4.1.23223.1.1.1", + "2.16.756.1.83.21.0", + "2.16.756.1.89.1.2.1.1", + "2.16.840.1.113733.1.7.48.1", + "2.16.840.1.114404.1.1.2.4.1", + "2.16.840.1.113733.1.7.23.6", + "1.3.6.1.4.1.6334.1.100.1", + "2.16.840.1.114171.500.9", + "1.3.6.1.4.1.36305.2" +] + + status = "DV" + + for policy in policy_list: + if policy in EV_list: + status = "EV" + if policy in OV_list: + status = "OV" + if policy in DV_list: + status = "DV" + return status + + def my_get_all_cert_info(s): p = subprocess.Popen( ["openssl", "x509", "-fingerprint", "-text", "-noout", "-inform", "der"], |