summaryrefslogtreecommitdiff
path: root/tools/certtools.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordberg.se>2014-10-06 14:08:42 +0200
committerLinus Nordberg <linus@nordberg.se>2014-10-06 14:08:42 +0200
commit25f4683083db69b62465167ab4e300cd4f91152c (patch)
treefbc61416e3c81f91fe7fa64167e37e3ca9c200e9 /tools/certtools.py
parent294ac576ab050b6c46c011825d89337794404f66 (diff)
parent3847607ce64a715993df3eaa42d9c4ea42064678 (diff)
Merge remote-tracking branch 'refs/remotes/map/testtools4'
Diffstat (limited to 'tools/certtools.py')
-rw-r--r--tools/certtools.py91
1 files changed, 73 insertions, 18 deletions
diff --git a/tools/certtools.py b/tools/certtools.py
index 13dad17..a62d58f 100644
--- a/tools/certtools.py
+++ b/tools/certtools.py
@@ -8,6 +8,20 @@ import sys
import hashlib
import ecdsa
+publickeys = {
+ "https://ct.googleapis.com/pilot/":
+ "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEfahLEimAoz2t01p3uMziiLOl/fHTD"
+ "M0YDOhBRuiBARsV4UvxG2LdNgoIGLrtCzWE0J5APC2em4JlvR8EEEFMoA==",
+
+ "https://127.0.0.1:8080/":
+ "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE4qWq6afhBUi0OdcWUYhyJLNXTkGqQ9"
+ "PMS5lqoCgkV2h1ZvpNjBH2u8UbgcOQwqDo66z6BWQJGolozZYmNHE2kQ==",
+
+ "https://flimsy.ct.nordu.net/":
+ "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE4qWq6afhBUi0OdcWUYhyJLNXTkGqQ9"
+ "PMS5lqoCgkV2h1ZvpNjBH2u8UbgcOQwqDo66z6BWQJGolozZYmNHE2kQ==",
+}
+
def get_cert_info(s):
p = subprocess.Popen(
["openssl", "x509", "-noout", "-subject", "-issuer", "-inform", "der"],
@@ -15,7 +29,8 @@ def get_cert_info(s):
stderr=subprocess.PIPE)
parsed = p.communicate(s)
if parsed[1]:
- print "error:", parsed[1]
+ print "ERROR:", parsed[1]
+ sys.exit(1)
result = {}
for line in parsed[0].split("\n"):
(key, sep, value) = line.partition("=")
@@ -34,7 +49,7 @@ def get_certs_from_file(certfile):
cert = ""
incert = True
elif line == "-----END CERTIFICATE-----":
- certs.append(cert)
+ certs.append(base64.decodestring(cert))
incert = False
elif incert:
cert += line
@@ -49,7 +64,6 @@ def get_root_cert(issuer):
for accepted_cert in accepted_certs:
subject = get_cert_info(base64.decodestring(accepted_cert))["subject"]
if subject == issuer:
- print "found root cert"
root_cert = base64.decodestring(accepted_cert)
return root_cert
@@ -62,12 +76,11 @@ def get_proof_by_hash(baseurl, hash, tree_size):
try:
params = urllib.urlencode({"hash":base64.b64encode(hash),
"tree_size":tree_size})
- print params
result = \
urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read()
return json.loads(result)
except urllib2.HTTPError, e:
- print e.read()
+ print "ERROR:", e.read()
sys.exit(1)
def tls_array(data, length_len):
@@ -87,11 +100,19 @@ def unpack_tls_array(packed_data, length_len):
def add_chain(baseurl, submission):
try:
- return json.loads(urllib2.urlopen(baseurl + "ct/v1/add-chain",
- json.dumps(submission)).read())
+ result = urllib2.urlopen(baseurl + "ct/v1/add-chain",
+ json.dumps(submission)).read()
+ return json.loads(result)
except urllib2.HTTPError, e:
- print e.read()
+ print "ERROR:", e.read()
sys.exit(1)
+ except ValueError, e:
+ print "==== FAILED REQUEST ===="
+ print submission
+ print "======= RESPONSE ======="
+ print result
+ print "========================"
+ raise e
def get_entries(baseurl, start, end):
try:
@@ -99,7 +120,7 @@ def get_entries(baseurl, start, end):
result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params).read()
return json.loads(result)
except urllib2.HTTPError, e:
- print e.read()
+ print "ERROR:", e.read()
sys.exit(1)
def decode_certificate_chain(packed_certchain):
@@ -118,7 +139,32 @@ def decode_signature(signature):
assert rest == ""
return (hash_alg, signature_alg, unpacked_signature)
-def check_signature(publickey, leafcert, sct):
+def check_signature(baseurl, signature, data):
+ publickey = base64.decodestring(publickeys[baseurl])
+ (hash_alg, signature_alg, unpacked_signature) = decode_signature(signature)
+ assert hash_alg == 4, \
+ "hash_alg is %d, expected 4" % (hash_alg,) # sha256
+ assert signature_alg == 3, \
+ "signature_alg is %d, expected 3" % (signature_alg,) # ecdsa
+
+ vk = ecdsa.VerifyingKey.from_der(publickey)
+ vk.verify(unpacked_signature, data, hashfunc=hashlib.sha256,
+ sigdecode=ecdsa.util.sigdecode_der)
+
+def check_sth_signature(baseurl, sth):
+ signature = base64.decodestring(sth["tree_head_signature"])
+
+ version = struct.pack(">b", 0)
+ signature_type = struct.pack(">b", 1)
+ timestamp = struct.pack(">Q", sth["timestamp"])
+ tree_size = struct.pack(">Q", sth["tree_size"])
+ hash = base64.decodestring(sth["sha256_root_hash"])
+ tree_head = version + signature_type + timestamp + tree_size + hash
+
+ check_signature(baseurl, signature, tree_head)
+
+def check_sct_signature(baseurl, leafcert, sct):
+ publickey = base64.decodestring(publickeys[baseurl])
calculated_logid = hashlib.sha256(publickey).digest()
received_logid = base64.decodestring(sct["id"])
assert calculated_logid == received_logid, \
@@ -136,13 +182,22 @@ def check_signature(publickey, leafcert, sct):
entry_type + tls_array(leafcert, 3) + \
tls_array(base64.decodestring(sct["extensions"]), 2)
- (hash_alg, signature_alg, unpacked_signature) = decode_signature(signature)
- assert hash_alg == 4 # sha256
- assert signature_alg == 3 # ecdsa
+ check_signature(baseurl, signature, signed_struct)
- hash = hashlib.sha256()
- hash.update(signed_struct)
+def pack_mtl(timestamp, leafcert):
+ entry_type = struct.pack(">H", 0)
+ extensions = ""
- vk = ecdsa.VerifyingKey.from_der(publickey)
- vk.verify(unpacked_signature, signed_struct, hashfunc=hashlib.sha256,
- sigdecode=ecdsa.util.sigdecode_der)
+ timestamped_entry = struct.pack(">Q", timestamp) + entry_type + \
+ tls_array(leafcert, 3) + tls_array(extensions, 2)
+ version = struct.pack(">b", 0)
+ leaf_type = struct.pack(">b", 0)
+ merkle_tree_leaf = version + leaf_type + timestamped_entry
+ return merkle_tree_leaf
+
+def get_leaf_hash(merkle_tree_leaf):
+ leaf_hash = hashlib.sha256()
+ leaf_hash.update(struct.pack(">b", 0))
+ leaf_hash.update(merkle_tree_leaf)
+
+ return leaf_hash.digest()