From 188035590e105df928bac47ac97f1fe8ced17123 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Sun, 27 Sep 2015 11:40:47 +0200 Subject: make tests work(s) --- tools/certtools.py | 2 +- tools/comparecert.py | 35 ++++++++++---------------------- tools/fetchallcerts.py | 10 ++------- tools/submitcert.py | 31 +++++++--------------------- tools/testcase1.py | 55 ++++++++++++++++++-------------------------------- 5 files changed, 41 insertions(+), 92 deletions(-) diff --git a/tools/certtools.py b/tools/certtools.py index 6cb4f55..2165781 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -179,7 +179,7 @@ def unpack_tls_array(packed_data, length_len): def add_chain(baseurl, submission): try: - result = urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read() + result = urlopen(baseurl + "ct/v1/add-blob", json.dumps(submission)).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR", e.code,":", e.read() diff --git a/tools/comparecert.py b/tools/comparecert.py index 81893f7..e6864b6 100755 --- a/tools/comparecert.py +++ b/tools/comparecert.py @@ -20,31 +20,18 @@ import signal import select import zipfile -def readfile(filename): - contents = open(filename).read() - certchain = get_certs_from_string(contents) - precerts = get_precerts_from_string(contents) - return (certchain, precerts) +def readfile(filename, filetype): + if filetype == 'raw': + return open(filename, 'r').read() + else: + return get_pemlike(filename, filetype) def testcerts(template, test): - (certchain1, precerts1) = template - (certchain2, precerts2) = test + blob1 = template + blob2 = test - if precerts1 != precerts2: - return (False, "precerts are different") - - if certchain1 == certchain2: - return (True, "") - - if len(certchain2) == len(certchain1) + 1: - if certchain2[:-1] != certchain1: - return (False, "certchains are different") - last_issuer = get_cert_info(certchain1[-1])["issuer"] - root_subject = get_cert_info(certchain2[-1])["subject"] - if last_issuer == root_subject: - return (True, "fetched chain has an appended root cert") - else: - return (False, "fetched chain has an extra entry") + if blob1 == blob2: + return (True, "equal") return (False, "certchains are different") @@ -53,9 +40,9 @@ parser.add_argument('templates', help="Test templates, separated with colon") parser.add_argument('test', help="Files to test, separated with colon") args = parser.parse_args() -templates = [readfile(filename) for filename in args.templates.split(":")] +templates = [readfile(filename, 'raw') for filename in args.templates.split(":")] -tests = [readfile(filename) for filename in args.test.split(":")] +tests = [readfile(filename, 'BLOB')[0] for filename in args.test.split(":")] for test in tests: diff --git a/tools/fetchallcerts.py b/tools/fetchallcerts.py index 66fde74..169764f 100755 --- a/tools/fetchallcerts.py +++ b/tools/fetchallcerts.py @@ -129,16 +129,10 @@ else: leaf_input = base64.decodestring(entry["leaf_input"]) leaf_hash = get_leaf_hash(leaf_input) s += "Leafhash: %s\n" % base64.b16encode(leaf_hash) - if issuer_key_hash: - s += "-----BEGIN PRECERTIFICATE-----\n" - s += base64.encodestring(chain[0]).rstrip() + "\n" - s += "-----END PRECERTIFICATE-----\n" - s += "\n" - chain = chain[1:] for cert in chain: - s += "-----BEGIN CERTIFICATE-----\n" + s += "-----BEGIN BLOB-----\n" s += base64.encodestring(cert).rstrip() + "\n" - s += "-----END CERTIFICATE-----\n" + s += "-----END BLOB-----\n" s += "\n" zf.writestr("%08d" % i, s) except AssertionError, e: diff --git a/tools/submitcert.py b/tools/submitcert.py index 3b14912..91d2111 100755 --- a/tools/submitcert.py +++ b/tools/submitcert.py @@ -49,31 +49,14 @@ else: sth = get_sth(baseurl) -def submitcert((certfile, cert)): +def submitcert((certfile, blob)): timing = timing_point() - certchain = get_certs_from_string(cert) - precerts = get_precerts_from_string(cert) - assert len(precerts) == 0 or len(precerts) == 1 - precert = precerts[0] if precerts else None timing_point(timing, "readcerts") try: - if precert: - if ext_key_usage_precert_signing_cert in get_ext_key_usage(certchain[0]): - issuer_key_hash = get_cert_key_hash(certchain[1]) - issuer = certchain[1] - else: - issuer_key_hash = get_cert_key_hash(certchain[0]) - issuer = None - cleanedcert = cleanprecert(precert, issuer=issuer) - signed_entry = pack_precert(cleanedcert, issuer_key_hash) - leafcert = cleanedcert - result = add_prechain(baseurl, {"chain":map(base64.b64encode, [precert] + certchain)}) - else: - signed_entry = pack_cert(certchain[0]) - leafcert = certchain[0] - issuer_key_hash = None - result = add_chain(baseurl, {"chain":map(base64.b64encode, certchain)}) + signed_entry = pack_cert(blob) + issuer_key_hash = None + result = add_chain(baseurl, {"blob":base64.b64encode(blob)}) except SystemExit: print "EXIT:", certfile select.select([], [], [], 1.0) @@ -87,7 +70,7 @@ def submitcert((certfile, cert)): try: if args.check_sct: - check_sct_signature(baseurl, signed_entry, result, precert=precert, publickey=logpublickey) + check_sct_signature(baseurl, signed_entry, result, publickey=logpublickey) timing_point(timing, "checksig") except AssertionError, e: print "ERROR:", certfile, e @@ -101,7 +84,7 @@ def submitcert((certfile, cert)): if lookup_in_log: - merkle_tree_leaf = pack_mtl(result["timestamp"], leafcert) + merkle_tree_leaf = pack_mtl(result["timestamp"], blob) leaf_hash = get_leaf_hash(merkle_tree_leaf) @@ -139,7 +122,7 @@ def submitcert((certfile, cert)): print "and submitted chain has length", len(submittedcertchain) timing_point(timing, "lookup") - return ((leafcert, issuer_key_hash, result), timing["deltatimes"]) + return ((blob, issuer_key_hash, result), timing["deltatimes"]) def get_ncerts(certfiles): n = 0 diff --git a/tools/testcase1.py b/tools/testcase1.py index 697cc99..1a294d9 100755 --- a/tools/testcase1.py +++ b/tools/testcase1.py @@ -22,11 +22,14 @@ certfiles = ["../tools/testcerts/cert1.txt", "../tools/testcerts/cert2.txt", "../tools/testcerts/cert3.txt", "../tools/testcerts/cert4.txt", "../tools/testcerts/cert5.txt"] -cc1 = get_certs_from_file(certfiles[0]) -cc2 = get_certs_from_file(certfiles[1]) -cc3 = get_certs_from_file(certfiles[2]) -cc4 = get_certs_from_file(certfiles[3]) -cc5 = get_certs_from_file(certfiles[4]) +def get_blob_from_file(filename): + return [open(filename, 'r').read()] + +cc1 = get_blob_from_file(certfiles[0]) +cc2 = get_blob_from_file(certfiles[1]) +cc3 = get_blob_from_file(certfiles[2]) +cc4 = get_blob_from_file(certfiles[3]) +cc5 = get_blob_from_file(certfiles[4]) create_ssl_context(cafile=cacertfile) @@ -54,7 +57,8 @@ def assert_equal(actual, expected, name, quiet=False, nodata=False, fatal=False) if nodata: print_error("%s differs", name) else: - print_error("%s expected %s got %s", name, expected, actual) + print_error("%s expected %s got %s", name, repr(expected), + repr(actual)) if fatal: sys.exit(1) elif not quiet: @@ -74,12 +78,13 @@ def print_and_check_tree_size(expected, baseurl): def do_add_chain(chain, baseurl): global failures + blob = ''.join(chain) try: - result = add_chain(baseurl, {"chain":map(base64.b64encode, chain)}) + result = add_chain(baseurl, {"blob":base64.b64encode(blob)}) except ValueError, e: print_error("%s", e) try: - signed_entry = pack_cert(chain[0]) + signed_entry = pack_cert(blob) check_sct_signature(baseurl, signed_entry, result, publickey=logpublickey) print_success("signature check succeeded") except AssertionError, e: @@ -90,8 +95,8 @@ def do_add_chain(chain, baseurl): return result def get_and_validate_proof(timestamp, chain, leaf_index, nentries, baseurl): - cert = chain[0] - merkle_tree_leaf = pack_mtl(timestamp, cert) + blob = ''.join(chain) + merkle_tree_leaf = pack_mtl(timestamp, blob) leaf_hash = get_leaf_hash(merkle_tree_leaf) sth = get_sth(baseurl) proof = get_proof_by_hash(baseurl, leaf_hash, sth["tree_size"]) @@ -104,7 +109,7 @@ def get_and_validate_proof(timestamp, chain, leaf_index, nentries, baseurl): root_hash = base64.b64decode(sth["sha256_root_hash"]) assert_equal(root_hash, calc_root_hash, "verified root hash", nodata=True, quiet=True) - get_and_check_entry(timestamp, chain, leaf_index, baseurl) + get_and_check_entry(timestamp, blob, leaf_index, baseurl) def get_and_validate_consistency_proof(sth1, sth2, size1, size2, baseurl): consistency_proof = [base64.decodestring(entry) for entry in get_consistency_proof(baseurl, size1, size2)] @@ -116,35 +121,15 @@ def get_and_validate_consistency_proof(sth1, sth2, size1, size2, baseurl): def get_and_check_entry(timestamp, chain, leaf_index, baseurl): + blob = ''.join(chain) entries = get_entries(baseurl, leaf_index, leaf_index) assert_equal(len(entries), 1, "get_entries", quiet=True) fetched_entry = entries["entries"][0] - merkle_tree_leaf = pack_mtl(timestamp, chain[0]) + merkle_tree_leaf = pack_mtl(timestamp, blob) leaf_input = base64.decodestring(fetched_entry["leaf_input"]) - assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True) extra_data = base64.decodestring(fetched_entry["extra_data"]) - certchain = decode_certificate_chain(extra_data) - - submittedcertchain = chain[1:] - - for (submittedcert, fetchedcert, i) in zip(submittedcertchain, - certchain, itertools.count(1)): - assert_equal(fetchedcert, submittedcert, "cert %d in chain" % (i,), quiet=True) - - if len(certchain) == len(submittedcertchain) + 1: - last_issuer = get_cert_info(submittedcertchain[-1])["issuer"] - root_subject = get_cert_info(certchain[-1])["subject"] - if last_issuer == root_subject: - print_success("fetched chain has an appended root cert") - else: - print_error("fetched chain has an extra entry") - elif len(certchain) == len(submittedcertchain): - print_success("cert chains are the same length") - else: - print_error("cert chain length %d expected %d or %d", - len(certchain), - len(submittedcertchain), - len(submittedcertchain)) + assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True) + assert_equal(extra_data, '\x00\x00\x00', "extra_data", quiet=True) def merge(): return subprocess.call(["../tools/merge", "--config", "../test/catlfish-test.cfg", -- cgit v1.1