summaryrefslogtreecommitdiff
path: root/tools/certtools.py
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2015-03-31 19:18:30 +0200
committerMagnus Ahltorp <map@kth.se>2015-03-31 19:18:30 +0200
commitab924f51f254d1bdd6f752f8c19c4cbcc55cf0e4 (patch)
tree91261dcf3047c735207d706862bd9136f003230a /tools/certtools.py
parenta706e79fa722f681320fe1b05824352b6b9a63fc (diff)
parent13c3789add4f1630c4bc8dfccb229ebc7d4bfa38 (diff)
Merge branch 'genauthkeys'
Diffstat (limited to 'tools/certtools.py')
-rw-r--r--tools/certtools.py146
1 files changed, 125 insertions, 21 deletions
diff --git a/tools/certtools.py b/tools/certtools.py
index 222497f..498a2e0 100644
--- a/tools/certtools.py
+++ b/tools/certtools.py
@@ -6,6 +6,7 @@ import json
import base64
import urllib
import urllib2
+import ssl
import urlparse
import struct
import sys
@@ -60,11 +61,20 @@ def get_certs_from_string(s):
f = cStringIO.StringIO(s)
return get_pemlike_from_file(f, "CERTIFICATE")
+def get_precerts_from_string(s):
+ f = cStringIO.StringIO(s)
+ return get_pemlike_from_file(f, "PRECERTIFICATE")
+
def get_eckey_from_file(keyfile):
keys = get_pemlike(keyfile, "EC PRIVATE KEY")
assert len(keys) == 1
return keys[0]
+def get_public_key_from_file(keyfile):
+ keys = get_pemlike(keyfile, "PUBLIC KEY")
+ assert len(keys) == 1
+ return keys[0]
+
def get_root_cert(issuer):
accepted_certs = \
json.loads(open("googlelog-accepted-certs.txt").read())["certificates"]
@@ -78,8 +88,15 @@ def get_root_cert(issuer):
return root_cert
+def urlopen(url, data=None):
+ try:
+ opener = urllib2.build_opener(urllib2.HTTPSHandler(context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)))
+ except AttributeError:
+ opener = urllib2.build_opener(urllib2.HTTPSHandler())
+ return opener.open(url, data)
+
def get_sth(baseurl):
- result = urllib2.urlopen(baseurl + "ct/v1/get-sth").read()
+ result = urlopen(baseurl + "ct/v1/get-sth").read()
return json.loads(result)
def get_proof_by_hash(baseurl, hash, tree_size):
@@ -87,7 +104,7 @@ def get_proof_by_hash(baseurl, hash, tree_size):
params = urllib.urlencode({"hash":base64.b64encode(hash),
"tree_size":tree_size})
result = \
- urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read()
+ urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read()
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR:", e.read()
@@ -98,7 +115,7 @@ def get_consistency_proof(baseurl, tree_size1, tree_size2):
params = urllib.urlencode({"first":tree_size1,
"second":tree_size2})
result = \
- urllib2.urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read()
+ urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read()
return json.loads(result)["consistency"]
except urllib2.HTTPError, e:
print "ERROR:", e.read()
@@ -121,7 +138,24 @@ def unpack_tls_array(packed_data, length_len):
def add_chain(baseurl, submission):
try:
- result = urllib2.urlopen(baseurl + "ct/v1/add-chain",
+ result = urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read()
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print "ERROR", e.code,":", e.read()
+ if e.code == 400:
+ return None
+ sys.exit(1)
+ except ValueError, e:
+ print "==== FAILED REQUEST ===="
+ print submission
+ print "======= RESPONSE ======="
+ print result
+ print "========================"
+ raise e
+
+def add_prechain(baseurl, submission):
+ try:
+ result = urlopen(baseurl + "ct/v1/add-pre-chain",
json.dumps(submission)).read()
return json.loads(result)
except urllib2.HTTPError, e:
@@ -140,7 +174,7 @@ def add_chain(baseurl, submission):
def get_entries(baseurl, start, end):
try:
params = urllib.urlencode({"start":start, "end":end})
- result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params).read()
+ result = urlopen(baseurl + "ct/v1/get-entries?" + params).read()
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR:", e.read()
@@ -171,8 +205,9 @@ def encode_signature(hash_alg, signature_alg, unpacked_signature):
signature += tls_array(unpacked_signature, 2)
return signature
-def check_signature(baseurl, signature, data):
- publickey = base64.decodestring(publickeys[baseurl])
+def check_signature(baseurl, signature, data, publickey=None):
+ if publickey == None:
+ publickey = base64.decodestring(publickeys[baseurl])
(hash_alg, signature_alg, unpacked_signature) = decode_signature(signature)
assert hash_alg == 4, \
"hash_alg is %d, expected 4" % (hash_alg,) # sha256
@@ -183,22 +218,49 @@ def check_signature(baseurl, signature, data):
vk.verify(unpacked_signature, data, hashfunc=hashlib.sha256,
sigdecode=ecdsa.util.sigdecode_der)
-def http_request(url, data=None, key=None):
- req = urllib2.Request(url, data)
+def parse_auth_header(authheader):
+ splittedheader = authheader.split(";")
+ (signature, rawoptions) = (splittedheader[0], splittedheader[1:])
+ options = dict([(e.partition("=")[0], e.partition("=")[2]) for e in rawoptions])
+ return (base64.b64decode(signature), options)
+
+def check_auth_header(authheader, expected_key, publickeydir, data, path):
+ if expected_key == None:
+ return True
+ (signature, options) = parse_auth_header(authheader)
+ keyname = options.get("key")
+ if keyname != expected_key:
+ raise Exception("Response claimed to come from %s, expected %s" % (keyname, expected_key))
+ publickey = get_public_key_from_file(publickeydir + "/" + keyname + ".pem")
+ vk = ecdsa.VerifyingKey.from_der(publickey)
+ vk.verify(signature, "%s\0%s\0%s" % ("REPLY", path, data), hashfunc=hashlib.sha256,
+ sigdecode=ecdsa.util.sigdecode_der)
+ return True
+
+def http_request(url, data=None, key=None, verifynode=None, publickeydir="."):
+ try:
+ opener = urllib2.build_opener(urllib2.HTTPSHandler(context=ssl.SSLContext(ssl.PROTOCOL_TLSv1)))
+ except AttributeError:
+ opener = urllib2.build_opener(urllib2.HTTPSHandler())
+
(keyname, keyfile) = key
privatekey = get_eckey_from_file(keyfile)
sk = ecdsa.SigningKey.from_der(privatekey)
parsed_url = urlparse.urlparse(url)
if data == None:
- data = parsed_url.query
+ data_to_sign = parsed_url.query
method = "GET"
else:
+ data_to_sign = data
method = "POST"
- signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data), hashfunc=hashlib.sha256,
+ signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data_to_sign), hashfunc=hashlib.sha256,
sigencode=ecdsa.util.sigencode_der)
- req.add_header('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname)
- result = urllib2.urlopen(req).read()
- return result
+ opener.addheaders = [('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname)]
+ result = opener.open(url, data)
+ authheader = result.info().get('X-Catlfish-Auth')
+ data = result.read()
+ check_auth_header(authheader, verifynode, publickeydir, data, parsed_url.path)
+ return data
def get_signature(baseurl, data, key=None):
try:
@@ -214,7 +276,7 @@ def create_signature(baseurl, data, key=None):
unpacked_signature = get_signature(baseurl, data, key)
return encode_signature(4, 3, unpacked_signature)
-def check_sth_signature(baseurl, sth):
+def check_sth_signature(baseurl, sth, publickey=None):
signature = base64.decodestring(sth["tree_head_signature"])
version = struct.pack(">b", 0)
@@ -224,7 +286,7 @@ def check_sth_signature(baseurl, sth):
hash = base64.decodestring(sth["sha256_root_hash"])
tree_head = version + signature_type + timestamp + tree_size + hash
- check_signature(baseurl, signature, tree_head)
+ check_signature(baseurl, signature, tree_head, publickey=publickey)
def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None):
version = struct.pack(">b", 0)
@@ -235,8 +297,9 @@ def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None):
return create_signature(baseurl, tree_head, key=key)
-def check_sct_signature(baseurl, leafcert, sct):
- publickey = base64.decodestring(publickeys[baseurl])
+def check_sct_signature(baseurl, signed_entry, sct, precert=False, publickey=None):
+ if publickey == None:
+ publickey = base64.decodestring(publickeys[baseurl])
calculated_logid = hashlib.sha256(publickey).digest()
received_logid = base64.decodestring(sct["id"])
assert calculated_logid == received_logid, \
@@ -249,12 +312,15 @@ def check_sct_signature(baseurl, leafcert, sct):
version = struct.pack(">b", sct["sct_version"])
signature_type = struct.pack(">b", 0)
timestamp = struct.pack(">Q", sct["timestamp"])
- entry_type = struct.pack(">H", 0)
+ if precert:
+ entry_type = struct.pack(">H", 1)
+ else:
+ entry_type = struct.pack(">H", 0)
signed_struct = version + signature_type + timestamp + \
- entry_type + tls_array(leafcert, 3) + \
+ entry_type + signed_entry + \
tls_array(base64.decodestring(sct["extensions"]), 2)
- check_signature(baseurl, signature, signed_struct)
+ check_signature(baseurl, signature, signed_struct, publickey=publickey)
def pack_mtl(timestamp, leafcert):
entry_type = struct.pack(">H", 0)
@@ -267,6 +333,25 @@ def pack_mtl(timestamp, leafcert):
merkle_tree_leaf = version + leaf_type + timestamped_entry
return merkle_tree_leaf
+def pack_mtl_precert(timestamp, cleanedcert, issuer_key_hash):
+ entry_type = struct.pack(">H", 1)
+ extensions = ""
+
+ timestamped_entry = struct.pack(">Q", timestamp) + entry_type + \
+ pack_precert(cleanedcert, issuer_key_hash) + tls_array(extensions, 2)
+ version = struct.pack(">b", 0)
+ leaf_type = struct.pack(">b", 0)
+ merkle_tree_leaf = version + leaf_type + timestamped_entry
+ return merkle_tree_leaf
+
+def pack_precert(cleanedcert, issuer_key_hash):
+ assert len(issuer_key_hash) == 32
+
+ return issuer_key_hash + tls_array(cleanedcert, 3)
+
+def pack_cert(cert):
+ return tls_array(cert, 3)
+
def unpack_mtl(merkle_tree_leaf):
version = merkle_tree_leaf[0:1]
leaf_type = merkle_tree_leaf[1:2]
@@ -353,6 +438,14 @@ def get_hash_from_certfile(cert):
return base64.b16decode(line[len("Leafhash: "):])
return None
+def get_timestamp_from_certfile(cert):
+ for line in cert.split("\n"):
+ if line.startswith("-----"):
+ return None
+ if line.startswith("Timestamp: "):
+ return int(line[len("Timestamp: "):])
+ return None
+
def get_proof(store, tree_size, n):
hash = get_hash_from_certfile(get_one_cert(store, n))
return get_proof_by_hash(args.baseurl, hash, tree_size)
@@ -586,5 +679,16 @@ def verify_consistency_proof(consistency_proof, first, second, oldhash_input):
def verify_inclusion_proof(inclusion_proof, index, treesize, leafhash):
chain = zip([(index, 0)] + nodes_for_index(index, treesize), [leafhash] + inclusion_proof)
+ assert len(nodes_for_index(index, treesize)) == len(inclusion_proof)
(_, hash) = reduce(lambda e1, e2: combine_two_hashes(e1, e2, treesize), chain)
return hash
+
+def extract_original_entry(entry):
+ leaf_input = base64.decodestring(entry["leaf_input"])
+ (leaf_cert, timestamp, issuer_key_hash) = unpack_mtl(leaf_input)
+ extra_data = base64.decodestring(entry["extra_data"])
+ if issuer_key_hash != None:
+ (precert, extra_data) = extract_precertificate(extra_data)
+ leaf_cert = precert
+ certchain = decode_certificate_chain(extra_data)
+ return ([leaf_cert] + certchain, timestamp, issuer_key_hash)