summaryrefslogtreecommitdiff
path: root/monitor/josef_lib.py
diff options
context:
space:
mode:
Diffstat (limited to 'monitor/josef_lib.py')
-rw-r--r--monitor/josef_lib.py124
1 files changed, 124 insertions, 0 deletions
diff --git a/monitor/josef_lib.py b/monitor/josef_lib.py
index 922636c..f401b2c 100644
--- a/monitor/josef_lib.py
+++ b/monitor/josef_lib.py
@@ -84,6 +84,21 @@ def check_domain(raw_entry, log=None):
except IndexError:
return None
+def check_domain_extended(raw_entry, log=None):
+ orig_entry = extract_original_entry(raw_entry)
+ try:
+ cert_info = my_get_more_cert_info(orig_entry[0][0])
+ # print len(orig_entry[0])
+ cert_info["chain_length"] = str(len(orig_entry[0]))
+ cert_info["validation"] = get_validation_type(cert_info["policy"])
+ cert_info["in_mozilla"] = validate_cert(orig_entry[0][-1])
+ # print my_get_all_cert_info(orig_entry[0][-1])
+ if log:
+ cert_info["log"] = log[8:-1] # strip generic URL stuff
+ return cert_info
+ except IndexError:
+ return None
+
def check_domain_all(raw_entry, log=None):
orig_entry = extract_original_entry(raw_entry)
try:
@@ -138,6 +153,26 @@ def append_file(fn, content):
except:
pass
+def validate_cert(s):
+ p = subprocess.Popen(
+ ["openssl", "x509", "-inform", "DER"],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ converted = p.communicate(s)
+ # print converted
+ p = subprocess.Popen(
+ ["openssl", "verify", "-x509_strict", "-CAfile", "certdata.txt"],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ parsed = p.communicate(converted[0])
+ # print parsed
+ res = parsed[0][7:-1]
+ if res == "OK":
+ return "OK"
+ else:
+ return res.split("\n")[-2]
+
+
def get_cert_info(s):
p = subprocess.Popen(
["openssl", "x509", "-noout", "-subject", "-issuer", "-inform", "der"],
@@ -181,6 +216,95 @@ def my_get_cert_info(s):
prev = line
return result
+def my_get_more_cert_info(s):
+ p = subprocess.Popen(
+ ["openssl", "x509", "-fingerprint", "-text", "-noout", "-inform", "der"],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ parsed = p.communicate(s)
+ if parsed[1]:
+ print "ERROR:", parsed[1]
+ # sys.exit(1)
+ raise Exception
+ result = {}
+ result["policy"] = []
+ prev = ""
+ for line in parsed[0].split("\n"):
+ if "Subject:" in line:
+ result["subject"] = line.split("Subject: ")[1]
+ if "Issuer:" in line:
+ result["issuer"] = line.split("Issuer: ")[1]
+ if "Signature Algorithm:" in line:
+ result["sig_algorithm"] = line.split("Signature Algorithm: ")[1]
+ if "Public Key Algorithm:" in line:
+ result["pubkey_algorithm"] = line.split("Public Key Algorithm: ")[1]
+ if "Subject Alternative Name" in prev:
+ result["SAN"] = line.lstrip()
+ if "Not After" in line:
+ result["not_after"] = line.split(": ")[1]
+ if "Not Before" in line:
+ result["not_before"] = line.split(": ")[1]
+ if "Policy:" in line:
+ result["policy"].append(line.split("Policy: ")[1])
+ prev = line
+ return result
+
+def get_validation_type(policy_list):
+ DV_list = ["2.23.140.1.2.1"]
+ OV_list = ["2.23.140.1.2.2"]
+ EV_list = [
+ "1.3.159.1.17.1",
+ "1.3.6.1.4.1.34697.2.1",
+ "1.3.6.1.4.1.34697.2.2",
+ "1.3.6.1.4.1.34697.2.3",
+ "1.3.6.1.4.1.34697.2.4",
+ "1.2.40.0.17.1.22",
+ "2.16.578.1.26.1.3.3",
+ "1.3.6.1.4.1.17326.10.14.2.1.2",
+ "1.3.6.1.4.1.17326.10.8.12.1.2",
+ "1.3.6.1.4.1.6449.1.2.1.5.1",
+ "2.16.840.1.114412.2.1",
+ "2.16.840.1.114412.1.3.0.2",
+ "2.16.528.1.1001.1.1.1.12.6.1.1.1",
+ "2.16.792.3.0.4.1.1.4",
+ "2.16.840.1.114028.10.1.2",
+ "0.4.0.2042.1.4",
+ "0.4.0.2042.1.5",
+ "1.3.6.1.4.1.13177.10.1.3.10",
+ "1.3.6.1.4.1.14370.1.6",
+ "1.3.6.1.4.1.4146.1.1",
+ "2.16.840.1.114413.1.7.23.3",
+ "1.3.6.1.4.1.14777.6.1.1",
+ "2.16.792.1.2.1.1.5.7.1.9",
+ "1.3.6.1.4.1.22234.2.5.2.3.1",
+ "1.3.6.1.4.1.782.1.2.1.8.1",
+ "1.3.6.1.4.1.8024.0.2.100.1.2",
+ "1.2.392.200091.100.721.1",
+ "2.16.840.1.114414.1.7.23.3",
+ "1.3.6.1.4.1.23223.2",
+ "1.3.6.1.4.1.23223.1.1.1",
+ "2.16.756.1.83.21.0",
+ "2.16.756.1.89.1.2.1.1",
+ "2.16.840.1.113733.1.7.48.1",
+ "2.16.840.1.114404.1.1.2.4.1",
+ "2.16.840.1.113733.1.7.23.6",
+ "1.3.6.1.4.1.6334.1.100.1",
+ "2.16.840.1.114171.500.9",
+ "1.3.6.1.4.1.36305.2"
+]
+
+ status = "DV"
+
+ for policy in policy_list:
+ if policy in EV_list:
+ status = "EV"
+ if policy in OV_list:
+ status = "OV"
+ if policy in DV_list:
+ status = "DV"
+ return status
+
+
def my_get_all_cert_info(s):
p = subprocess.Popen(
["openssl", "x509", "-fingerprint", "-text", "-noout", "-inform", "der"],