1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
import datetime
import time
import base64
import struct
import hashlib
import ecdsa
try:
from certkeys import publickeys
import ecdsa
from Crypto.Hash import SHA256
import Crypto.PublicKey.RSA as RSA
from Crypto.Signature import PKCS1_v1_5
except:
print "Some imports failed, some functionality may be unavailable"
class UTC(datetime.tzinfo):
    """Fixed-offset tzinfo describing Coordinated Universal Time.

    All three tzinfo hooks are implemented; the base class raises
    NotImplementedError for any hook that is left out, which would break
    `%Z` formatting and `datetime.tzname()`.
    """

    def utcoffset(self, dt):
        """Return the offset from UTC, which is always zero."""
        return datetime.timedelta(hours=0)

    def tzname(self, dt):
        """Return the zone name used by `%Z` and `datetime.tzname()`."""
        return "UTC"

    def dst(self, dt):
        """Return the daylight-saving adjustment, which is always zero."""
        return datetime.timedelta(0)
def bits(n):
    """Return how many right-shifts reduce n to zero.

    For positive integers this is the bit length of n; for zero and
    negative values the result is 0.
    """
    if n > 0:
        return n.bit_length()
    return 0
def merkle_height(n):
    """Return the height value used for a tree of n entries.

    An empty tree (n == 0) is reported as height 1; otherwise the result is
    bits(n - 1), the bit length of the largest entry index.
    """
    return 1 if n == 0 else bits(n - 1)
def node_above(path, levels=1):
    """Return the ancestor of a tree node `levels` steps further up.

    path is a (position, level) pair. The position is shifted right once per
    level climbed and the level is increased by the same amount.

    The pair is unpacked explicitly instead of in the signature, because
    tuple parameters are a syntax error on Python 3; callers still pass the
    pair as a single positional argument.
    """
    (pathp, pathl) = path
    return (pathp >> levels, pathl + levels)
def node_even(path):
    """Return True when the node's position within its level is even.

    path is a (position, level) pair, unpacked explicitly because tuple
    parameters in the signature are a syntax error on Python 3.
    """
    (pathp, _pathl) = path
    return (pathp & 1) == 0
def node_odd(path):
    """Return True when the node's position within its level is odd.

    path is a (position, level) pair, unpacked explicitly because tuple
    parameters in the signature are a syntax error on Python 3.
    """
    (pathp, _pathl) = path
    return (pathp & 1) == 1
def node_lower(path1, path2):
    """Return True when path1 sits on a strictly lower level than path2.

    Both arguments are (position, level) pairs; only the levels are compared.
    The pairs are unpacked explicitly because tuple parameters in the
    signature are a syntax error on Python 3.
    """
    (_path1p, path1l) = path1
    (_path2p, path2l) = path2
    return path1l < path2l
def node_higher(path1, path2):
    """Return True when path1 sits on a strictly higher level than path2.

    Both arguments are (position, level) pairs; only the levels are compared.
    The pairs are unpacked explicitly because tuple parameters in the
    signature are a syntax error on Python 3.
    """
    (_path1p, path1l) = path1
    (_path2p, path2l) = path2
    return path1l > path2l
def node_level(path1):
    """Return the level component of a (position, level) pair.

    The pair is unpacked explicitly because tuple parameters in the
    signature are a syntax error on Python 3.
    """
    (_path1p, path1l) = path1
    return path1l
def node_outside(path1, path2):
    """Return True when path1 lies to the right of path2 on the same level.

    Both arguments are (position, level) pairs and must share a level; an
    AssertionError is raised otherwise. The pairs are unpacked explicitly
    because tuple parameters in the signature are a syntax error on Python 3.
    """
    (path1p, path1l) = path1
    (path2p, path2l) = path2
    assert path1l == path2l
    return path1p > path2p
def internal_hash(pair):
    """Return the hash of an interior tree node built from `pair`.

    A one-element pair is passed through unchanged. Otherwise the result is
    the SHA-256 digest of a single 0x01 byte followed by the first and then
    the second element.
    """
    if len(pair) == 1:
        return pair[0]
    hasher = hashlib.sha256()
    hasher.update(struct.pack(">b", 1))  # one-byte prefix marking an interior node
    hasher.update(pair[0])
    hasher.update(pair[1])
    return hasher.digest()
def time_str(ts=None):
    """Format a timestamp as 'YYYY-MM-DD HH:MM:SS' in UTC.

    ts is divided by 1000 before conversion, so it is expected in
    milliseconds since the epoch. When ts is None the current UTC time is
    used instead.
    """
    if ts is not None:
        moment = datetime.datetime.fromtimestamp(ts / 1000, UTC())
    else:
        moment = datetime.datetime.utcnow()
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def unpack_tls_array(packed_data, length_len):
    """Split one length-prefixed byte array off the front of a buffer.

    The first `length_len` bytes of `packed_data` are read as a big-endian
    unsigned integer giving the payload length.

    Returns:
        (unpacked_data, rest_data): the payload, and whatever follows it.

    Raises:
        AssertionError: if the buffer is too short to hold the length prefix
            or the payload it announces. The exception is raised explicitly,
            not via `assert`, so the check still runs under `python -O`.
    """
    length_field = packed_data[:length_len]
    if len(length_field) != length_len:
        raise AssertionError(
            "length prefix is only %d bytes long, but %d bytes were expected" %
            (len(length_field), length_len))

    # bytearray() yields integers for both Python 2 str and Python 3 bytes,
    # so the big-endian value can be folded without struct padding tricks.
    length = 0
    for octet in bytearray(length_field):
        length = (length << 8) | octet

    payload_end = length_len + length
    unpacked_data = packed_data[length_len:payload_end]
    if len(unpacked_data) != length:
        raise AssertionError(
            "data is only %d bytes long, but length is %d bytes" %
            (len(unpacked_data), length))
    rest_data = packed_data[payload_end:]
    return (unpacked_data, rest_data)
