# Copyright (c) 2015, NORDUnet A/S.
# See LICENSE for licensing information.

"""Helpers for log order files, on-disk chain files and TLV-wrapped entries."""

import base64
import hashlib
import sys
import struct
from certtools import get_leaf_hash

# Size in bytes of a TLV header: 4-byte big-endian length + 4-byte type tag.
_TLV_HEADER_SIZE = 8


def _check(condition, message, *args):
    """Raise AssertionError unless condition is true.

    This replaces the ``assert`` statement, which is removed when Python
    runs with -O and would then silently disable the checks.  The exception
    type is still AssertionError, so a failed check looks the same to
    callers as a failed ``assert`` did.

    message is %-formatted with args, but only when the check fails.
    """
    if not condition:
        raise AssertionError(message % args if args else message)


def parselogrow(row):
    """Decode one hexadecimal (base16) row into the raw string it encodes.

    Both uppercase and lowercase hex digits are accepted.
    """
    return base64.b16decode(row, casefold=True)


def get_logorder(filename):
    """Return the decoded rows of the file at filename, in file order.

    Trailing whitespace is stripped from every line before it is decoded
    with parselogrow().
    """
    # The context manager closes the file even if a row fails to decode.
    with open(filename, "r") as f:
        return [parselogrow(row.rstrip()) for row in f]


def read_chain_open(chainsdir, filename):
    """Open a chain file for reading and return the open file object.

    The file is looked up at chainsdir/AA/BB/CC/filename, where AA, BB and
    CC are the first three two-character slices of filename.  The caller is
    responsible for closing the returned file.

    Raises IOError if the file cannot be opened.
    """
    subdirs = [filename[0:2], filename[2:4], filename[4:6]]
    path = chainsdir + "/" + "/".join(subdirs)
    # NOTE(review): opened in text mode ("r"); if chain files hold binary
    # data, "rb" would be needed on platforms where the two differ -- confirm.
    return open(path + "/" + filename, "r")


def read_chain(chainsdir, key):
    """Return the contents of the chain file stored under key.

    The file name is the base16 encoding of key.  The uppercase spelling is
    tried first; if opening it fails with IOError the lowercase spelling is
    tried instead, and an IOError from that second attempt propagates.
    """
    filename = base64.b16encode(key).upper()
    try:
        f = read_chain_open(chainsdir, filename)
    except IOError:
        f = read_chain_open(chainsdir, filename.lower())
    # Close the file even if the read itself fails.
    with f:
        return f.read()


def tlv_decode(data):
    """Split the first TLV record off the front of data.

    A record is a 4-byte big-endian length, a 4-byte type tag and the
    value.  The length counts the whole record, header included.

    Returns a (type, value, rest) tuple, where rest is whatever follows the
    record in data.

    Raises struct.error if data is shorter than the 4-byte length field,
    and AssertionError if the length is smaller than the header or larger
    than data.
    """
    (length,) = struct.unpack(">I", data[0:4])
    # A length below the header size can never be valid.  A length of 0
    # would also leave rest == data, so tlv_decodelist() would never end.
    _check(length >= _TLV_HEADER_SIZE,
           "TLV record length %d is smaller than the %d-byte header",
           length, _TLV_HEADER_SIZE)
    # Slicing would silently shorten the value of a truncated record.
    _check(length <= len(data),
           "TLV record is truncated: length field says %d bytes, "
           "only %d available", length, len(data))
    tag = data[4:_TLV_HEADER_SIZE]
    value = data[_TLV_HEADER_SIZE:length]
    rest = data[length:]
    return (tag, value, rest)


def tlv_encode(type, value):
    """Encode one TLV record: 4-byte big-endian length, type tag, value.

    The length field counts the 8-byte header plus value.

    Raises AssertionError if type is not exactly 4 characters long.
    """
    # The parameter name shadows the builtin, but it is part of the
    # function's interface and is therefore left unchanged.
    _check(len(type) == 4,
           "TLV type must be exactly 4 bytes long, got %d", len(type))
    return struct.pack(">I", len(value) + _TLV_HEADER_SIZE) + type + value


