diff options
author | Magnus Ahltorp <map@kth.se> | 2015-03-31 14:27:23 +0200 |
---|---|---|
committer | Magnus Ahltorp <map@kth.se> | 2015-03-31 14:27:23 +0200 |
commit | 6b62ebbf1de5b9e55b04e9cfafd0620f1374c2d4 (patch) | |
tree | 80a4dccbd98c26a80c07146a93318ba1edece01f /tools/certtools.py | |
parent | 22cefc84254cae1f57195da819eba69dbacb5a6e (diff) |
Cleanup tests and use urllib2.build_openercleanup-tests
Remove unused files
Generate test config files directly in release directory
Move test database files to "tests" directory
Generate log key when preparing tests
Report error when STH not found in v1.erl
Make merge, fetchallcerts, submitcert, verifysct, and testcase1 take log key as argument
Diffstat (limited to 'tools/certtools.py')
-rw-r--r-- | tools/certtools.py | 50 |
1 files changed, 32 insertions, 18 deletions
diff --git a/tools/certtools.py b/tools/certtools.py index 2c97dfb..da5021a 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -88,8 +88,15 @@ def get_root_cert(issuer): return root_cert +def urlopen(url, data=None): + try: + opener = urllib2.build_opener(urllib2.HTTPSHandler(context=None)) + except TypeError: + opener = urllib2.build_opener(urllib2.HTTPSHandler()) + return opener.open(url, data) + def get_sth(baseurl): - result = urllib2.urlopen(baseurl + "ct/v1/get-sth").read() + result = urlopen(baseurl + "ct/v1/get-sth").read() return json.loads(result) def get_proof_by_hash(baseurl, hash, tree_size): @@ -97,7 +104,7 @@ def get_proof_by_hash(baseurl, hash, tree_size): params = urllib.urlencode({"hash":base64.b64encode(hash), "tree_size":tree_size}) result = \ - urllib2.urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read() + urlopen(baseurl + "ct/v1/get-proof-by-hash?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR:", e.read() @@ -108,7 +115,7 @@ def get_consistency_proof(baseurl, tree_size1, tree_size2): params = urllib.urlencode({"first":tree_size1, "second":tree_size2}) result = \ - urllib2.urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read() + urlopen(baseurl + "ct/v1/get-sth-consistency?" + params).read() return json.loads(result)["consistency"] except urllib2.HTTPError, e: print "ERROR:", e.read() @@ -131,7 +138,7 @@ def unpack_tls_array(packed_data, length_len): def add_chain(baseurl, submission): try: - result = urllib2.urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read() + result = urlopen(baseurl + "ct/v1/add-chain", json.dumps(submission)).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR", e.code,":", e.read() @@ -148,7 +155,7 @@ def add_chain(baseurl, submission): def add_prechain(baseurl, submission): try: - result = urllib2.urlopen(baseurl + "ct/v1/add-pre-chain", + result = urlopen(baseurl + "ct/v1/add-pre-chain", json.dumps(submission)).read() return json.loads(result) except urllib2.HTTPError, e: @@ -167,7 +174,7 @@ def add_prechain(baseurl, submission): def get_entries(baseurl, start, end): try: params = urllib.urlencode({"start":start, "end":end}) - result = urllib2.urlopen(baseurl + "ct/v1/get-entries?" + params).read() + result = urlopen(baseurl + "ct/v1/get-entries?" + params).read() return json.loads(result) except urllib2.HTTPError, e: print "ERROR:", e.read() @@ -198,8 +205,9 @@ def encode_signature(hash_alg, signature_alg, unpacked_signature): signature += tls_array(unpacked_signature, 2) return signature -def check_signature(baseurl, signature, data): - publickey = base64.decodestring(publickeys[baseurl]) +def check_signature(baseurl, signature, data, publickey=None): + if publickey == None: + publickey = base64.decodestring(publickeys[baseurl]) (hash_alg, signature_alg, unpacked_signature) = decode_signature(signature) assert hash_alg == 4, \ "hash_alg is %d, expected 4" % (hash_alg,) # sha256 @@ -230,20 +238,25 @@ def check_auth_header(authheader, expected_key, publickeydir, data, path): return True def http_request(url, data=None, key=None, verifynode=None, publickeydir="."): - req = urllib2.Request(url, data) + try: + opener = urllib2.build_opener(urllib2.HTTPSHandler(context=None)) + except TypeError: + opener = urllib2.build_opener(urllib2.HTTPSHandler()) + (keyname, keyfile) = key privatekey = get_eckey_from_file(keyfile) sk = ecdsa.SigningKey.from_der(privatekey) parsed_url = urlparse.urlparse(url) if data == None: - data = parsed_url.query + data_to_sign = parsed_url.query method = "GET" else: + data_to_sign = data method = "POST" - signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data), hashfunc=hashlib.sha256, + signature = sk.sign("%s\0%s\0%s" % (method, parsed_url.path, data_to_sign), hashfunc=hashlib.sha256, sigencode=ecdsa.util.sigencode_der) - req.add_header('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname) - result = urllib2.urlopen(req) + opener.addheaders = [('X-Catlfish-Auth', base64.b64encode(signature) + ";key=" + keyname)] + result = opener.open(url, data) authheader = result.info().get('X-Catlfish-Auth') data = result.read() check_auth_header(authheader, verifynode, publickeydir, data, parsed_url.path) @@ -263,7 +276,7 @@ def create_signature(baseurl, data, key=None): unpacked_signature = get_signature(baseurl, data, key) return encode_signature(4, 3, unpacked_signature) -def check_sth_signature(baseurl, sth): +def check_sth_signature(baseurl, sth, publickey=None): signature = base64.decodestring(sth["tree_head_signature"]) version = struct.pack(">b", 0) @@ -273,7 +286,7 @@ def check_sth_signature(baseurl, sth): hash = base64.decodestring(sth["sha256_root_hash"]) tree_head = version + signature_type + timestamp + tree_size + hash - check_signature(baseurl, signature, tree_head) + check_signature(baseurl, signature, tree_head, publickey=publickey) def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None): version = struct.pack(">b", 0) @@ -284,8 +297,9 @@ def create_sth_signature(tree_size, timestamp, root_hash, baseurl, key=None): return create_signature(baseurl, tree_head, key=key) -def check_sct_signature(baseurl, signed_entry, sct, precert=False): - publickey = base64.decodestring(publickeys[baseurl]) +def check_sct_signature(baseurl, signed_entry, sct, precert=False, publickey=None): + if publickey == None: + publickey = base64.decodestring(publickeys[baseurl]) calculated_logid = hashlib.sha256(publickey).digest() received_logid = base64.decodestring(sct["id"]) assert calculated_logid == received_logid, \ @@ -306,7 +320,7 @@ def check_sct_signature(baseurl, signed_entry, sct, precert=False): entry_type + signed_entry + \ tls_array(base64.decodestring(sct["extensions"]), 2) - check_signature(baseurl, signature, signed_struct) + check_signature(baseurl, signature, signed_struct, publickey=publickey) def pack_mtl(timestamp, leafcert): entry_type = struct.pack(">H", 0) |