From 803adf14ec4116d2997feca38c4c259d5d47f71a Mon Sep 17 00:00:00 2001
From: Magnus Ahltorp <map@kth.se>
Date: Thu, 18 Jun 2015 09:20:14 +0100
Subject: Preliminary merge secondary support. Change merge db to lowercase.

---
 tools/compileconfig.py |  58 ++++++++++++----
 tools/merge.py         | 177 ++++++++++++++++++++++++++++++++++++++++++-------
 tools/mergetools.py    |  14 ++--
 3 files changed, 207 insertions(+), 42 deletions(-)

(limited to 'tools')

diff --git a/tools/compileconfig.py b/tools/compileconfig.py
index c48ba66..0e4a839 100755
--- a/tools/compileconfig.py
+++ b/tools/compileconfig.py
@@ -67,7 +67,7 @@ def parse_address(address):
 def get_node_config(nodename, config):
     nodetype = None
     nodeconfig = None
-    for t in ["frontendnodes", "storagenodes", "signingnodes"]:
+    for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]:
         for node in config[t]:
             if node["name"] == nodename:
                 nodetype = t
@@ -106,16 +106,31 @@ def gen_http_servers(nodetype, nodeconfig, bind_address, bind_publicaddress, bin
     elif nodetype == "signingnodes":
         return ([],
                 [(Symbol("signing_https_api"), host, port, Symbol("signing"))])
+    elif nodetype == "mergenodes":
+        return ([],
+                [(Symbol("frontend_https_api"), host, port, Symbol("frontend"))])
+    else:
+        print >>sys.stderr, "unknown nodetype", nodetype
+        sys.exit(1)
 
-def allowed_clients_frontend(mergenodenames):
+def allowed_clients_frontend(mergenodenames, primarymergenode):
     return [
         ("/ct/frontend/sendentry", mergenodenames),
         ("/ct/frontend/sendlog", mergenodenames),
-        ("/ct/frontend/sendsth", mergenodenames),
+        ("/ct/frontend/sendsth", [primarymergenode]),
         ("/ct/frontend/currentposition", mergenodenames),
         ("/ct/frontend/missingentries", mergenodenames),
     ]
 
+def allowed_clients_mergesecondary(primarymergenode):
+    return [
+        ("/catlfish/merge/sendentry", [primarymergenode]),
+        ("/catlfish/merge/sendlog", [primarymergenode]),
+        ("/catlfish/merge/verifyroot", [primarymergenode]),
+        ("/catlfish/merge/verifiedsize", [primarymergenode]),
+        ("/catlfish/merge/missingentries", [primarymergenode]),
+    ]
+
 def allowed_clients_public():
     noauth = Symbol("noauth")
     return [
@@ -129,10 +144,10 @@ def allowed_clients_public():
         ("/ct/v1/get-roots", noauth),
     ]
 
-def allowed_clients_signing(frontendnodenames, mergenodenames):
+def allowed_clients_signing(frontendnodenames, primarymergenode):
     return [
         ("/ct/signing/sct", frontendnodenames),
-        ("/ct/signing/sth", mergenodenames),
+        ("/ct/signing/sth", [primarymergenode]),
     ]
 
 def allowed_clients_storage(frontendnodenames, mergenodenames):
@@ -182,11 +197,12 @@ def gen_config(nodename, config, localconfig):
     catlfishconfig = []
     plopconfig = []
 
-    if nodetype == "frontendnodes":
+    if nodetype in ("frontendnodes", "mergenodes"):
         catlfishconfig.append((Symbol("known_roots_path"), localconfig["paths"]["knownroots"]))
+    if nodetype == "frontendnodes":
         if "sctcaching" in options:
             catlfishconfig.append((Symbol("sctcache_root_path"), paths["db"] + "sctcache/"))
-        if localconfig["ratelimits"]:
+        if "ratelimits" in localconfig:
             ratelimits = map(parse_ratelimit, localconfig["ratelimits"].items())
             catlfishconfig.append((Symbol("ratelimits"), ratelimits))
 
@@ -231,13 +247,23 @@ def gen_config(nodename, config, localconfig):
             (Symbol("sth_path"), paths["db"] + "sth"),
             (Symbol("entryhash_from_entry"),
              (Symbol("catlfish"), Symbol("entryhash_from_entry"))),
+        ]
+    if nodetype in ("frontendnodes", "mergenodes"):
+        plopconfig += [
             (Symbol("verify_entry"),
              (Symbol("catlfish"), Symbol("verify_entry"))),
         ]
