#!/usr/bin/python
# -*- coding: utf-8 -*-

import time
import base64
import argparse
import hashlib
import json
import sys
import urllib
import urllib2
# from pympler.asizeof import asizeof
from certtools import *

base_urls = [
    "https://plausible.ct.nordu.net/",
    "https://ct1.digicert-ct.com/log/",
    "https://ct.izenpe.com/",
    "https://log.certly.io/",
    "https://ct.googleapis.com/aviator/",
    "https://ct.googleapis.com/pilot/",
    "https://ct.googleapis.com/rocketeer/",
]

logkeys = {}
logkeys["https://plausible.ct.nordu.net/"] = get_public_key_from_file("../../plausible-logkey.pem")
logkeys["https://ct.googleapis.com/rocketeer/"] = get_public_key_from_file("../../rocketeer-logkey.pem")
logkeys["https://ct.googleapis.com/aviator/"] = get_public_key_from_file("../../aviator-logkey.pem")
logkeys["https://ct.googleapis.com/pilot/"] = get_public_key_from_file("../../pilot-logkey.pem")
logkeys["https://log.certly.io/"] = get_public_key_from_file("../../certly-logkey.pem")
logkeys["https://ct.izenpe.com/"] = get_public_key_from_file("../../izenpe-logkey.pem")
logkeys["https://ct1.digicert-ct.com/log/"] = get_public_key_from_file("../../digicert-logkey.pem")

parser = argparse.ArgumentParser(description="Audit and monitor a set of Certificate Transparency logs")
parser.add_argument('--audit', action='store_true',
                    help="run lightweight auditor verifying consistency in STH")
parser.add_argument('--audit2', action='store_true',
                    help="run medium-weight auditor verifying consistency in STH and inclusion proofs of new entries")
parser.add_argument('--build-sth', action='store_true',
                    help="get all entries and construct STH")
parser.add_argument('--verify-index', default=None,
                    help="verify a specific index in all logs")
parser.add_argument('--verify-hash', action='store_true',
                    help="verify inclusion of one sample entry by its leaf hash")


def reduce_layer(layer):
    # Hash adjacent pairs of nodes into the next Merkle layer. An odd
    # leftover node stays behind in the input layer for a later merge.
    new_layer = []
    while len(layer) > 1:
        e1 = layer.pop(0)
        e2 = layer.pop(0)
        new_layer.append(internal_hash((e1, e2)))
    return new_layer


def reduce_tree(entries, layers):
    if len(entries) == 0 and layers == [[]]:
        # The root of an empty tree is the hash of the empty string.
        return [[hashlib.sha256().digest()]]

    layer_idx = 0
    layers[layer_idx] += entries

    # Collapse full pairs repeatedly, carrying results up one layer at a
    # time; each layer is left holding at most one leftover node.
    while len(layers[layer_idx]) > 1:
        if len(layers) == layer_idx + 1:
            layers.append([])
        layers[layer_idx + 1] += reduce_layer(layers[layer_idx])
        layer_idx += 1
    return layers


def reduce_subtree_to_root(layers):
    # Merge the per-layer leftovers bottom-up into a single root hash.
    while len(layers) > 1:
        if len(layers[1]) == 0:
            # Nothing accumulated at the next layer yet: promote this
            # layer's leftover node unchanged.
            layers[1] = layers[0]
        else:
            layers[1] += next_merkle_layer(layers[0])
        del layers[0]

    if len(layers[0]) > 1:
        return next_merkle_layer(layers[0])
    return layers[0]
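
# For reference, a minimal sketch (not used by this script, and only an
# assumption about certtools' internals) of the RFC 6962 Section 2.1 hashing
# that get_leaf_hash() and internal_hash() are expected to implement. Leaf
# and interior hashes use distinct one-byte prefixes so a leaf can never be
# confused with an interior node.
def _sketch_leaf_hash(leaf_input):
    # MTH({d}) = SHA-256(0x00 || d)
    return hashlib.sha256("\x00" + leaf_input).digest()


def _sketch_internal_hash(pair):
    # MTH(D[n]) = SHA-256(0x01 || MTH(left half) || MTH(right half))
    left, right = pair
    return hashlib.sha256("\x01" + left + right).digest()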

def fetch_all_sth():
    sths = {}
    for base_url in base_urls:
        # Fetch the current STH from every log...
        try:
            sths[base_url] = get_sth(base_url)
        except Exception:
            print "Failed to retrieve STH from " + base_url
            continue

        # ...and check its signature against the configured log key.
        try:
            check_sth_signature(base_url, sths[base_url], logkeys[base_url])
        except Exception:
            print "Could not verify signature from " + base_url + "!!!"
            continue
    return sths


def verify_consistency(old, new):
    for url in old:
        # try:
        if old[url]["tree_size"] != new[url]["tree_size"]:
            consistency_proof = get_consistency_proof(url, old[url]["tree_size"], new[url]["tree_size"])
            decoded_consistency_proof = []
            for item in consistency_proof:
                decoded_consistency_proof.append(base64.b64decode(item))
            res = verify_consistency_proof(decoded_consistency_proof,
                                           old[url]["tree_size"],
                                           new[url]["tree_size"],
                                           old[url]["sha256_root_hash"])

            # The proof must reproduce both the old and the new root hash.
            if old[url]["sha256_root_hash"] != str(base64.b64encode(res[0])):
                print "Verification of old hash failed!!!"
                print old[url]["sha256_root_hash"], str(base64.b64encode(res[0]))
            elif new[url]["sha256_root_hash"] != str(base64.b64encode(res[1])):
                print "Verification of new hash failed!!!"
                print new[url]["sha256_root_hash"], str(base64.b64encode(res[1]))
            else:
                print time.strftime("%H:%M:%S", time.gmtime()) + \
                    " New STH from " + url + \
                    ", timestamp: " + str(new[url]["timestamp"]) + \
                    ", size: " + str(new[url]["tree_size"]) + "...OK."
        # except:
        #     print "ERROR: Could not verify consistency for " + url


def verify_inclusion_all(old, new):
    for url in old:
        try:
            if old[url]["tree_size"] != new[url]["tree_size"]:
                entries = get_entries(url, old[url]["tree_size"] - 1, new[url]["tree_size"] - 1)["entries"]
                success = True
                for i in entries:
                    h = get_leaf_hash(base64.b64decode(i["leaf_input"]))
                    if not verify_inclusion_by_hash(url, h):
                        success = False

                if success:
                    print "Verifying inclusion for " + str(len(entries)) + " new entries in " + url + " ...OK"
                else:
                    print "ERROR: Failed to prove inclusion of all new entries in " + url
        except Exception:
            print "ERROR: Failed to prove inclusion of all new entries in " + url


def fetch_and_build_tree(old_sth, base_url):
    sth = old_sth[base_url]
    subtree = [[]]
    idx = 0

    print "Getting all entries from " + base_url
    while idx < sth["tree_size"]:
        pre_size = idx
        entries = get_entries(base_url, idx, sth["tree_size"] - 1)["entries"]

        new_leafs = []
        for item in entries:
            new_leafs.append(get_leaf_hash(base64.b64decode(item["leaf_input"])))
        idx += len(new_leafs)
        print "Got entries " + str(pre_size) + " to " + str(idx)  # + " (tree size: " + str(asizeof(subtree)) + " B)"

        # Fold each batch of leaves into the partially built tree as we go,
        # so the full leaf set never has to be held in memory at once.
        subtree = reduce_tree(new_leafs, subtree)

    root = base64.b64encode(reduce_subtree_to_root(subtree)[0])
    if root == sth["sha256_root_hash"]:
        print "Verifying root hashes...OK."
    else:
        print "ERROR: Failed to verify root hashes!"
        print "STH root: " + sth["sha256_root_hash"]
        print "Tree root: " + root


def verify_inclusion_by_hash(base_url, leaf_hash):
    try:
        tmp_sth = get_sth(base_url)
        proof = get_proof_by_hash(base_url, leaf_hash, tmp_sth["tree_size"])

        decoded_inclusion_proof = []
        for item in proof["audit_path"]:
            decoded_inclusion_proof.append(base64.b64decode(item))

        root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof,
                                                       proof["leaf_index"],
                                                       tmp_sth["tree_size"],
                                                       leaf_hash))

        if tmp_sth["sha256_root_hash"] == root:
            # print "Verifying inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url + "...OK."
            return True
        else:
            print "ERROR: Could not prove inclusion for entry " + str(proof["leaf_index"]) + " in " + base_url
            return False
    except Exception:
        print "ERROR: Could not prove inclusion for hashed entry in " + base_url
        return False


def verify_inclusion_by_index(base_url, index):
    try:
        tmp_sth = get_sth(base_url)
        proof = get_proof_by_index(base_url, index, tmp_sth["tree_size"])

        decoded_inclusion_proof = []
        for item in proof["audit_path"]:
            decoded_inclusion_proof.append(base64.b64decode(item))

        root = base64.b64encode(verify_inclusion_proof(decoded_inclusion_proof,
                                                       index,
                                                       tmp_sth["tree_size"],
                                                       get_leaf_hash(base64.b64decode(proof["leaf_input"]))))

        if tmp_sth["sha256_root_hash"] == root:
            print "Verifying inclusion for entry " + str(index) + " in " + base_url + "...OK."
        else:
            print "ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url
    except Exception:
        print "ERROR: Could not prove inclusion for entry " + str(index) + " in " + base_url


def get_proof_by_index(baseurl, index, tree_size):
    try:
        params = urllib.urlencode({"leaf_index": index,
                                   "tree_size": tree_size})
        result = urllib2.urlopen(baseurl + "ct/v1/get-entry-and-proof?" + params).read()
        return json.loads(result)
    except urllib2.HTTPError, e:
        print "ERROR:", e.read()
        sys.exit(1)
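
# For reference, a minimal sketch (not used by this script) of the audit-path
# folding that certtools' verify_inclusion_proof() is assumed to perform,
# following the RFC 6962 inclusion-proof algorithm: starting from the leaf
# hash, combine with each audit-path node on the correct side until a
# candidate root hash is produced, which the caller compares to the STH.
def _sketch_root_from_audit_path(leaf_hash, leaf_index, tree_size, audit_path):
    fn, sn = leaf_index, tree_size - 1
    r = leaf_hash
    for p in audit_path:
        if fn % 2 == 1 or fn == sn:
            # The current node is a right child (or the last node on its
            # level), so the audit-path node sits to its left.
            r = hashlib.sha256("\x01" + p + r).digest()
            while fn % 2 == 0 and fn != 0:
                fn >>= 1
                sn >>= 1
        else:
            # The current node is a left child, so the audit-path node
            # sits to its right.
            r = hashlib.sha256("\x01" + r + p).digest()
        fn >>= 1
        sn >>= 1
    return r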

def main(args):
    print "Started " + time.strftime("%H:%M:%S", time.gmtime())

    if args.verify_index is None and not args.build_sth and not args.audit \
            and not args.audit2 and not args.verify_hash:
        print "Nothing to do."
        return
    else:
        sth = fetch_all_sth()

    if args.verify_index is not None:
        for url in base_urls:
            verify_inclusion_by_index(url, int(args.verify_index))

    if args.verify_hash:
        idx = 1337
        url = base_urls[0]
        entries = get_entries(url, idx, idx)["entries"]
        h = get_leaf_hash(base64.b64decode(entries[0]["leaf_input"]))
        verify_inclusion_by_hash(url, h)

    if args.build_sth:
        print "Building trees from entries. This may take a while, go get coffee or something..."
        fetch_and_build_tree(sth, base_urls[2])

    if args.audit:
        print "Running auditor for " + str(len(base_urls)) + " logs..."
        while True:
            # Poll roughly once a minute.
            time.sleep(1*60 - 4)
            new_sth = fetch_all_sth()
            verify_consistency(sth, new_sth)
            sth = new_sth

    if args.audit2:
        print "Running auditor2 for " + str(len(base_urls)) + " logs..."
        while True:
            time.sleep(1*60 - 4)
            new_sth = fetch_all_sth()
            verify_consistency(sth, new_sth)
            verify_inclusion_all(sth, new_sth)
            sth = new_sth

    print "Done. Exiting..."


if __name__ == '__main__':
    main(parser.parse_args())
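
# Example invocations, assuming the script is saved as ct_auditor.py (the
# file name is illustrative) and the log key PEM files referenced above exist:
#
#   ./ct_auditor.py --audit               # poll all logs and verify STH consistency
#   ./ct_auditor.py --audit2              # additionally verify inclusion of new entries
#   ./ct_auditor.py --build-sth           # rebuild the tree of base_urls[2] from all entries
#   ./ct_auditor.py --verify-index 1337   # check inclusion of entry 1337 in every log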