summaryrefslogtreecommitdiff
path: root/tools/merge.py
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2015-02-19 13:23:08 +0100
committerMagnus Ahltorp <map@kth.se>2015-02-19 13:23:08 +0100
commit1b8b22c59af8b0b9bdb282a6bc3949dfc853a2d1 (patch)
tree88d4c04fbfb522a4980f3d6ed8404738d4dd77bd /tools/merge.py
parent27f5d5042fdba70b97899d33e762666fb80415f1 (diff)
merge.py: Only ask node that actually has the entry.
Fetch multiple entries from storage node. Chunk sendlog.
Diffstat (limited to 'tools/merge.py')
-rwxr-xr-xtools/merge.py91
1 files changed, 70 insertions, 21 deletions
diff --git a/tools/merge.py b/tools/merge.py
index 2b83f54..f4a007d 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -11,7 +11,7 @@ import urllib
import urllib2
import sys
import time
-from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file
+from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file, timing_point
parser = argparse.ArgumentParser(description="")
parser.add_argument("--baseurl", metavar="url", help="Base URL for CT server", required=True)
@@ -19,6 +19,7 @@ parser.add_argument("--frontend", action="append", metavar="url", help="Base URL
parser.add_argument("--storage", action="append", metavar="url", help="Base URL for storage server", required=True)
parser.add_argument("--mergedb", metavar="dir", help="Merge database directory", required=True)
parser.add_argument("--keyfile", metavar="keyfile", help="File containing log key", required=True)
+parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge")
args = parser.parse_args()
ctbaseurl = args.baseurl
@@ -56,23 +57,23 @@ def get_new_entries(baseurl):
result = urllib2.urlopen(baseurl + "ct/storage/fetchnewentries").read()
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"entries"]
+ return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
print "ERROR: fetchnewentries", parsed_result
sys.exit(1)
except urllib2.HTTPError, e:
print "ERROR: fetchnewentries", e.read()
sys.exit(1)
-def get_entry(baseurl, hash):
+def get_entries(baseurl, hashes):
try:
- params = urllib.urlencode({"hash":base64.b64encode(hash)})
+ params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True)
result = urllib2.urlopen(baseurl + "ct/storage/getentry?" + params).read()
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
- entries = parsed_result[u"entries"]
- assert len(entries) == 1
- assert base64.b64decode(entries[0]["hash"]) == hash
- return base64.b64decode(entries[0]["entry"])
+ entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
+ assert len(entries) == len(hashes)
+ assert set(entries.keys()) == set(hashes)
+ return entries
print "ERROR: getentry", parsed_result
sys.exit(1)
except urllib2.HTTPError, e:
@@ -151,23 +152,56 @@ def get_missingentries(baseurl):
print "ERROR: missingentries", e.read()
sys.exit(1)
+def chunks(l, n):
+ return [l[i:i+n] for i in range(0, len(l), n)]
+
+timing = timing_point()
logorder = get_logorder()
+
+timing_point(timing, "get logorder")
+
certsinlog = set(logorder)
-new_entries = [entry for storagenode in storagenodes for entry in get_new_entries(storagenode)]
+new_entries_per_node = {}
+new_entries = set()
+entries_to_fetch = {}
+
+for storagenode in storagenodes:
+ print "getting new entries from", storagenode
+ new_entries_per_node[storagenode] = set(get_new_entries(storagenode))
+ new_entries.update(new_entries_per_node[storagenode])
+ entries_to_fetch[storagenode] = []
+
+timing_point(timing, "get new entries")
+
+new_entries -= certsinlog
+
+print "adding", len(new_entries), "entries"
+
+if args.nomerge:
+ sys.exit(0)
+
+for hash in new_entries:
+ for storagenode in storagenodes:
+ if hash in new_entries_per_node[storagenode]:
+ entries_to_fetch[storagenode].append(hash)
+ break
+
-print "adding entries"
added_entries = 0
-for new_entry in new_entries:
- hash = base64.b64decode(new_entry)
- if hash not in certsinlog:
- entry = get_entry(storagenode, hash)
- write_chain(hash, entry)
- add_to_logorder(hash)
- logorder.append(hash)
- certsinlog.add(hash)
- added_entries += 1
+for storagenode in storagenodes:
+ print "getting", len(entries_to_fetch[storagenode]), "entries from", storagenode
+ for chunk in chunks(entries_to_fetch[storagenode], 100):
+ entries = get_entries(storagenode, chunk)
+ for hash in chunk:
+ entry = entries[hash]
+ write_chain(hash, entry)
+ add_to_logorder(hash)
+ logorder.append(hash)
+ certsinlog.add(hash)
+ added_entries += 1
+timing_point(timing, "add entries")
print "added", added_entries, "entries"
tree = build_merkle_tree(logorder)
@@ -185,18 +219,33 @@ sth = {"tree_size": tree_size, "timestamp": timestamp,
check_sth_signature(ctbaseurl, sth)
+timing_point(timing, "build sth")
+
+print timing["deltatimes"]
+
print "root hash", base64.b16encode(root_hash)
for frontendnode in frontendnodes:
+ timing = timing_point()
print "distributing for node", frontendnode
curpos = get_curpos(frontendnode)
+ timing_point(timing, "get curpos")
print "current position", curpos
entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
- sendlog(frontendnode, {"start": curpos, "hashes": entries})
+ for chunk in chunks(entries, 1000):
+ sendlog(frontendnode, {"start": curpos, "hashes": chunk})
+ curpos += len(chunk)
+ print curpos,
+ sys.stdout.flush()
+ timing_point(timing, "sendlog")
print "log sent"
missingentries = get_missingentries(frontendnode)
- print "missing entries:", missingentries
+ timing_point(timing, "get missing")
+ print "missing entries:", len(missingentries)
for missingentry in missingentries:
hash = base64.b64decode(missingentry)
sendentry(frontendnode, read_chain(hash), hash)
+ timing_point(timing, "send missing")
sendsth(frontendnode, sth)
+ timing_point(timing, "send sth")
+ print timing["deltatimes"]