+    if nodetype == "mergenodes":
+        plopconfig += [
+            (Symbol("verifiedsize_path"), paths["mergedb"] + "/verifiedsize"),
+            (Symbol("index_path"), paths["mergedb"] + "/logorder"),
+            (Symbol("entry_root_path"), paths["mergedb"] + "/chains/"),
+            ]
 
     signingnodes = config["signingnodes"]
     signingnodeaddresses = ["https://%s/ct/signing/" % node["address"] for node in config["signingnodes"]]
     mergenodenames = [node["name"] for node in config["mergenodes"]]
+    primarymergenode = config["primarymergenode"]
     storagenodeaddresses = ["https://%s/ct/storage/" % node["address"] for node in config["storagenodes"]]
     frontendnodenames = [node["name"] for node in config["frontendnodes"]]
 
@@ -249,15 +275,21 @@ def gen_config(nodename, config, localconfig):
         plopconfig.append((Symbol("storage_nodes"), storagenodeaddresses))
         plopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"]))
         services = [Symbol("ht")]
-        allowed_clients += allowed_clients_frontend(mergenodenames)
+        allowed_clients += allowed_clients_frontend(mergenodenames, primarymergenode)
         allowed_clients += allowed_clients_public()
         allowed_servers += allowed_servers_frontend([node["name"] for node in signingnodes], storagenodenames)
     elif nodetype == "storagenodes":
         allowed_clients += allowed_clients_storage(frontendnodenames, mergenodenames)
         services = []
     elif nodetype == "signingnodes":
-        allowed_clients += allowed_clients_signing(frontendnodenames, mergenodenames)
+        allowed_clients += allowed_clients_signing(frontendnodenames, primarymergenode)
         services = [Symbol("sign")]
+    elif nodetype == "mergenodes":
+        storagenodenames = [node["name"] for node in config["storagenodes"]]
+        plopconfig.append((Symbol("storage_nodes"), storagenodeaddresses))
+        plopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"]))
+        services = [Symbol("ht")]
+        allowed_clients += allowed_clients_mergesecondary(primarymergenode)
 
     plopconfig += [
         (Symbol("publickey_path"), paths["publickeys"]),
@@ -299,15 +331,17 @@ def gen_testmakefile(config, testmakefile, machines):
     configfile = open(testmakefile, "w")
     frontendnodenames = [node["name"] for node in config["frontendnodes"]]
     storagenodenames = [node["name"] for node in config["storagenodes"]]
-    signingnodename = [node["name"] for node in config["signingnodes"]]
+    signingnodenames = [node["name"] for node in config["signingnodes"]]
+    mergenodenames = [node["name"] for node in config["mergenodes"]]
 
     frontendnodeaddresses = [node["publicaddress"] for node in config["frontendnodes"]]
     storagenodeaddresses = [node["address"] for node in config["storagenodes"]]
     signingnodeaddresses = [node["address"] for node in config["signingnodes"]]
+    mergenodeaddresses = [node["address"] for node in config["mergenodes"] if node["name"] != config["primarymergenode"]]
 
-    print >>configfile, "NODES=" + " ".join(frontendnodenames+storagenodenames+signingnodename)
+    print >>configfile, "NODES=" + " ".join(frontendnodenames+storagenodenames+signingnodenames+mergenodenames)
     print >>configfile, "MACHINES=" + " ".join([str(e) for e in range(1, machines+1)])
-    print >>configfile, "TESTURLS=" + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses)
+    print >>configfile, "TESTURLS=" + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses+mergenodeaddresses)
     print >>configfile, "BASEURL=" + config["baseurl"]
 
     configfile.close()
diff --git a/tools/merge.py b/tools/merge.py
index 9904b84..34d1697 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -38,7 +38,7 @@ localconfig = yaml.load(open(args.localconfig))
 ctbaseurl = config["baseurl"]
 frontendnodes = config["frontendnodes"]
 storagenodes = config["storagenodes"]
-secondaries = localconfig.get("secondary", [])
+secondaries = config.get("mergenodes", [])
 paths = localconfig["paths"]
 mergedb = paths["mergedb"]
 
@@ -54,8 +54,11 @@ logpublickey = get_public_key_from_file(paths["logpublickey"])
 
 hashed_dir = True
 