def decode_signature(signature):
    """Split a serialized signature into its algorithm ids and raw bytes.

    The first two bytes are the hash and signature algorithm identifiers;
    the remainder is a byte array with a 2-byte length prefix.

    Returns:
        (hash_alg, signature_alg, unpacked_signature)

    Raises:
        AssertionError: if bytes remain after the signature. The check uses
            the length, not a comparison with "", so it also holds for
            Python 3 bytes, and it is raised explicitly so `python -O` cannot
            strip it.
    """
    (hash_alg, signature_alg) = struct.unpack(">bb", signature[0:2])
    (unpacked_signature, rest) = unpack_tls_array(signature[2:], 2)
    if len(rest) != 0:
        raise AssertionError(
            "%d unexpected trailing bytes after signature" % (len(rest),))
    return (hash_alg, signature_alg, unpacked_signature)
def check_signature(baseurl, signature, data, publickey=None):
    """Verify `signature` over `data` with a log's public key.

    Args:
        baseurl: key into the `publickeys` table, used when no key is given.
        signature: serialized signature, decoded with decode_signature().
        data: the bytes that were signed.
        publickey: DER-encoded public key; looked up by `baseurl` when None.

    Raises:
        Exception: if no public key is known for `baseurl` (the message is
            also written to stderr).
        AssertionError: if the hash algorithm is not 4 (sha256), the
            signature algorithm is not 1 (RSA) or 3 (ecdsa), or the RSA
            signature does not verify.
        For ECDSA, whatever `VerifyingKey.verify` raises on a bad signature
        propagates unchanged.

    The checks raise explicitly, not via `assert`, so that running under
    `python -O` cannot silently disable signature verification.
    """
    # Function-scope import: sys is not imported in the visible module header.
    import sys

    if publickey is None:
        if baseurl in publickeys:
            publickey = base64.decodestring(publickeys[baseurl])
        else:
            message = ("Public key for %s not found, specify key file "
                       "with --publickey" % (baseurl,))
            sys.stderr.write(message + "\n")
            raise Exception(message)

    (hash_alg, signature_alg, unpacked_signature) = decode_signature(signature)
    if hash_alg != 4:  # sha256
        raise AssertionError("hash_alg is %d, expected 4" % (hash_alg,))
    if signature_alg not in (1, 3):  # 1 = RSA, 3 = ecdsa
        raise AssertionError(
            "signature_alg is %d, expected 1 or 3" % (signature_alg,))

    if signature_alg == 3:
        vk = ecdsa.VerifyingKey.from_der(publickey)
        vk.verify(unpacked_signature, data, hashfunc=hashlib.sha256,
                  sigdecode=ecdsa.util.sigdecode_der)
    else:
        digest = SHA256.new(data)
        rsa_key = RSA.importKey(publickey)
        verifier = PKCS1_v1_5.new(rsa_key)
        if not verifier.verify(digest, unpacked_signature):
            raise AssertionError("could not verify RSA signature")
def check_sth_signature(baseurl, sth, publickey=None):
    """Verify the signature on a signed tree head.

    `sth` is a mapping with "tree_head_signature" and "sha256_root_hash"
    (both base64) plus integer "timestamp" and "tree_size". The signed
    message is rebuilt as: version byte 0, signature-type byte 1, 64-bit
    big-endian timestamp, 64-bit big-endian tree size, root hash. It is then
    passed to check_signature(), whose exceptions propagate.
    """
    signature = base64.decodestring(sth["tree_head_signature"])
    signed_fields = [
        struct.pack(">b", 0),                     # version
        struct.pack(">b", 1),                     # signature type
        struct.pack(">Q", sth["timestamp"]),
        struct.pack(">Q", sth["tree_size"]),
        base64.decodestring(sth["sha256_root_hash"]),
    ]
    tree_head = b"".join(signed_fields)
    check_signature(baseurl, signature, tree_head, publickey=publickey)
