Diffstat (limited to 'tools/josef_experimental_auditor.py')
-rwxr-xr-x | tools/josef_experimental_auditor.py | 476 |
1 file changed, 0 insertions, 476 deletions
diff --git a/tools/josef_experimental_auditor.py b/tools/josef_experimental_auditor.py
deleted file mode 100755
index 57ef9cb..0000000
--- a/tools/josef_experimental_auditor.py
+++ /dev/null
@@ -1,476 +0,0 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
-
-import time
-import datetime
-import base64
-import argparse
-import errno
-from certtools import *
-
-NAGIOS_OK = 0
-NAGIOS_WARN = 1
-NAGIOS_CRIT = 2
-NAGIOS_UNKNOWN = 3
-
-DEFAULT_CUR_FILE = 'all-sth.json'
-
-base_urls = [
-    "https://plausible.ct.nordu.net/",
-    "https://ct1.digicert-ct.com/log/",
-    "https://ct.izenpe.com/",
-    "https://log.certly.io/",
-    "https://ct.googleapis.com/aviator/",
-    "https://ct.googleapis.com/pilot/",
-    "https://ct.googleapis.com/rocketeer/",
-    "https://ct.ws.symantec.com/",
-    # "https://ctlog.api.venafi.com/",
-    ]
-
-# logkeys = {}
-# logkeys["https://plausible.ct.nordu.net/"] = get_public_key_from_file("../../plausible-logkey.pem")
-# logkeys["https://ct.googleapis.com/rocketeer/"] = get_public_key_from_file("../../rocketeer-logkey.pem")
-# logkeys["https://ct.googleapis.com/aviator/"] = get_public_key_from_file("../../aviator-logkey.pem")
-# logkeys["https://ct.googleapis.com/pilot/"] = get_public_key_from_file("../../pilot-logkey.pem")
-# logkeys["https://log.certly.io/"] = get_public_key_from_file("../../certly-logkey.pem")
-# logkeys["https://ct.izenpe.com/"] = get_public_key_from_file("../../izenpe-logkey.pem")
-# logkeys["https://ct.ws.symantec.com/"] = get_public_key_from_file("../../symantec-logkey.pem")
-# logkeys["https://ctlog.api.venafi.com/"] = get_public_key_from_file("../../venafi-logkey.pem")
-# logkeys["https://ct1.digicert-ct.com/log/"] = get_public_key_from_file("../../digicert-logkey.pem")
-
-parser = argparse.ArgumentParser(description="")
-parser.add_argument('--audit', action='store_true', help="run lightweight auditor verifying consistency in STH")
-parser.add_argument('--audit2', action='store_true', help="run medium-weight auditor verifying consistency in STH and inclusion proofs of new entries")
-parser.add_argument('--audit3', action='store_true', help="continuously run medium-weight auditor verifying consistency in STH and inclusion proofs of new entries")
-parser.add_argument('--audit4', action='store_true', help="run one check on one server")
-parser.add_argument('--build-sth', action='store_true', help="get all entries and construct STH")
-parser.add_argument('--verify-index', default=None, help="Verify a specific index in all logs")
-# parser.add_argument('--verify-hash', action='store_true', help="Verify an entry hash in all logs")
-parser.add_argument('--host', default=None, help="Base URL for CT log")
-parser.add_argument('--roots', action='store_true', help="Check accepted root certificates for all logs")
-parser.add_argument('--cur-sth',
-                    metavar='file',
-                    default=DEFAULT_CUR_FILE,
-                    help="File containing current STH (default=%s)" % DEFAULT_CUR_FILE)
-
-timings = {}
-errors = []
-
-class UTC(datetime.tzinfo):
-    def utcoffset(self, dt):
-        return datetime.timedelta(hours=0)
-    def dst(self, dt):
-        return datetime.timedelta(0)
-
-def reduce_layer(layer):
-    new_layer = []
-    while len(layer) > 1:
-        e1 = layer.pop(0)
-        e2 = layer.pop(0)
-        new_layer.append(internal_hash((e1, e2)))
-    return new_layer
-
-def reduce_tree(entries, layers):
-    if len(entries) == 0 and layers == []:
-        return [[hashlib.sha256().digest()]]
-
-    layer_idx = 0
-    layers[layer_idx] += entries
-
-    while len(layers[layer_idx]) > 1:
-        if len(layers) == layer_idx + 1:
-            layers.append([])
-
-        layers[layer_idx + 1] += reduce_layer(layers[layer_idx])
-        layer_idx += 1
-    return layers
-
-def reduce_subtree_to_root(layers):
-    while len(layers) > 1:
-        if len(layers[1]) == 0:
-            layers[1] = layers[0]
-        else:
-            layers[1] += next_merkle_layer(layers[0])
-        del layers[0]
-
-    if len(layers[0]) > 1:
-        return next_merkle_layer(layers[0])
-    return layers[0]
-
-def fetch_all_sth():
-    sths = {}
-    for base_url in base_urls:
-        # Fetch STH
-        try:
-            sths[base_url] = get_sth(base_url)
-        except:
-            sths[base_url] = None
-            error_str = time.strftime('%H:%M:%S') + " ERROR: Failed to retrieve STH from " + base_url
-            print error_str
-            errors.append(error_str)
-            continue
-
-        # Check signature on the STH
-        try:
-            # check_sth_signature(base_url, sths[base_url], logkeys[base_url])
-            check_sth_signature(base_url, sths[base_url], None)
-        except:
-            error_str = time.strftime('%H:%M:%S') + " ERROR: Could not verify signature from " + base_url
-            print error_str
-            errors.append(error_str)
-            continue
-
-        # Add timing info
-        # try:
-        #     if base_url not in timings:
-        #         timings[base_url] = {"last":sths[base_url]["timestamp"], "longest":0}
-        #     else:
-        #         then = datetime.datetime.fromtimestamp(int(timings[base_url]["last"])/1000)
-        #         now = datetime.datetime.fromtimestamp(int(sths[base_url]["timestamp"])/1000)
-        #         tdelta = now - then
-
-        #         timings[base_url]["last"] = sths[base_url]["timestamp"]
-
-        #         if tdelta.total_seconds() > timings[base_url]["longest"]:
-        #             timings[base_url]["longest"] = tdelta.total_seconds()
-
-        # except Exception, err:
-        #     print Exception, err
-        #     print time.strftime('%H:%M:%S') + " ERROR: Failed to set TIME info for STH"
-
-    return sths
-
-def verify_progress(old, new):
-    print "Verifying progress"
-    try:
-        for url in new:
-            if new and old and new[url] and old[url]:
-                if new[url]["tree_size"] == old[url]["tree_size"]:
-                    if old[url]["sha256_root_hash"] != new[url]["sha256_root_hash"]:
-                        errors.append(time.strftime('%H:%M:%S') + " CRITICAL: root hash is different for same tree size in " + url)
-                        # print "tree size:", newsth["tree_size"],
-                        # print "old hash:", b64_to_b16(oldsth["sha256_root_hash"])
-                        # print "new hash:", b64_to_b16(newsth["sha256_root_hash"])
-                        # sys.exit(NAGIOS_CRIT)
-                    # TODO
-                elif new[url]["tree_size"] < old[url]["tree_size"]:
-                    # if not args.allow_lag:
-                    errors.append(time.strftime('%H:%M:%S') + " CRITICAL: new tree smaller than previous tree (%d < %d)" % \
-                                  (new[url]["tree_size"], old[url]["tree_size"]))
-                    # sys.exit(NAGIOS_CRIT)
-            if new[url]:
-                age = time.time() - new[url]["timestamp"]/1000
-                sth_time = datetime.datetime.fromtimestamp(new[url]['timestamp'] / 1000, UTC()).strftime("%Y-%m-%d %H:%M:%S")
-                # roothash = b64_to_b16(sth['sha256_root_hash'])
-                roothash = new[url]['sha256_root_hash']
-                if age > 24 * 3600:
-                    errors.append(time.strftime('%H:%M:%S') + " CRITICAL: %s is older than 24h: %s UTC" % (url, sth_time))
-                elif age > 12 * 3600:
-                    errors.append(time.strftime('%H:%M:%S') + " WARNING: %s is older than 12h: %s UTC" % (url, sth_time))
-                elif age > 6 * 3600:
-                    errors.append(time.strftime('%H:%M:%S') + " WARNING: %s is older than 6h: %s UTC" % (url, sth_time))
-                # elif age > 2 * 3600:
-                #     errors.append(time.strftime('%H:%M:%S') + " WARNING: %s is older than 2h: %s UTC" % (url, sth_time))
-    except:
-        print time.strftime('%H:%M:%S') + " ERROR: Failed to verify progress for " + url
-
-
-def verify_consistency(old, new):
-    for url in old:
-        try:
-            if old[url] and new[url] and old[url]["tree_size"] != new[url]["tree_size"]:
-                consistency_proof = get_consistency_proof(url, old[url]["tree_size"], new[url]["tree_size"])
-                decoded_consistency_proof = []
-                for item in consistency_proof:
-                    decoded_consistency_proof.append(base64.b64decode(item))
-                res = verify_consistency_proof(decoded_consistency_proof, old[url]["tree_size"], new[url]["tree_size"], old[url]["sha256_root_hash"])
-
-                if old[url]["sha256_root_hash"] != str(base64.b64encode(res[0])):
-                    print time.strftime('%H:%M:%S') + " Verification of old hash failed! " + old[url]["sha256_root_hash"], str(base64.b64encode(res[0]))
-                    errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to verify consistency for " + url + ", tree size " + str(old[url]["tree_size"]))
-                elif new[url]["sha256_root_hash"] != str(base64.b64encode(res[1])):
-                    print time.strftime('%H:%M:%S') + " Verification of new hash failed! " + new[url]["sha256_root_hash"], str(base64.b64encode(res[1]))
-                    errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to verify consistency for " + url + ", tree size " + str(new[url]["tree_size"]))
-                else:
-                    print time.strftime("%H:%M:%S") + " New STH from " + url + ", timestamp: " + \
-                        str(new[url]["timestamp"]) + ", size: " + str(new[url]["tree_size"]) + "...OK."
-
-        except:
-            print "ERROR: Could not verify consistency for " + url
-
-def verify_inclusion_all(old, new):
-    for url in old:
-        try:
-            if old[url] and new[url]:
-                if old[url]["tree_size"] != new[url]["tree_size"]:
-                    entries = []
-
-                    while len(entries) + old[url]["tree_size"] != new[url]["tree_size"]:
-                        entries += get_entries(url, str(int(old[url]["tree_size"]) + len(entries)), new[url]["tree_size"] - 1)["entries"]
-                        print "Got " + str(len(entries)) + " entries..."
-
-                    success = True
-                    for i in entries:
-                        h = get_leaf_hash(base64.b64decode(i["leaf_input"]))
-                        if not verify_inclusion_by_hash(url, h):
-                            success = False
-
-                    if success:
-                        print time.strftime("%H:%M:%S") + " Verifying inclusion for " + str(len(entries)) + " new entries in " + url + " ...OK"
-                    else:
-                        print time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url
-                        errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url)
-        except:
-            print time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url
-            errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to prove inclusion of all new entries in " + url)
-
-def fetch_and_build_tree(old_sth, base_url):
-    try:
-        sth = old_sth[base_url]
-        subtree = [[]]
-        idx = 0
-
-        res_strings = [""]
-
-        print time.strftime('%H:%M:%S') + " Getting all entries from " + base_url
-        while idx < sth["tree_size"]:
-            pre_size = idx
-            entries = get_entries(base_url, idx, sth["tree_size"] - 1)["entries"]
-
-            new_leafs = []
-            for item in entries:
-                new_leafs.append(get_leaf_hash(base64.b64decode(item["leaf_input"])))
-            idx += len(new_leafs)
-            print time.strftime('%H:%M:%S') + " Got entries " + str(pre_size) + " to " + str(idx) + " from " + base_url
-            subtree = reduce_tree(new_leafs, subtree)
-
-        root = base64.b64encode(reduce_subtree_to_root(subtree)[0])
-
-        if root == sth["sha256_root_hash"]:
-            print time.strftime('%H:%M:%S') + " Verifying root hashes for " + base_url + "...OK."
-            res_strings.append("STH for " + base_url + " built successfully.")
-        else:
-            print time.strftime('%H:%M:%S') + " ERROR: Failed to verify root hashes! STH root: " + sth["sha256_root_hash"] + ", Tree root: " + root
-            res_strings.append(time.strftime('%H:%M:%S') + " " + base_url + " Failed! STH root: " + sth["sha256_root_hash"] + " Calculated root: " + root)
-            errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to verify root hash for " + base_url + ", tree size " + str(sth["tree_size"]))
-
-        for item in res_strings:
-            print item + "\n"
-
-    except:
-        print time.strftime('%H:%M:%S') + " ERROR: Failed to build STH for " + base_url
-        errors.append(time.strftime('%H:%M:%S') + " ERROR: Failed to build STH for " + base_url)
-
-def verify_inclusion_by_hash(base_url, leaf_hash):
-    try:
-        tmp_sth = get_sth(base_url)
-        proof = get_proof_by_hash(base_url, leaf_hash, tmp_sth["tree_size"])
-
-        decoded_inclusion_proof = []
-        for item in proof["audit_path"]:
-            decoded_inclusion_proof.append(base64.b64decode(item))
-
-        root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof, proof["leaf_index"], tmp_sth["tree_size"], leaf_hash))
-
-        if tmp_sth["sha256_root_hash"] == root:
-            # print "Verifying inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url + "...OK."
-            return True
-        else:
-            print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url
-            errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url)
-            return False
-    except:
-        print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for hashed entry in " + base_url
-        errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for hashed entry in " + base_url)
-        return False
-
-def verify_inclusion_by_index(base_url, index):
-    try:
-        tmp_sth = get_sth(base_url)
-        proof = get_proof_by_index(base_url, index, tmp_sth["tree_size"])
-
-        decoded_inclusion_proof = []
-        for item in proof["audit_path"]:
-            decoded_inclusion_proof.append(base64.b64decode(item))
-
-        root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof, index, tmp_sth["tree_size"], get_leaf_hash(base64.b64decode(proof["leaf_input"]))))
-
-        if tmp_sth["sha256_root_hash"] == root:
-            print time.strftime('%H:%M:%S') + " Verifying inclusion for entry " + str(index) + " in " + base_url + "...OK."
-        else:
-            print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url
-            errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url)
-    except:
-        print time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url
-        errors.append(time.strftime('%H:%M:%S') + " ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url)
-
-def get_proof_by_index(baseurl, index, tree_size):
-    try:
-        params = urllib.urlencode({"leaf_index":index,
-                                   "tree_size":tree_size})
-        result = \
-            urlopen(baseurl + "ct/v1/get-entry-and-proof?" + params).read()
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print "ERROR:", e.read()
-        sys.exit(0)
-
-def get_all_roots(base_url):
-    # print "Fetching roots from " + base_url
-    result = urlopen(base_url + "ct/v1/get-roots").read()
-    certs = json.loads(result)["certificates"]
-    print time.strftime('%H:%M:%S') + " Received " + str(len(certs)) + " certs from " + base_url
-
-    for accepted_cert in certs:
-        subject = get_cert_info(base64.decodestring(accepted_cert))["subject"]
-        issuer = get_cert_info(base64.decodestring(accepted_cert))["issuer"]
-        if subject == issuer:
-            root_cert = base64.decodestring(accepted_cert)
-            print get_cert_info(root_cert)["subject"]
-
-def print_errors(errors):
-    print "Encountered " + str(len(errors)) + " errors:"
-    for item in errors:
-        print item
-
-def print_timings(timings):
-    for item in timings:
-        m, s = divmod(timings[item]["longest"], 60)
-        h, m = divmod(m, 60)
-        print item + " last seen " + datetime.datetime.fromtimestamp(int(timings[item]["last"])/1000).strftime('%Y-%m-%d %H:%M:%S') \
-            + " longest between two STH: " + str(int(h)) + "h " + str(int(m)) + "m "  # + str(int(s)) + "s."
-
-
-def read_sth(fn):
-    try:
-        f = open(fn)
-    except IOError, e:
-        if e.errno == errno.ENOENT:
-            return None
-        raise e
-    return json.loads(f.read())
-
-
-def write_file(fn, sth):
-    tempname = fn + ".new"
-    open(tempname, 'w').write(json.dumps(sth))
-    mv_file(tempname, fn)
-
-
-def main(args):
-
-    # print time.strftime("%H:%M:%S") + " Starting..."
-    if args.verify_index is None and not args.build_sth and not args.audit and not args.audit2 \
-       and not args.audit3 and not args.audit4 and not args.roots:
-
-        print time.strftime('%H:%M:%S') + " Nothing to do."
-        return
-    elif args.audit4:
-        pass
-    else:
-        sth = fetch_all_sth()
-
-    if args.verify_index is not None:
-        for url in base_urls:
-            verify_inclusion_by_index(url, int(args.verify_index))
-
-    # if args.verify_hash:
-    #     idx = 1337
-    #     url = base_urls[0]
-    #     entries = get_entries(url, idx, idx)["entries"]
-    #     h = get_leaf_hash(base64.b64decode(entries[0]["leaf_input"]))
-    #     verify_inclusion_by_hash(url, h)
-
-    if args.roots:
-        print time.strftime('%H:%M:%S') + " Getting accepted Root Certs from all logs..."
-        for url in base_urls:
-            get_all_roots(url)
-
-
-    if args.build_sth:
-        print time.strftime('%H:%M:%S') + " Building trees from entries. This may take a while, go get coffee or something..."
-        for base_url in base_urls:
-            fetch_and_build_tree(sth, base_url)
-        # fetch_and_build_tree(sth, base_urls[2])
-
-    if args.audit:
-        print time.strftime('%H:%M:%S') + " Running auditor1 for " + str(len(base_urls)) + " logs..."
-        old_sth = read_sth(args.cur_sth)
-        if old_sth:
-            verify_consistency(old_sth, sth)
-        else:
-            print "No old sth found..."
-        write_file(args.cur_sth, sth)
-
-
-    if args.audit3:
-        print time.strftime('%H:%M:%S') + " Running auditor3 for " + str(len(base_urls)) + " logs..."
-        while True:
-            time.sleep(30)
-            new_sth = fetch_all_sth()
-            verify_consistency(sth, new_sth)
-            verify_inclusion_all(sth, new_sth)
-            sth = new_sth
-
-    if args.audit2:
-        print time.strftime('%H:%M:%S') + " Running auditor2 for " + str(len(base_urls)) + " logs..."
-        old_sth = read_sth(args.cur_sth)
-        # print "Verifying progress..."
-        verify_progress(old_sth, sth)
-        if old_sth:
-            print "Verifying consistency..."
-            verify_consistency(old_sth, sth)
-            print "Verifying inclusion..."
-            verify_inclusion_all(old_sth, sth)
-        write_file(args.cur_sth, sth)
-
-    # Experimental for plausible + nagios
-    if args.audit4:
-        base_url = base_urls[0]
-        old_sth = read_sth("plausible-sth.json")
-        print "Running auditor4 for " + base_url
-        try:
-            tmp_sth = get_sth(base_url)
-        except:
-            # sths[base_url] = None
-            error_str = time.strftime('%H:%M:%S') + " ERROR: Failed to retrieve STH from " + base_url
-            print error_str
-            errors.append(error_str)
-            sys.exit(NAGIOS_WARN)
-
-        # Check signature on the STH
-        try:
-            check_sth_signature(base_url, tmp_sth, None)
-            write_file("plausible-sth.json", tmp_sth)
-        except:
-            error_str = time.strftime('%H:%M:%S') + " ERROR: Could not verify signature from " + base_url
-            print error_str
-            errors.append(error_str)
-            sys.exit(NAGIOS_CRIT)
-        sys.exit(NAGIOS_OK)
-
-
-if __name__ == '__main__':
-    # try:
-    main(parser.parse_args())
-    if len(errors) == 0:
-        print time.strftime('%H:%M:%S') + " Everything OK."
-        sys.exit(NAGIOS_OK)
-    else:
-        # print "errors found!"
-        print_errors(errors)
-        sys.exit(NAGIOS_WARN)
-    # except:
-    #     pass
-    # finally:
-    #     # print_timings(timings)
-    #     print_errors(errors)
-
-
-
-
-
-
-
-
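The tree-building path in the deleted file (reduce_layer, reduce_tree, and reduce_subtree_to_root, built on internal_hash and next_merkle_layer from certtools) computes the RFC 6962 Merkle Tree Hash: a leaf hashes as SHA-256 over a 0x00 byte plus the leaf input, an interior node as SHA-256 over a 0x01 byte plus its two child hashes, and each node splits its inputs at the largest power of two strictly smaller than their count. The following is a minimal Python 3 sketch of the same root computation, independent of the certtools helpers; leaf_hash and merkle_root are illustrative names for this sketch, not functions from this repository.

    import base64
    import hashlib

    def leaf_hash(leaf_input):
        # RFC 6962, section 2.1: leaves are hashed with a 0x00 prefix.
        return hashlib.sha256(b"\x00" + leaf_input).digest()

    def merkle_root(hashes):
        # The Merkle Tree Hash of an empty tree is SHA-256 of the empty string.
        if not hashes:
            return hashlib.sha256(b"").digest()
        # A single-leaf tree's root is that leaf's hash.
        if len(hashes) == 1:
            return hashes[0]
        # Split at the largest power of two strictly smaller than len(hashes).
        k = 1
        while 2 * k < len(hashes):
            k *= 2
        # Interior nodes are hashed with a 0x01 prefix, giving domain
        # separation between leaves and interior nodes.
        return hashlib.sha256(b"\x01" + merkle_root(hashes[:k]) + merkle_root(hashes[k:])).digest()

    # Example usage (entries and sth are assumed inputs, shaped like the
    # get-entries and get-sth responses the deleted script fetched):
    # leaves = [leaf_hash(base64.b64decode(e["leaf_input"])) for e in entries]
    # assert base64.b64encode(merkle_root(leaves)).decode() == sth["sha256_root_hash"]

Under those assumptions, base64-encoding merkle_root over the leaf hashes of all entries reproduces the sha256_root_hash field of the log's signed tree head, which is exactly the comparison fetch_and_build_tree made; the deleted script just built the same root incrementally, batch by batch, instead of recursively over all leaves at once.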