+def hexencode(key):
+    return base64.b16encode(key).lower()
+
 def write_chain(key, value):
-    filename = base64.b16encode(key)
+    filename = hexencode(key)
     if hashed_dir:
         path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
         try:
@@ -70,7 +73,7 @@ def write_chain(key, value):
 
 def add_to_logorder(key):
     f = open(logorderfile, "a")
-    f.write(base64.b16encode(key) + "\n")
+    f.write(hexencode(key) + "\n")
     f.close()
 
 def fsync_logorder():
@@ -118,6 +121,20 @@ def get_curpos(node, baseurl):
         print >>sys.stderr, "ERROR: currentposition", e.read()
         sys.exit(1)
 
+def get_verifiedsize(node, baseurl):
+    try:
+        result = http_request(baseurl + "catlfish/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        parsed_result = json.loads(result)
+        if parsed_result.get(u"result") == u"ok":
+            return parsed_result[u"size"]
+        print >>sys.stderr, "ERROR: verifiedsize", parsed_result
+        sys.exit(1)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: verifiedsize", e.read()
+        sys.exit(1)
+
+
+        
 def sendlog(node, baseurl, submission):
     try:
         result = http_request(baseurl + "ct/frontend/sendlog",
@@ -136,6 +153,24 @@ def sendlog(node, baseurl, submission):
         sys.stderr.flush()
         raise e
 
+def backup_sendlog(node, baseurl, submission):
+    try:
+        result = http_request(baseurl + "catlfish/merge/sendlog",
+            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        return json.loads(result)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: sendlog", e.read()
+        sys.stderr.flush()
+        return None
+    except ValueError, e:
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, submission
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
+        raise e
+
 def sendentry(node, baseurl, entry, hash):
     try:
         result = http_request(baseurl + "ct/frontend/sendentry",
@@ -154,6 +189,24 @@ def sendentry(node, baseurl, entry, hash):
         sys.stderr.flush()
         raise e
 
+def sendentry_merge(node, baseurl, entry, hash):
+    try:
+        result = http_request(baseurl + "catlfish/merge/sendentry",
+            json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+            verifynode=node, publickeydir=paths["publickeys"])
+        return json.loads(result)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: sendentry", e.read()
+        sys.exit(1)
+    except ValueError, e:
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, hash
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
+        raise e
+
 def sendsth(node, baseurl, submission):
     try:
         result = http_request(baseurl + "ct/frontend/sendsth",
@@ -171,6 +224,23 @@ def sendsth(node, baseurl, submission):
         sys.stderr.flush()
         raise e
 
+def verifyroot(node, baseurl, treesize):
+    try:
+        result = http_request(baseurl + "catlfish/merge/verifyroot",
+            json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        return json.loads(result)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: verifyroot", e.read()
+        sys.exit(1)
+    except ValueError, e:
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, treesize
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
+        raise e
+
 def get_missingentries(node, baseurl):
     try:
         result = http_request(baseurl + "ct/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
@@ -183,6 +253,18 @@ def get_missingentries(node, baseurl):
         print >>sys.stderr, "ERROR: missingentries", e.read()
         sys.exit(1)
 
+def get_missingentriesforbackup(node, baseurl):
+    try:
+        result = http_request(baseurl + "catlfish/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        parsed_result = json.loads(result)
+        if parsed_result.get(u"result") == u"ok":
+            return parsed_result[u"entries"]
+        print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
+        sys.exit(1)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
+        sys.exit(1)
+
 def chunks(l, n):
     return [l[i:i+n] for i in range(0, len(l), n)]
 
@@ -257,30 +339,75 @@ root_hash = tree[-1][0]
 timestamp = int(time.time() * 1000)
 
 for secondary in secondaries:
-    remotehost = secondary["host"]
-    remotedir = remotehost + ":" + secondary["mergedir"]
-    localdir = mergedb
-    if localdir[:-1] != '/':
-        localdir = localdir + "/"
-
-    print >>sys.stderr, "copying database to secondary:", remotehost
+    if secondary["name"] == config["primarymergenode"]:
+        continue
+    nodeaddress = "https://%s/" % secondary["address"]
+    nodename = secondary["name"]
+    timing = timing_point()
+    print >>sys.stderr, "backing up to node", nodename
     sys.stderr.flush()
-    rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir])
-    if rsyncstatus:
-        print >>sys.stderr, "rsync failed:", rsyncstatus
-        sys.exit(1)
-
-    print >>sys.stderr, "verifying database at secondary:", remotehost
+    verifiedsize = get_verifiedsize(nodename, nodeaddress)
+    timing_point(timing, "get verified size")
+    print >>sys.stderr, "verified size", verifiedsize
     sys.stderr.flush()
-    verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]],
-                                    stdout=subprocess.PIPE)
-
-    (verifysecondaryresult, _) = verifysecondary.communicate()
-
-    if root_hash != base64.b16decode(verifysecondaryresult.strip()):
-        print >>sys.stderr, "secondary root hash was", verifysecondaryresult.strip()
-        print >>sys.stderr, "  expected", base64.b16encode(root_hash)
+    entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+    print >>sys.stderr, "sending log:",
+    sys.stderr.flush()
+    for chunk in chunks(entries, 1000):
+        for trynumber in range(5, 0, -1):
+            sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
+            if sendlogresult == None:
+                if trynumber == 1:
+                    sys.exit(1)
+                select.select([], [], [], 10.0)
+                print >>sys.stderr, "tries left:", trynumber
+                sys.stderr.flush()
+                continue
+            break
+        if sendlogresult["result"] != "ok":
+            print >>sys.stderr, "sendlog:", sendlogresult
+            sys.exit(1)
+        verifiedsize += len(chunk)
+        print >>sys.stderr, verifiedsize,
+        sys.stderr.flush()
+    print >>sys.stderr
+    timing_point(timing, "sendlog")
+    print >>sys.stderr, "log sent"
+    sys.stderr.flush()
+    missingentries = get_missingentriesforbackup(nodename, nodeaddress)
+    timing_point(timing, "get missing")
+    print >>sys.stderr, "missing entries:", len(missingentries)
+    sys.stderr.flush()
+    fetched_entries = 0
+    print >>sys.stderr, "fetching missing entries",
+    sys.stderr.flush()
+    for missingentry in missingentries:
+        hash = base64.b64decode(missingentry)
+        sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
+        if sendentryresult["result"] != "ok":
+            print >>sys.stderr, "sendentry:", sendentryresult
+            sys.exit(1)
+        fetched_entries += 1
+        if fetched_entries % 1000 == 0:
+            print >>sys.stderr, fetched_entries,
+            sys.stderr.flush()
+    print >>sys.stderr
+    sys.stderr.flush()
+    timing_point(timing, "send missing")
+    verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
+    if verifyrootresult["result"] != "ok":
+        print >>sys.stderr, "verifyroot:", verifyrootresult
+        sys.exit(1)
+    secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+    if root_hash != secondary_root_hash:
+        print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
+        print >>sys.stderr, "  expected", hexencode(root_hash)
         sys.exit(1)
+    timing_point(timing, "verifyroot")
+    # XXX: set verifiedsize
+    if args.timing:
+        print >>sys.stderr, timing["deltatimes"]
+        sys.stderr.flush()
 
 tree_head_signature = None
 for signingnode in signingnodes:
@@ -307,7 +434,7 @@ if args.timing:
     print >>sys.stderr, timing["deltatimes"]
     sys.stderr.flush()
 
-print base64.b16encode(root_hash)
+print hexencode(root_hash)
 sys.stdout.flush()
 
 for frontendnode in frontendnodes:
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 5cb36c4..9e84038 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -6,19 +6,23 @@ import struct
 from certtools import get_leaf_hash
 
 def parselogrow(row):
-    return base64.b16decode(row)
+    return base64.b16decode(row, casefold=True)
 
 def get_logorder(filename):
     f = open(filename, "r")
     return [parselogrow(row.rstrip()) for row in f]
 
-def read_chain(chainsdir, key):
-    filename = base64.b16encode(key)
+def read_chain_open(chainsdir, filename):
     path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
+    f = open(path + "/" + filename, "r")
+    return f
+
+def read_chain(chainsdir, key):
+    filename = base64.b16encode(key).upper()
     try:
-        f = open(path + "/" + filename, "r")
+        f = read_chain_open(chainsdir, filename)
     except IOError, e:
-        f = open(chainsdir + "/" + filename, "r")
+        f = read_chain_open(chainsdir, filename.lower())
     value = f.read()
     f.close()
     return value
-- 
cgit v1.1