import subprocess import json import base64 import urllib import urllib2 import struct import sys def get_cert_info(s): p = subprocess.Popen( ["openssl", "x509", "-noout", "-subject", "-issuer", "-inform", "der"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) parsed = p.communicate(s) if parsed[1]: print "error:", parsed[1] result = {} for line in parsed[0].split("\n"): (key, sep, value) = line.partition("=") if sep == "=": result[key] = value return result def get_certs_from_file(certfile): certs = [] cert = "" incert = False for line in open(certfile): line = line.strip() if line == "-----BEGIN CERTIFICATE-----": cert = "" incert = True elif line == "-----END CERTIFICATE-----": certs.append(cert) incert = False elif incert: cert += line return certs def get_root_cert(issuer): accepted_certs = \ json.loads(open("googlelog-accepted-certs.txt").read())["certificates"] root_cert = None for accepted_cert in accepted_certs: subject = get_cert_info(base64.decodestring(accepted_cert))["subject"] if subject == issuer: print "found root cert" root_cert = base64.decodestring(accepted_cert) return root_cert def get_sth(baseurl): result = urllib2.urlopen(baseurl + "ct/v1/get-sth").read() return json.loads(result) def get_proof_by_hash(baseurl, hash, tree_size): try: params = urllib.urlencode({"hash":base64.b64encode(hash), "tree_size":tree_size}) print params result = \ urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print e.read() sys.exit(1) def tls_array(data, length_len): length_bytes = struct.pack(">Q", len(data))[-length_len:] return length_bytes + data def unpack_tls_array(packed_data, length_len): padded_length = ["\x00"] * 8 padded_length[-length_len:] = packed_data[:length_len] (length,) = struct.unpack(">Q", "".join(padded_length)) unpacked_data = packed_data[length_len:length_len+length] assert len(unpacked_data) == length rest_data = packed_data[length_len+length:] return (unpacked_data, rest_data) def add_chain(baseurl, submission): try: return json.loads(urllib2.urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read()) except urllib2.HTTPError, e: print e.read() sys.exit(1) def get_entries(baseurl, start, end): try: params = urllib.urlencode({"start":start, "end":end}) result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print e.read() sys.exit(1) def decode_certificate_chain(packed_certchain): (unpacked_certchain, rest) = unpack_tls_array(packed_certchain, 3) assert len(rest) == 0 certs = [] while len(unpacked_certchain): (cert, rest) = unpack_tls_array(unpacked_certchain, 3) certs.append(cert) unpacked_certchain = rest return certs