From 1b8b22c59af8b0b9bdb282a6bc3949dfc853a2d1 Mon Sep 17 00:00:00 2001
From: Magnus Ahltorp <map@kth.se>
Date: Thu, 19 Feb 2015 13:23:08 +0100
Subject: merge.py: Only ask a node that actually has the entry. Fetch multiple
 entries per request from storage nodes. Chunk sendlog.
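
Previously, every new entry was fetched one at a time with get_entry(),
and always from the loop's last storage node (the leaked comprehension
variable), whether or not that node had reported the entry. Now each
new entry is assigned to the first storage node that reported it in
fetchnewentries, and entries are fetched from that node in batches of
100 via a single getentry request per batch. The log is likewise sent
to frontend nodes in sendlog chunks of 1000 hashes. Also adds a
--nomerge flag that exits after reporting how many entries would be
added, and timing_point instrumentation that prints per-phase timings.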

---
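Notes (below the --- marker, so git-am drops them):

The batched getentry request relies on urllib.urlencode() with
doseq=True emitting one hash= query parameter per list element.
A minimal sketch, with made-up hashes standing in for the real
SHA-256 leaf hashes:

  import base64
  import urllib

  hashes = ["hash1", "hash2"]  # hypothetical raw hashes
  print urllib.urlencode(
      {"hash": [base64.b64encode(h) for h in hashes]}, doseq=True)
  # hash=aGFzaDE%3D&hash=aGFzaDI%3D  (one hash= parameter per entry)

The new chunks() helper splits a list into consecutive slices of at
most n elements, which is what bounds both the getentry batches (100)
and the sendlog requests (1000):

  chunks([1, 2, 3, 4, 5], 2)  ==  [[1, 2], [3, 4], [5]]
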
 tools/merge.py | 91 ++++++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 70 insertions(+), 21 deletions(-)
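
A hypothetical invocation using the new --nomerge flag to report how
many entries are pending without merging them (URLs and paths are
illustrative, not from the repository):

  merge.py --baseurl https://ct.example.com/ \
           --frontend https://frontend-1.example.com/ \
           --storage https://storage-1.example.com/ \
           --mergedb /var/db/mergedb \
           --keyfile logkey.pem \
           --nomerge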

diff --git a/tools/merge.py b/tools/merge.py
index 2b83f54..f4a007d 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -11,7 +11,7 @@ import urllib
 import urllib2
 import sys
 import time
-from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file
+from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file, timing_point
 
 parser = argparse.ArgumentParser(description="")
 parser.add_argument("--baseurl", metavar="url", help="Base URL for CT server", required=True)
@@ -19,6 +19,7 @@ parser.add_argument("--frontend", action="append", metavar="url", help="Base URL
 parser.add_argument("--storage", action="append", metavar="url", help="Base URL for storage server", required=True)
 parser.add_argument("--mergedb", metavar="dir", help="Merge database directory", required=True)
 parser.add_argument("--keyfile", metavar="keyfile", help="File containing log key", required=True)
+parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge")
 args = parser.parse_args()
 
 ctbaseurl = args.baseurl
@@ -56,23 +57,23 @@ def get_new_entries(baseurl):
         result = urllib2.urlopen(baseurl + "ct/storage/fetchnewentries").read()
         parsed_result = json.loads(result)
         if parsed_result.get(u"result") == u"ok":
-            return parsed_result[u"entries"]
+            return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
         print "ERROR: fetchnewentries", parsed_result
         sys.exit(1)
     except urllib2.HTTPError, e:
         print "ERROR: fetchnewentries", e.read()
         sys.exit(1)
 
-def get_entry(baseurl, hash):
+def get_entries(baseurl, hashes):
     try:
-        params = urllib.urlencode({"hash":base64.b64encode(hash)})
+        params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True)
         result = urllib2.urlopen(baseurl + "ct/storage/getentry?" + params).read()
         parsed_result = json.loads(result)
         if parsed_result.get(u"result") == u"ok":
-            entries = parsed_result[u"entries"]
-            assert len(entries) == 1
-            assert base64.b64decode(entries[0]["hash"]) == hash
-            return base64.b64decode(entries[0]["entry"])
+            entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
+            assert len(entries) == len(hashes)
+            assert set(entries.keys()) == set(hashes)
+            return entries
         print "ERROR: getentry", parsed_result
         sys.exit(1)
     except urllib2.HTTPError, e:
@@ -151,23 +152,56 @@ def get_missingentries(baseurl):
         print "ERROR: missingentries", e.read()
         sys.exit(1)
 
+def chunks(l, n):
+    return [l[i:i+n] for i in range(0, len(l), n)]
+
+timing = timing_point()
 
 logorder = get_logorder()
+
+timing_point(timing, "get logorder")
+
 certsinlog = set(logorder)
 
-new_entries = [entry for storagenode in storagenodes for entry in get_new_entries(storagenode)]
+new_entries_per_node = {}
+new_entries = set()
+entries_to_fetch = {}
+
+for storagenode in storagenodes:
+    print "getting new entries from", storagenode
+    new_entries_per_node[storagenode] = set(get_new_entries(storagenode))
+    new_entries.update(new_entries_per_node[storagenode])
+    entries_to_fetch[storagenode] = []
+
+timing_point(timing, "get new entries")
+
+new_entries -= certsinlog
+
+print "adding", len(new_entries), "entries"
+
+if args.nomerge:
+    sys.exit(0)
+
+for hash in new_entries:
+    for storagenode in storagenodes:
+        if hash in new_entries_per_node[storagenode]:
+            entries_to_fetch[storagenode].append(hash)
+            break
+
 
-print "adding entries"
 added_entries = 0
-for new_entry in new_entries:
-    hash = base64.b64decode(new_entry)
-    if hash not in certsinlog:
-        entry = get_entry(storagenode, hash)
-        write_chain(hash, entry)
-        add_to_logorder(hash)
-        logorder.append(hash)
-        certsinlog.add(hash)
-        added_entries += 1
+for storagenode in storagenodes:
+    print "getting", len(entries_to_fetch[storagenode]), "entries from", storagenode
+    for chunk in chunks(entries_to_fetch[storagenode], 100):
+        entries = get_entries(storagenode, chunk)
+        for hash in chunk:
+            entry = entries[hash]
+            write_chain(hash, entry)
+            add_to_logorder(hash)
+            logorder.append(hash)
+            certsinlog.add(hash)
+            added_entries += 1
+timing_point(timing, "add entries")
 print "added", added_entries, "entries"
 
 tree = build_merkle_tree(logorder)
@@ -185,18 +219,33 @@ sth = {"tree_size": tree_size, "timestamp": timestamp,
 
 check_sth_signature(ctbaseurl, sth)
 
+timing_point(timing, "build sth")
+
+print timing["deltatimes"]
+
 print "root hash", base64.b16encode(root_hash)
 
 for frontendnode in frontendnodes:
+    timing = timing_point()
     print "distributing for node", frontendnode
     curpos = get_curpos(frontendnode)
+    timing_point(timing, "get curpos")
     print "current position", curpos
     entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
-    sendlog(frontendnode, {"start": curpos, "hashes": entries})
+    for chunk in chunks(entries, 1000):
+        sendlog(frontendnode, {"start": curpos, "hashes": chunk})
+        curpos += len(chunk)
+        print curpos,
+        sys.stdout.flush()
+    timing_point(timing, "sendlog")
     print "log sent"
     missingentries = get_missingentries(frontendnode)
-    print "missing entries:", missingentries
+    timing_point(timing, "get missing")
+    print "missing entries:", len(missingentries)
     for missingentry in missingentries:
         hash = base64.b64decode(missingentry)
         sendentry(frontendnode, read_chain(hash), hash)
+    timing_point(timing, "send missing")
     sendsth(frontendnode, sth)
+    timing_point(timing, "send sth")
+    print timing["deltatimes"]