def tlv_decodelist(data):
    """Decode concatenated TLV records into a list of (type, value) tuples.

    Raises whatever tlv_decode() raises for a malformed record.
    """
    records = []
    while data:
        (tag, value, data) = tlv_decode(data)
        records.append((tag, value))
    return records


def tlv_encodelist(l):
    """Encode an iterable of (type, value) pairs as concatenated TLV records.

    Returns the empty string when l is empty.
    """
    # join() avoids the quadratic cost of repeated string concatenation.
    return "".join([tlv_encode(tag, value) for (tag, value) in l])


def unwrap_entry(entry):
    """Verify the checksum of a wrapped entry and return the data inside.

    entry must consist of exactly two TLV records: a "PLOP" record holding
    the data, followed by an "S256" record holding the SHA-256 digest of
    that data.

    Raises AssertionError if the structure is wrong or the digest does not
    match.
    """
    records = tlv_decodelist(entry)
    _check(len(records) == 2,
           "wrapped entry must hold exactly 2 records, got %d", len(records))
    (ploptype, plopdata) = records[0]
    (checksumtype, checksum) = records[1]
    _check(ploptype == "PLOP",
           "first record has type %r, expected 'PLOP'", ploptype)
    _check(checksumtype == "S256",
           "second record has type %r, expected 'S256'", checksumtype)
    _check(hashlib.sha256(plopdata).digest() == checksum,
           "wrapped entry checksum mismatch")
    return plopdata


def wrap_entry(entry):
    """Wrap entry as a "PLOP" record followed by its "S256" checksum record.

    This is the inverse of unwrap_entry().
    """
    checksum = hashlib.sha256(entry).digest()
    return tlv_encodelist([("PLOP", entry), ("S256", checksum)])


def verify_entry(verifycert, entry, hash):
    """Check entry against hash and have the verifycert process verify it.

    verifycert must provide stdin and stdout file attributes
    (NOTE(review): presumably a subprocess.Popen object -- confirm against
    the caller).  The unwrapped entry is written to its stdin, prefixed
    with its 4-byte big-endian length.  A reply with the same length-prefix
    framing is then read from its stdout.  The first byte of the reply is
    an error code, where 0 means success.

    Returns None on success.  If the error code is non-zero, the rest of
    the reply is written to stderr and the process exits with status 1.
    If writing to verifycert fails with IOError, its remaining output is
    copied to stderr and the process exits with status 1.

    Raises AssertionError if entry is malformed, if hash differs from
    get_leaf_hash() of the first inner record's value, if that record's
    type is not "MTL1", or if the reply is shorter than announced.
    """
    packed = unwrap_entry(entry)
    unpacked = tlv_decodelist(packed)
    (mtltype, mtl) = unpacked[0]
    _check(hash == get_leaf_hash(mtl), "entry does not match its leaf hash")
    _check(mtltype == "MTL1",
           "first inner record has type %r, expected 'MTL1'", mtltype)

    request = struct.pack(">I", len(packed)) + packed
    try:
        verifycert.stdin.write(request)
    except IOError:
        sys.stderr.write("merge: unable to write to verifycert process: ")
        # Relay whatever the process printed before it went away, then exit.
        while True:
            line = verifycert.stdout.readline()
            if not line:
                break
            sys.stderr.write(line)
        sys.exit(1)

    result_length_packed = verifycert.stdout.read(4)
    (result_length,) = struct.unpack(">I", result_length_packed)
    result = verifycert.stdout.read(result_length)
    _check(len(result) == result_length,
           "short reply from verifycert: expected %d bytes, got %d",
           result_length, len(result))
    (error_code,) = struct.unpack("B", result[0:1])
    if error_code != 0:
        sys.stderr.write(result[1:] + "\n")
        sys.exit(1)