1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
import sys
import struct
import time
import subprocess
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, write_chain, add_to_logorder
from certtools import timing_point, build_merkle_tree
def merge_fetch(args, config, localconfig):
    """Fetch entries that are not yet in the local log order and append them.

    Each storage node is asked for its new entries; every hash not already
    present in the log order file is then fetched from the first storage
    node (in config order) that reported it, passed to verify_entry, written
    with write_chain and appended to the log order.

    Arguments:
    args -- object with a `nomerge` attribute; when it is true the function
            raises SystemExit(0) right after reporting how many entries
            would be added.
    config -- mapping with "storagenodes", an iterable of mappings that each
              carry "name" and "address".
    localconfig -- mapping with "nodename" and "paths"; the keys of "paths"
                   read here are "mergedb", "privatekeys", "verifycert_bin"
                   and "known_roots" (the helpers receiving `paths` may read
                   more -- not visible from this function).

    Returns a tuple (tree_size, root_hash, timestamp): the length of the
    updated log order, element [-1][0] of the build_merkle_tree() result,
    and the current time in milliseconds.

    Progress is reported on stderr.
    """
    paths = localconfig["paths"]
    storagenodes = config["storagenodes"]
    mergedb = paths["mergedb"]
    logorderfile = mergedb + "/logorder"
    chainsdir = mergedb + "/chains"
    nodename = localconfig["nodename"]
    # (node name, path to private key file) pair handed to the fetch helpers.
    own_key = (nodename,
               "%s/%s-private.pem" % (paths["privatekeys"], nodename))

    timing = timing_point()

    logorder = get_logorder(logorderfile)
    timing_point(timing, "get logorder")

    certsinlog = set(logorder)

    new_entries_per_node = {}   # node name -> set of hashes it reported
    new_entries = set()         # union of everything reported
    entries_to_fetch = {}       # node name -> list of hashes to pull from it

    for node in storagenodes:
        node_name = node["name"]
        print >>sys.stderr, "getting new entries from", node_name
        sys.stderr.flush()
        reported = set(get_new_entries(node_name,
                                       "https://%s/" % node["address"],
                                       own_key, paths))
        new_entries_per_node[node_name] = reported
        new_entries |= reported
        entries_to_fetch[node_name] = []
    timing_point(timing, "get new entries")

    # Only hashes that are not already part of the log need fetching.
    new_entries.difference_update(certsinlog)
    print >>sys.stderr, "adding", len(new_entries), "entries"
    sys.stderr.flush()

    if args.nomerge:
        sys.exit(0)

    # Assign every hash to the first storage node that reported it, so each
    # entry is fetched exactly once.
    for ehash in new_entries:
        for node in storagenodes:
            node_name = node["name"]
            if ehash in new_entries_per_node[node_name]:
                entries_to_fetch[node_name].append(ehash)
                break

    # Child process handed to verify_entry(); it is talked to over
    # stdin/stdout pipes.
    verifycert = subprocess.Popen(
        [paths["verifycert_bin"], paths["known_roots"]],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    added_entries = 0
    for node in storagenodes:
        node_name = node["name"]
        wanted = entries_to_fetch[node_name]
        # Trailing comma: keep the cursor on this line so the running
        # counts printed below follow it.
        print >>sys.stderr, "getting %d entries from %s:" % \
            (len(wanted), node_name),
        sys.stderr.flush()
        for chunk in chunks(wanted, 100):
            entries = get_entries(node_name,
                                  "https://%s/" % node["address"],
                                  own_key, paths, chunk)
            for ehash in chunk:
                entry = entries[ehash]
                verify_entry(verifycert, entry, ehash)
                write_chain(ehash, entry, chainsdir)
                add_to_logorder(logorderfile, ehash)
                logorder.append(ehash)
                certsinlog.add(ehash)
                added_entries += 1
            # Running total after each chunk, on the same stderr line.
            print >>sys.stderr, added_entries,
            sys.stderr.flush()
        # Finish this node's progress line.
        print >>sys.stderr
        sys.stderr.flush()
    fsync_logorder(logorderfile)
    timing_point(timing, "add entries")
    print >>sys.stderr, "added", added_entries, "entries"
    sys.stderr.flush()

    # Write a packed zero to the child's stdin and wait for it to exit
    # (presumably a zero length acts as the end-of-input marker -- confirm
    # against the verifycert binary).
    verifycert.communicate(struct.pack("I", 0))

    tree = build_merkle_tree(logorder)

    return (len(logorder), tree[-1][0], int(time.time() * 1000))
|