From 0d7c8181f07e1c34ef3a9a8740c8ce2d7b982f74 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Wed, 19 Aug 2015 09:55:02 +0200 Subject: Script for converting database to new format --- tools/convertdb.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++ tools/fetchallcerts.py | 4 ++- tools/mergetools.py | 15 +++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 tools/convertdb.py (limited to 'tools') diff --git a/tools/convertdb.py b/tools/convertdb.py new file mode 100644 index 0000000..c036843 --- /dev/null +++ b/tools/convertdb.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2014, NORDUnet A/S. +# See LICENSE for licensing information. + +import argparse +import urllib2 +import urllib +import json +import base64 +import sys +import struct +import hashlib +import itertools +from certtools import * +from mergetools import * +import zipfile +import os +import time +import shutil + +def write_file(fn, contents): + tempname = fn + ".new" + open(tempname, 'w').write(contents) + shutil.move(tempname, fn) + +def unpack_entry(entry): + pieces = [] + while len(entry): + (length,) = struct.unpack(">I", entry[0:4]) + data = entry[4:4+length] + entry = entry[4+length:] + pieces.append(data) + return pieces + +def read_old_entry(entry, hash): + unpacked = unpack_entry(entry) + mtl = unpacked[0] + assert hash == get_leaf_hash(mtl) + (leafcert, timestamp, issuer_key_hash) = unpack_mtl(mtl) + certchain = decode_certificate_chain(unpacked[1]) + if issuer_key_hash: + leafcert = certchain[0] + certchain = certchain[1:] + certtype = "PRC1" + else: + certtype = "EEC1" + return (mtl, leafcert, certtype, certchain) + +def convertentry(entry, hash): + (mtl, leafcert, certtype, chain) = read_old_entry(entry, hash) + entry = tlv_encodelist([("MTL1", mtl), + (certtype, leafcert), + ("CHN1", tlv_encodelist([("X509", cert) for cert in chain]))]) + return wrap_entry(entry) + +parser = argparse.ArgumentParser(description='') +parser.add_argument('path', help="Path to database to convert") +args = parser.parse_args() + +for (dirpath, dirnames, filenames) in os.walk(args.path): + for filename in filenames: + fullpath = dirpath + "/" + filename + entry = open(fullpath).read() + entry = convertentry(entry, base64.b16decode(filename.upper())) + if entry != None: + print "writing new entry for", filename + write_file(fullpath, entry) + else: + print "not writing new entry for", filename diff --git a/tools/fetchallcerts.py b/tools/fetchallcerts.py index 943759e..66fde74 100755 --- a/tools/fetchallcerts.py +++ b/tools/fetchallcerts.py @@ -22,6 +22,7 @@ parser = argparse.ArgumentParser(description='') parser.add_argument('baseurl', help="Base URL for CT server") parser.add_argument('--store', default=None, metavar="dir", help='Store certificates in directory dir') parser.add_argument('--write-sth', action='store_true', help='Write STH') +parser.add_argument('--no-check-signature', action='store_true', help='Don\'t check signature') parser.add_argument('--publickey', default=None, metavar="file", help='Public key for the CT log') parser.add_argument('--cafile', default=None, metavar="file", help='File containing the CA cert') args = parser.parse_args() @@ -46,7 +47,8 @@ def print_layer(layer): logpublickey = get_public_key_from_file(args.publickey) if args.publickey else None sth = get_sth(args.baseurl) -check_sth_signature(args.baseurl, sth, publickey=logpublickey) +if not args.no_check_signature: + check_sth_signature(args.baseurl, sth, publickey=logpublickey) tree_size = sth["tree_size"] root_hash = base64.decodestring(sth["sha256_root_hash"]) diff --git a/tools/mergetools.py b/tools/mergetools.py index c3e9688..947d7f4 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -35,6 +35,11 @@ def tlv_decode(data): rest = data[length:] return (type, value, rest) +def tlv_encode(type, value): + assert(len(type) == 4) + data = struct.pack(">I", len(value) + 8) + type + value + return data + def tlv_decodelist(data): l = [] while len(data): @@ -43,6 +48,12 @@ def tlv_decodelist(data): data = rest return l +def tlv_encodelist(l): + data = "" + for (type, value) in l: + data += tlv_encode(type, value) + return data + def unwrap_entry(entry): ploplevel = tlv_decodelist(entry) assert(len(ploplevel) == 2) @@ -54,6 +65,10 @@ def unwrap_entry(entry): assert(computedchecksum == plopchecksum) return plopdata +def wrap_entry(entry): + return tlv_encodelist([("PLOP", entry), + ("S256", hashlib.sha256(entry).digest())]) + def verify_entry(verifycert, entry, hash): packed = unwrap_entry(entry) unpacked = tlv_decodelist(packed) -- cgit v1.1