From 5f16e0b43df17d556905e0885abed28cef8b8e29 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 19 Feb 2015 13:23:08 +0100 Subject: merge.py: Only ask node that actually has the entry. Fetch multiple entries from storage node. Chunk sendlog. --- tools/merge.py | 91 ++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 70 insertions(+), 21 deletions(-) diff --git a/tools/merge.py b/tools/merge.py index 2b83f54..f4a007d 100755 --- a/tools/merge.py +++ b/tools/merge.py @@ -11,7 +11,7 @@ import urllib import urllib2 import sys import time -from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file +from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file, timing_point parser = argparse.ArgumentParser(description="") parser.add_argument("--baseurl", metavar="url", help="Base URL for CT server", required=True) @@ -19,6 +19,7 @@ parser.add_argument("--frontend", action="append", metavar="url", help="Base URL parser.add_argument("--storage", action="append", metavar="url", help="Base URL for storage server", required=True) parser.add_argument("--mergedb", metavar="dir", help="Merge database directory", required=True) parser.add_argument("--keyfile", metavar="keyfile", help="File containing log key", required=True) +parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge") args = parser.parse_args() ctbaseurl = args.baseurl @@ -56,23 +57,23 @@ def get_new_entries(baseurl): result = urllib2.urlopen(baseurl + "ct/storage/fetchnewentries").read() parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": - return parsed_result[u"entries"] + return [base64.b64decode(entry) for entry in parsed_result[u"entries"]] print "ERROR: fetchnewentries", parsed_result sys.exit(1) except urllib2.HTTPError, e: print "ERROR: fetchnewentries", e.read() sys.exit(1) -def get_entry(baseurl, hash): +def get_entries(baseurl, hashes): try: - params = urllib.urlencode({"hash":base64.b64encode(hash)}) + params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True) result = urllib2.urlopen(baseurl + "ct/storage/getentry?" + params).read() parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": - entries = parsed_result[u"entries"] - assert len(entries) == 1 - assert base64.b64decode(entries[0]["hash"]) == hash - return base64.b64decode(entries[0]["entry"]) + entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]]) + assert len(entries) == len(hashes) + assert set(entries.keys()) == set(hashes) + return entries print "ERROR: getentry", parsed_result sys.exit(1) except urllib2.HTTPError, e: @@ -151,23 +152,56 @@ def get_missingentries(baseurl): print "ERROR: missingentries", e.read() sys.exit(1) +def chunks(l, n): + return [l[i:i+n] for i in range(0, len(l), n)] + +timing = timing_point() logorder = get_logorder() + +timing_point(timing, "get logorder") + certsinlog = set(logorder) -new_entries = [entry for storagenode in storagenodes for entry in get_new_entries(storagenode)] +new_entries_per_node = {} +new_entries = set() +entries_to_fetch = {} + +for storagenode in storagenodes: + print "getting new entries from", storagenode + new_entries_per_node[storagenode] = set(get_new_entries(storagenode)) + new_entries.update(new_entries_per_node[storagenode]) + entries_to_fetch[storagenode] = [] + +timing_point(timing, "get new entries") + +new_entries -= certsinlog + +print "adding", len(new_entries), "entries" + +if args.nomerge: + sys.exit(0) + +for hash in new_entries: + for storagenode in storagenodes: + if hash in new_entries_per_node[storagenode]: + entries_to_fetch[storagenode].append(hash) + break + -print "adding entries" added_entries = 0 -for new_entry in new_entries: - hash = base64.b64decode(new_entry) - if hash not in certsinlog: - entry = get_entry(storagenode, hash) - write_chain(hash, entry) - add_to_logorder(hash) - logorder.append(hash) - certsinlog.add(hash) - added_entries += 1 +for storagenode in storagenodes: + print "getting", len(entries_to_fetch[storagenode]), "entries from", storagenode + for chunk in chunks(entries_to_fetch[storagenode], 100): + entries = get_entries(storagenode, chunk) + for hash in chunk: + entry = entries[hash] + write_chain(hash, entry) + add_to_logorder(hash) + logorder.append(hash) + certsinlog.add(hash) + added_entries += 1 +timing_point(timing, "add entries") print "added", added_entries, "entries" tree = build_merkle_tree(logorder) @@ -185,18 +219,33 @@ sth = {"tree_size": tree_size, "timestamp": timestamp, check_sth_signature(ctbaseurl, sth) +timing_point(timing, "build sth") + +print timing["deltatimes"] + print "root hash", base64.b16encode(root_hash) for frontendnode in frontendnodes: + timing = timing_point() print "distributing for node", frontendnode curpos = get_curpos(frontendnode) + timing_point(timing, "get curpos") print "current position", curpos entries = [base64.b64encode(entry) for entry in logorder[curpos:]] - sendlog(frontendnode, {"start": curpos, "hashes": entries}) + for chunk in chunks(entries, 1000): + sendlog(frontendnode, {"start": curpos, "hashes": chunk}) + curpos += len(chunk) + print curpos, + sys.stdout.flush() + timing_point(timing, "sendlog") print "log sent" missingentries = get_missingentries(frontendnode) - print "missing entries:", missingentries + timing_point(timing, "get missing") + print "missing entries:", len(missingentries) for missingentry in missingentries: hash = base64.b64decode(missingentry) sendentry(frontendnode, read_chain(hash), hash) + timing_point(timing, "send missing") sendsth(frontendnode, sth) + timing_point(timing, "send sth") + print timing["deltatimes"] -- cgit v1.1