summaryrefslogtreecommitdiff
path: root/tools/mergetools.py
blob: 947d7f441dbda9d570d57ab6d07002d19debae67 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# Copyright (c) 2015, NORDUnet A/S.
# See LICENSE for licensing information.
import base64
import hashlib
import sys
import struct
from certtools import get_leaf_hash

def parselogrow(row):
    """Decode one hexadecimal (base16) log row into its raw byte string.

    Both upper- and lower-case hex digits are accepted.
    """
    decoded = base64.b16decode(row, casefold=True)
    return decoded

def get_logorder(filename):
    """Read a log-order file and return its rows decoded from hex.

    Each line of `filename` is stripped of trailing whitespace and passed
    through parselogrow(); the results are returned as a list in file order.

    The file is opened in a `with` block so the handle is closed even if a
    row fails to decode (the previous version never closed it).
    """
    with open(filename, "r") as f:
        # The list is fully built before the file is closed.
        return [parselogrow(row.rstrip()) for row in f]

def read_chain_open(chainsdir, filename):
    """Open a chain file for reading and return the open file object.

    The file lives three directory levels below `chainsdir`; the levels are
    named after the first three two-character groups of `filename`, i.e.
    chainsdir/ff/ii/ll/filename. The caller is responsible for closing the
    returned file.
    """
    components = (
        chainsdir,
        filename[0:2],
        filename[2:4],
        filename[4:6],
        filename,
    )
    return open("/".join(components), "r")

def read_chain(chainsdir, key):
    """Return the full contents of the chain file stored under `key`.

    `key` is hex-encoded to form the file name. The upper-case name is tried
    first; if opening it raises IOError, the lower-case name is tried
    instead. An IOError from the second attempt propagates to the caller.
    """
    filename = base64.b16encode(key).upper()
    try:
        f = read_chain_open(chainsdir, filename)
    except IOError:
        # Fall back to the lower-case spelling of the same name.
        f = read_chain_open(chainsdir, filename.lower())
    # Close the handle even if read() raises.
    with f:
        return f.read()

def tlv_decode(data):
    """Decode the first TLV record at the start of `data`.

    Record layout: a 4-byte big-endian length that covers the whole record
    (including the 8 header bytes), a 4-byte type tag, then the value.

    Returns a tuple (type, value, rest), where `rest` is whatever follows
    the record in `data`.

    Raises:
        struct.error: if `data` holds fewer than 4 bytes.
        ValueError: if the length field is smaller than the 8-byte header
            or larger than the data available.
    """
    (length,) = struct.unpack(">I", data[0:4])
    if length < 8:
        # A length below the header size would consume no (or overlapping)
        # input; with length 0 a caller looping over records never ends.
        raise ValueError(
            "TLV length %d is smaller than the 8-byte header" % length)
    if length > len(data):
        raise ValueError(
            "TLV length %d exceeds the %d bytes available"
            % (length, len(data)))
    tag = data[4:8]
    value = data[8:length]
    rest = data[length:]
    return (tag, value, rest)

def tlv_encode(type, value):
    """Encode a single TLV record.

    The record is a 4-byte big-endian length (value length plus the 8 header
    bytes), followed by the 4-character `type` tag and then `value`.

    The tag length is checked with an assert, so the check is not performed
    when Python runs with assertions disabled.
    """
    assert len(type) == 4
    total_length = 8 + len(value)
    header = struct.pack(">I", total_length)
    return header + type + value

def tlv_decodelist(data):
    """Decode a concatenation of TLV records.

    Records are decoded one after another with tlv_decode() until no input
    is left. Returns a list of (type, value) tuples in input order.
    """
    records = []
    remaining = data
    while len(remaining) > 0:
        (tag, payload, remaining) = tlv_decode(remaining)
        records.append((tag, payload))
    return records

def tlv_encodelist(l):
    """Encode a sequence of (type, value) pairs as concatenated TLV records.

    Each pair is encoded with tlv_encode() and the results are joined in
    order. An empty sequence yields an empty string.
    """
    # One join instead of repeated += avoids re-copying the growing result.
    return "".join([tlv_encode(tag, value) for (tag, value) in l])