def verify_consistency_proof(consistency_proof, first, second, oldhash_input):
    """Compute the old and new root hashes implied by a consistency proof.

    Args:
        consistency_proof: list of node hashes from the log.
        first: size of the older tree.
        second: size of the newer tree.
        oldhash_input: root hash of the older tree; prepended to the proof
            when `first` is a power of two.

    Returns:
        (oldhash, newhash): roots reconstructed for sizes `first` and
        `second`. Comparing them with the expected values is left to the
        caller.

    Raises:
        AssertionError: if the proof length does not match the number of
            nodes expected by nodes_for_subtree(). Raised explicitly so the
            check still runs under `python -O`; without it zip() would
            silently truncate the longer sequence.
    """
    # reduce is a builtin on Python 2 only; functools has it on 2.6+ and 3.
    from functools import reduce

    if 2 ** bits(first - 1) == first:
        consistency_proof = [oldhash_input] + consistency_proof

    nodes = nodes_for_subtree(first, second)
    if len(nodes) != len(consistency_proof):
        raise AssertionError(
            "consistency proof has %d hashes, expected %d" %
            (len(consistency_proof), len(nodes)))

    # Materialize the pairs: the chain is folded twice, and zip() is a
    # one-shot iterator on Python 3.
    chain = list(zip(nodes, consistency_proof))
    (_, newhash) = reduce(
        lambda e1, e2: combine_two_hashes(e1, e2, second), chain)
    (_, oldhash) = reduce(
        lambda e1, e2: combine_two_hashes(e1, e2, first), chain)
    return (oldhash, newhash)
def nodes_for_subtree(subtreesize, treesize):
    """List the (position, level) nodes that accompany a subtree in a proof.

    Starting from position `subtreesize` on level 0, the walk first climbs
    while the position is a non-zero even number. If the resulting position
    is odd, its sibling is recorded. It then continues up to
    merkle_height(treesize), recording each visited node whose leftmost leaf
    index is below `treesize` and stepping to the parent's sibling each time.
    """
    height = merkle_height(treesize)
    selected = []
    level = 0
    index = subtreesize

    # Climb past trailing zero bits of the starting position.
    while index > 0 and (index & 1) == 0:
        index >>= 1
        level += 1

    if index & 1:
        selected.append((index ^ 1, level))

    while level < height:
        leftmost_leaf = index * (2 ** level)
        if leftmost_leaf < treesize:
            selected.append((index, level))
        # Move to the parent, then across to the parent's sibling.
        index = (index >> 1) ^ 1
        level += 1

    return selected
def combine_two_hashes(entry1, entry2, treesize):
    """Fold two adjacent proof entries into their parent entry.

    Each entry is a (path, hash) pair, where path is a (position, level)
    pair. `treesize` fixes the right edge of the tree, whose last leaf is
    (treesize - 1, 0).

    Returns:
        (parent_path, hash). When the second node lies beyond the tree's
        right edge at its level, the first hash is carried up unchanged;
        otherwise the two hashes are combined with internal_hash(), left
        child first.

    Raises:
        AssertionError: if the two nodes are not in the expected relation
            (first not higher than second, siblings after alignment).

    The entries are unpacked explicitly because tuple parameters in the
    signature are a syntax error on Python 3; callers still pass each entry
    as one positional argument.
    """
    (path1, hash1) = entry1
    (path2, hash2) = entry2

    assert not node_higher(path1, path2)
    edge_node = (treesize - 1, 0)

    if node_lower(path1, path2):
        # A lower first node must sit on the right edge of the tree; lift it
        # while it is a left child so that it can meet path2.
        assert path1 == node_above(edge_node, levels=node_level(path1))
        while node_even(path1):
            path1 = node_above(path1)

    assert node_above(path1) == node_above(path2)
    assert (node_even(path1) and node_odd(path2)) or \
        (node_odd(path1) and node_even(path2))

    if node_outside(path2, node_above(edge_node, levels=node_level(path2))):
        return (node_above(path1), hash1)

    if node_even(path1):
        newhash = internal_hash((hash1, hash2))
    else:
        newhash = internal_hash((hash2, hash1))

    return (node_above(path1), newhash)
def unpack_mtl(merkle_tree_leaf):
    """Extract the certificate, timestamp and issuer key hash from a leaf.

    The first two bytes (version and leaf type) are skipped. The remainder
    starts with a 64-bit big-endian timestamp and a 16-bit entry type:

    * entry type 0: the certificate follows directly as a byte array with a
      3-byte length prefix; no issuer key hash is present.
    * entry type 1: a 32-byte issuer key hash precedes the certificate.
      (Presumably x509 and precertificate entries respectively; confirm
      against the log specification.)

    Returns:
        (leafcert, timestamp, issuer_key_hash), where issuer_key_hash is
        None for entry type 0.

    Raises:
        ValueError: if the entry type is neither 0 nor 1. Previously this
            fell through to an UnboundLocalError at the return statement.
    """
    timestamped_entry = merkle_tree_leaf[2:]
    (timestamp, entry_type) = struct.unpack(">QH", timestamped_entry[0:10])

    if entry_type == 0:
        issuer_key_hash = None
        cert_offset = 10
    elif entry_type == 1:
        issuer_key_hash = timestamped_entry[10:42]
        cert_offset = 42
    else:
        raise ValueError(
            "unknown entry_type %d, expected 0 or 1" % (entry_type,))

    (leafcert, _rest_entry) = unpack_tls_array(
        timestamped_entry[cert_offset:], 3)
    return (leafcert, timestamp, issuer_key_hash)
|