diff options
-rw-r--r-- | tools/certtools.py | 24 | ||||
-rwxr-xr-x | tools/josef_experimental.py | 76 | ||||
-rwxr-xr-x | tools/josef_experimental_auditor.py | 49 |
3 files changed, 105 insertions, 44 deletions
diff --git a/tools/certtools.py b/tools/certtools.py index beb2812..3c52761 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -38,6 +38,30 @@ def get_cert_info(s): result[key] = value return result +def my_get_cert_info(s): + p = subprocess.Popen( + ["openssl", "x509", "-fingerprint", "-text", "-noout", "-inform", "der"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + parsed = p.communicate(s) + if parsed[1]: + print "ERROR:", parsed[1] + sys.exit(1) + result = {} + prev = "" + for line in parsed[0].split("\n"): + if "Subject:" in line: + result["subject"] = line.split("Subject: ")[1] + if "Issuer:" in line: + result["issuer"] = line.split("Issuer: ")[1] + if "Subject Alternative Name" in prev: + result["SAN"] = line.lstrip() + if "Not After" in line: + result["not_after"] = line.split(": ")[1] + if "Not Before" in line: + result["not_before"] = line.split(": ")[1] + prev = line + return result def get_pemlike(filename, marker): return get_pemlike_from_file(open(filename), marker) diff --git a/tools/josef_experimental.py b/tools/josef_experimental.py index 6d95894..383e385 100755 --- a/tools/josef_experimental.py +++ b/tools/josef_experimental.py @@ -65,8 +65,7 @@ def get_proof_by_index(baseurl, index, tree_size): def my_get_cert_info(s): p = subprocess.Popen( - ["openssl", "x509", "-text", "-noout", - "-certopt", "no_header,no_version,no_serial,no_signame,no_validity,no_aux", "-inform", "der"], + ["openssl", "x509", "-fingerprint", "-text", "-noout", "-inform", "der"], # ["openssl", "x509", "-noout", "-subject", "-issuer", "-inform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -74,11 +73,24 @@ def my_get_cert_info(s): if parsed[1]: print "ERROR:", parsed[1] sys.exit(1) + # result = [] result = {} + prev = "" for line in parsed[0].split("\n"): - (key, sep, value) = line.partition("=") - if sep == "=": - result[key] = value + if "Subject:" in line: + result["subject"] = line.split("Subject: ")[1] + # print line.split("Subject: ")[1] + if "Issuer:" in line: + result["issuer"] = line.split("Issuer: ")[1] + # print line.split("Issuer: ")[1] + if "Subject Alternative Name" in prev: + result["SAN"] = line.lstrip() + # print line.lstrip() + if "Not After" in line: + result["not_after"] = line.split(": ")[1] + if "Not Before" in line: + result["not_before"] = line.split(": ")[1] + prev = line return result def read_sth(fn): @@ -124,20 +136,40 @@ monitored_domains = [ "symantec.com", ] -data = read_sth("plausible_cert_data.json") -ss = [] -for item in data: - try: - s = item["subject"].split("CN=")[1] - print s - except: - # if not item["subject"] in ss: - # print item["subject"] - # ss.append(item["subject"]) - pass - -print "\nTotal entries: " + str(len(data)) - - - - +# data = read_sth("plausible_cert_data.json") +# ss = [] +# for item in data: +# try: +# s = item["subject"].split("CN=")[1] +# print s +# except: +# # if not item["subject"] in ss: +# # print item["subject"] +# # ss.append(item["subject"]) +# pass + +# print "\nTotal entries: " + str(len(data)) + +base_url = base_urls[0] + +entries = get_entries(base_url, 11, 11)["entries"] +for item in entries: + orig_entry = extract_original_entry(item) + cert_info = my_get_cert_info(orig_entry[0][0]) + # prev = "" + # res = {} + # for line in cert_info: + # if "Subject:" in line: + # res["subject"] = line.split("Subject: ")[1] + # # print line.split("Subject: ")[1] + # if "Issuer:" in line: + # res["issuer"] = line.split("Issuer: ")[1] + # # print line.split("Issuer: ")[1] + # if "Subject Alternative Name" in prev: + # res["SAN"] = line.lstrip() + # # print line.lstrip() + # if "Not After" in line: + # res["not_after"] = line.split(": ")[1] + + # prev = line + print cert_info diff --git a/tools/josef_experimental_auditor.py b/tools/josef_experimental_auditor.py index 05f7a9a..78a3fe2 100755 --- a/tools/josef_experimental_auditor.py +++ b/tools/josef_experimental_auditor.py @@ -18,12 +18,12 @@ DEFAULT_CUR_FILE = 'all-sth.json' base_urls = [ # "https://plausible.ct.nordu.net/", # "https://ct1.digicert-ct.com/log/", - "https://ct.izenpe.com/", + # "https://ct.izenpe.com/", # "https://log.certly.io/", # "https://ct.googleapis.com/aviator/", # "https://ct.googleapis.com/pilot/", # "https://ct.googleapis.com/rocketeer/", - "https://ctlog.api.venafi.com/", + # "https://ctlog.api.venafi.com/", "https://ct.ws.symantec.com/", ] @@ -55,7 +55,7 @@ monitored_domains = [ # ".se", ] -cert_data = [] +# cert_data = [] # class cert(subject, issuer, log): # def __init__(self): @@ -205,17 +205,12 @@ def verify_inclusion_all(old, new): print time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url) -def check_domain(raw_entry): +def check_domain(raw_entry, log=None): orig_entry = extract_original_entry(raw_entry) - cert_info = get_cert_info(orig_entry[0][0]) - # for md in monitored_domains: - # if md in cert_info["subject"]: - # print md + " (" + cert_info["subject"].split("CN=")[1] + ") certifed by " + cert_info["issuer"] - try: - # print cert_info["subject"] + " certifed by " + cert_info["issuer"] - cert_data.append(cert_info) - except: - pass + cert_info = my_get_cert_info(orig_entry[0][0]) + if log: + cert_info["log"] = log[8:-1] # strip generic URL stuff + return cert_info def fetch_and_increment_subtree(old_sth, new_sth_in, subtree, base_url): @@ -228,10 +223,11 @@ def fetch_and_increment_subtree(old_sth, new_sth_in, subtree, base_url): pre_size = idx entries = get_entries(base_url, idx, new_sth["tree_size"]-1)["entries"] new_leafs = [] + tmp_cert_data = [] for item in entries: - check_domain(item) + tmp_cert_data.append(check_domain(item, base_url)) new_leafs.append(get_leaf_hash(base64.b64decode(item["leaf_input"]))) - # write_file("cert_data.json", cert_data) + append_file("cert_data.json", tmp_cert_data) idx += len(new_leafs) print time.strftime('%H:%M:%S') + " Got entries " + str(pre_size) + " to " \ + str(idx -1) + " (" + str(len(new_leafs)) +" entries) from " + base_url @@ -252,11 +248,12 @@ def fetch_and_build_subtree(old_sth, base_url): pre_size = idx entries = get_entries(base_url, idx, sth["tree_size"]-1)["entries"] new_leafs = [] + tmp_cert_data = [] for item in entries: - check_domain(item) + tmp_cert_data.append(check_domain(item, base_url)) new_leafs.append(get_leaf_hash(base64.b64decode(item["leaf_input"]))) idx += len(new_leafs) - # write_file("cert_data.json", cert_data) + append_file("cert_data.json", tmp_cert_data) print time.strftime('%H:%M:%S') + " Got entries " + str(pre_size) + " to " + str(idx) + " from " + base_url subtree = reduce_tree(new_leafs, subtree) @@ -269,7 +266,8 @@ def fetch_and_build_subtree(old_sth, base_url): def verify_subtree(old_sth, subtree, base_url): try: sth = old_sth[base_url] - root = base64.b64encode(reduce_subtree_to_root(subtree)[0]) + tmp = list(subtree) + root = base64.b64encode(reduce_subtree_to_root(tmp)[0]) if root == sth["sha256_root_hash"]: print time.strftime('%H:%M:%S') + " Verifying root hashes for " + base_url + "...OK." @@ -372,7 +370,16 @@ def write_file(fn, content): tempname = fn + ".new" open(tempname, 'w').write(json.dumps(content)) mv_file(tempname, fn) - print "wrote " + fn + # print "wrote " + fn + +def append_file(fn, content): + with open(fn, 'a') as f: + for item in content: + try: + f.write(json.dumps(item)) + except: + # print "failed to write " + str(item) + pass def main(args): @@ -436,19 +443,17 @@ def main(args): write_file(args.cur_sth, sth) if args.monitor: - # Run for one log only all_subtrees = {} print time.strftime('%H:%M:%S') + " Building trees from entries. This may take a while, go get coffee or something..." for url in base_urls: all_subtrees[url] = fetch_and_build_subtree(sth, url) verify_subtree(sth, all_subtrees[url], url) - while True: time.sleep(30) new_sth = fetch_all_sth() for url in base_urls: - if sth[url]["tree_size"] != new_sth[url]["tree_size"]: + if url in sth and url in new_sth and sth[url]["tree_size"] != new_sth[url]["tree_size"]: all_subtrees[url] = fetch_and_increment_subtree(sth, new_sth, all_subtrees[url], url) verify_subtree(new_sth, all_subtrees[url], url) sth = new_sth |