def unwrap_entry(entry):
    """Strip the PLOP wrapper from `entry` and return the payload.

    A wrapped entry is exactly two TLV records: a "PLOP" record holding the
    payload and an "S256" record holding the SHA-256 digest of that payload.

    Raises:
        AssertionError: if the entry does not have exactly two records, the
            record types are not "PLOP" and "S256", or the stored digest
            does not match the payload.

    The checks raise AssertionError explicitly (rather than using `assert`)
    so that the exception type seen by callers is unchanged while the
    integrity checks stay active when Python runs with -O.
    """
    ploplevel = tlv_decodelist(entry)
    if len(ploplevel) != 2:
        raise AssertionError(
            "expected 2 top-level TLV records, got %d" % len(ploplevel))
    (ploptype, plopdata) = ploplevel[0]
    (plopchecksumtype, plopchecksum) = ploplevel[1]
    if ploptype != "PLOP":
        raise AssertionError("unexpected entry type %r" % (ploptype,))
    if plopchecksumtype != "S256":
        raise AssertionError(
            "unexpected checksum type %r" % (plopchecksumtype,))
    computedchecksum = hashlib.sha256(plopdata).digest()
    if computedchecksum != plopchecksum:
        raise AssertionError("entry checksum mismatch")
    return plopdata

def wrap_entry(entry):
    """Wrap `entry` in a PLOP envelope.

    The result is two concatenated TLV records: "PLOP" carrying `entry`
    itself, followed by "S256" carrying the SHA-256 digest of `entry`.
    """
    checksum = hashlib.sha256(entry).digest()
    records = [("PLOP", entry), ("S256", checksum)]
    return tlv_encodelist(records)

def verify_entry(verifycert, entry, hash):
    """Check a wrapped entry against `hash` and the verifycert process.

    `entry` is unwrapped with unwrap_entry(); its first TLV record must be
    of type "MTL1" and get_leaf_hash() of that record's value must equal
    `hash`.

    The unwrapped entry is then written to `verifycert.stdin`, prefixed with
    its length as a 4-byte big-endian integer. The reply is read from
    `verifycert.stdout` in the same framing; its first byte is an error
    code and the remainder is a message.

    `verifycert` is presumably a subprocess.Popen object with stdin/stdout
    pipes -- only its `stdin` and `stdout` attributes are used here.

    Raises:
        AssertionError: on a leaf hash mismatch, an unexpected record type,
            or a reply shorter than its announced length. These are raised
            explicitly (not via `assert`) so they stay active under -O.

    Exits the process with status 1 if writing to verifycert fails (after
    copying the rest of its stdout to stderr) or if the error code in the
    reply is non-zero (after printing the message to stderr).
    """
    packed = unwrap_entry(entry)
    unpacked = tlv_decodelist(packed)
    (mtltype, mtl) = unpacked[0]
    if hash != get_leaf_hash(mtl):
        raise AssertionError("leaf hash mismatch")
    if mtltype != "MTL1":
        raise AssertionError("unexpected record type %r" % (mtltype,))
    request = struct.pack(">I", len(packed)) + packed
    try:
        verifycert.stdin.write(request)
    except IOError:
        sys.stderr.write("merge: unable to write to verifycert process: ")
        # Relay whatever verifycert still has to say, then give up once
        # its stdout reaches end-of-file.
        while True:
            line = verifycert.stdout.readline()
            if not line:
                sys.exit(1)
            sys.stderr.write(line)
    result_length_packed = verifycert.stdout.read(4)
    (result_length,) = struct.unpack(">I", result_length_packed)
    result = verifycert.stdout.read(result_length)
    if len(result) != result_length:
        raise AssertionError(
            "short reply from verifycert: expected %d bytes, got %d"
            % (result_length, len(result)))
    (error_code,) = struct.unpack("B", result[0:1])
    if error_code != 0:
        sys.stderr.write(result[1:] + "\n")
        sys.exit(1)