summaryrefslogtreecommitdiff
path: root/tools/certtools.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/certtools.py')
-rw-r--r--tools/certtools.py102
1 files changed, 91 insertions, 11 deletions
diff --git a/tools/certtools.py b/tools/certtools.py
index ffd631c..2c97dfb 100644
--- a/tools/certtools.py
+++ b/tools/certtools.py
@@ -61,11 +61,20 @@ def get_certs_from_string(s):
f = cStringIO.StringIO(s)
return get_pemlike_from_file(f, "CERTIFICATE")
+def get_precerts_from_string(s):
+ f = cStringIO.StringIO(s)
+ return get_pemlike_from_file(f, "PRECERTIFICATE")
+
def get_eckey_from_file(keyfile):
keys = get_pemlike(keyfile, "EC PRIVATE KEY")
assert len(keys) == 1
return keys[0]
+def get_public_key_from_file(keyfile):
+ keys = get_pemlike(keyfile, "PUBLIC KEY")
+ assert len(keys) == 1
+ return keys[0]
+
def get_root_cert(issuer):
accepted_certs = \
json.loads(open("googlelog-accepted-certs.txt").read())["certificates"]
@@ -80,7 +89,7 @@ def get_root_cert(issuer):
return root_cert
def get_sth(baseurl):
- result = urllib2.urlopen(baseurl + "ct/v1/get-sth", context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)).read()
+ result = urllib2.urlopen(baseurl + "ct/v1/get-sth").read()
return json.loads(result)
def get_proof_by_hash(baseurl, hash, tree_size):
@@ -88,7 +97,7 @@ def get_proof_by_hash(baseurl, hash, tree_size):
params = urllib.urlencode({"hash":base64.b64encode(hash),
"tree_size":tree_size})
result = \
- urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params, context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)).read()
+ urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read()
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR:", e.read()
@@ -99,7 +108,7 @@ def get_consistency_proof(baseurl, tree_size1, tree_size2):
params = urllib.urlencode({"first":tree_size1,
"second":tree_size2})
result = \
- urllib2.urlopen(baseurl + "ct/v1/get-sth-consistency?" + params, context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)).read()
+ urllib2.urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read()
return json.loads(result)["consistency"]
except urllib2.HTTPError, e:
print "ERROR:", e.read()
@@ -122,7 +131,25 @@ def unpack_tls_array(packed_data, length_len):
def add_chain(baseurl, submission):
try:
- result = urllib2.urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission), context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)).read()
+ result = urllib2.urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read()
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print "ERROR", e.code,":", e.read()
+ if e.code == 400:
+ return None
+ sys.exit(1)
+ except ValueError, e:
+ print "==== FAILED REQUEST ===="
+ print submission
+ print "======= RESPONSE ======="
+ print result
+ print "========================"
+ raise e
+
+def add_prechain(baseurl, submission):
+ try:
+ result = urllib2.urlopen(baseurl + "ct/v1/add-pre-chain",
+ json.dumps(submission)).read()
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR", e.code,":", e.read()
@@ -140,7 +167,7 @@ def add_chain(baseurl, submission):
def get_entries(baseurl, start, end):
try:
params = urllib.urlencode({"start":start, "end":end})
- result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params, context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)).read()
+ result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params).read()
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR:", e.read()
@@ -183,7 +210,26 @@ def check_signature(baseurl, signature, data):
vk.verify(unpacked_signature, data, hashfunc=hashlib.sha256,
sigdecode=ecdsa.util.sigdecode_der)
-def http_request(url, data=None, key=None):
+def parse_auth_header(authheader):
+ splittedheader = authheader.split(";")
+ (signature, rawoptions) = (splittedheader[0], splittedheader[1:])
+ options = dict([(e.partition("=")[0], e.partition("=")[2]) for e in rawoptions])
+ return (base64.b64decode(signature), options)
+
+def check_auth_header(authheader, expected_key, publickeydir, data, path):
+ if expected_key == None:
+ return True
+ (signature, options) = parse_auth_header(authheader)
+ keyname = options.get("key")
+ if keyname != expected_key:
+ raise Exception("Response claimed to come from %s, expected %s" % (keyname, expected_key))
+ publickey = get_public_key_from_file(publickeydir + "/" + keyname + ".pem")
+ vk = ecdsa.VerifyingKey.from_der(publickey)
+ vk.verify(signature, "%s\0%s\0%s" % ("REPLY", path, data), hashfunc=hashlib.sha256,
+ sigdecode=ecdsa.util.sigdecode_der)
+ return True
+
+def http_request(url, data=None, key=None, verifynode=None, publickeydir="."):
req = urllib2.Request(url, data)
(keyname, keyfile) = key
privatekey = get_eckey_from_file(keyfile)
@@ -197,8 +243,11 @@ def http_request(url, data=None, key=None):
signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data), hashfunc=hashlib.sha256,
sigencode=ecdsa.util.sigencode_der)
req.add_header('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname)
- result = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)).read()
- return result
+ result = urllib2.urlopen(req)
+ authheader = result.info().get('X-Catlfish-Auth')
+ data = result.read()
+ check_auth_header(authheader, verifynode, publickeydir, data, parsed_url.path)
+ return data
def get_signature(baseurl, data, key=None):
try:
@@ -235,7 +284,7 @@ def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None):
return create_signature(baseurl, tree_head, key=key)
-def check_sct_signature(baseurl, leafcert, sct):
+def check_sct_signature(baseurl, signed_entry, sct, precert=False):
publickey = base64.decodestring(publickeys[baseurl])
calculated_logid = hashlib.sha256(publickey).digest()
received_logid = base64.decodestring(sct["id"])
@@ -249,9 +298,12 @@ def check_sct_signature(baseurl, leafcert, sct):
version = struct.pack(">b", sct["sct_version"])
signature_type = struct.pack(">b", 0)
timestamp = struct.pack(">Q", sct["timestamp"])
- entry_type = struct.pack(">H", 0)
+ if precert:
+ entry_type = struct.pack(">H", 1)
+ else:
+ entry_type = struct.pack(">H", 0)
signed_struct = version + signature_type + timestamp + \
- entry_type + tls_array(leafcert, 3) + \
+ entry_type + signed_entry + \
tls_array(base64.decodestring(sct["extensions"]), 2)
check_signature(baseurl, signature, signed_struct)
@@ -267,6 +319,25 @@ def pack_mtl(timestamp, leafcert):
merkle_tree_leaf = version + leaf_type + timestamped_entry
return merkle_tree_leaf
+def pack_mtl_precert(timestamp, cleanedcert, issuer_key_hash):
+ entry_type = struct.pack(">H", 1)
+ extensions = ""
+
+ timestamped_entry = struct.pack(">Q", timestamp) + entry_type + \
+ pack_precert(cleanedcert, issuer_key_hash) + tls_array(extensions, 2)
+ version = struct.pack(">b", 0)
+ leaf_type = struct.pack(">b", 0)
+ merkle_tree_leaf = version + leaf_type + timestamped_entry
+ return merkle_tree_leaf
+
+def pack_precert(cleanedcert, issuer_key_hash):
+ assert len(issuer_key_hash) == 32
+
+ return issuer_key_hash + tls_array(cleanedcert, 3)
+
+def pack_cert(cert):
+ return tls_array(cert, 3)
+
def unpack_mtl(merkle_tree_leaf):
version = merkle_tree_leaf[0:1]
leaf_type = merkle_tree_leaf[1:2]
@@ -353,6 +424,14 @@ def get_hash_from_certfile(cert):
return base64.b16decode(line[len("Leafhash: "):])
return None
+def get_timestamp_from_certfile(cert):
+ for line in cert.split("\n"):
+ if line.startswith("-----"):
+ return None
+ if line.startswith("Timestamp: "):
+ return int(line[len("Timestamp: "):])
+ return None
+
def get_proof(store, tree_size, n):
hash = get_hash_from_certfile(get_one_cert(store, n))
return get_proof_by_hash(args.baseurl, hash, tree_size)
@@ -586,6 +665,7 @@ def verify_consistency_proof(consistency_proof, first, second, oldhash_input):
def verify_inclusion_proof(inclusion_proof, index, treesize, leafhash):
chain = zip([(index, 0)] + nodes_for_index(index, treesize), [leafhash] + inclusion_proof)
+ assert len(nodes_for_index(index, treesize)) == len(inclusion_proof)
(_, hash) = reduce(lambda e1, e2: combine_two_hashes(e1, e2, treesize), chain)
return hash