From 88c0aba850f0a79ecf92070f79c6dd3e95b8cc87 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 18 Jun 2015 09:20:14 +0100 Subject: Preliminary merge secondary support. Change merge db to lowercase. --- tools/compileconfig.py | 58 ++++++++++++---- tools/merge.py | 177 ++++++++++++++++++++++++++++++++++++++++++------- tools/mergetools.py | 14 ++-- 3 files changed, 207 insertions(+), 42 deletions(-) (limited to 'tools') diff --git a/tools/compileconfig.py b/tools/compileconfig.py index c48ba66..0e4a839 100755 --- a/tools/compileconfig.py +++ b/tools/compileconfig.py @@ -67,7 +67,7 @@ def parse_address(address): def get_node_config(nodename, config): nodetype = None nodeconfig = None - for t in ["frontendnodes", "storagenodes", "signingnodes"]: + for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]: for node in config[t]: if node["name"] == nodename: nodetype = t @@ -106,16 +106,31 @@ def gen_http_servers(nodetype, nodeconfig, bind_address, bind_publicaddress, bin elif nodetype == "signingnodes": return ([], [(Symbol("signing_https_api"), host, port, Symbol("signing"))]) + elif nodetype == "mergenodes": + return ([], + [(Symbol("frontend_https_api"), host, port, Symbol("frontend"))]) + else: + print >>sys.stderr, "unknown nodetype", nodetype + sys.exit(1) -def allowed_clients_frontend(mergenodenames): +def allowed_clients_frontend(mergenodenames, primarymergenode): return [ ("/ct/frontend/sendentry", mergenodenames), ("/ct/frontend/sendlog", mergenodenames), - ("/ct/frontend/sendsth", mergenodenames), + ("/ct/frontend/sendsth", [primarymergenode]), ("/ct/frontend/currentposition", mergenodenames), ("/ct/frontend/missingentries", mergenodenames), ] +def allowed_clients_mergesecondary(primarymergenode): + return [ + ("/catlfish/merge/sendentry", [primarymergenode]), + ("/catlfish/merge/sendlog", [primarymergenode]), + ("/catlfish/merge/verifyroot", [primarymergenode]), + ("/catlfish/merge/verifiedsize", [primarymergenode]), + ("/catlfish/merge/missingentries", [primarymergenode]), + ] + def allowed_clients_public(): noauth = Symbol("noauth") return [ @@ -129,10 +144,10 @@ def allowed_clients_public(): ("/ct/v1/get-roots", noauth), ] -def allowed_clients_signing(frontendnodenames, mergenodenames): +def allowed_clients_signing(frontendnodenames, primarymergenode): return [ ("/ct/signing/sct", frontendnodenames), - ("/ct/signing/sth", mergenodenames), + ("/ct/signing/sth", [primarymergenode]), ] def allowed_clients_storage(frontendnodenames, mergenodenames): @@ -182,11 +197,12 @@ def gen_config(nodename, config, localconfig): catlfishconfig = [] plopconfig = [] - if nodetype == "frontendnodes": + if nodetype in ("frontendnodes", "mergenodes"): catlfishconfig.append((Symbol("known_roots_path"), localconfig["paths"]["knownroots"])) + if nodetype == "frontendnodes": if "sctcaching" in options: catlfishconfig.append((Symbol("sctcache_root_path"), paths["db"] + "sctcache/")) - if localconfig["ratelimits"]: + if "ratelimits" in localconfig: ratelimits = map(parse_ratelimit, localconfig["ratelimits"].items()) catlfishconfig.append((Symbol("ratelimits"), ratelimits)) @@ -231,13 +247,23 @@ def gen_config(nodename, config, localconfig): (Symbol("sth_path"), paths["db"] + "sth"), (Symbol("entryhash_from_entry"), (Symbol("catlfish"), Symbol("entryhash_from_entry"))), + ] + if nodetype in ("frontendnodes", "mergenodes"): + plopconfig += [ (Symbol("verify_entry"), (Symbol("catlfish"), Symbol("verify_entry"))), ] + if nodetype == "mergenodes": + plopconfig += [ + (Symbol("verifiedsize_path"), paths["mergedb"] + "/verifiedsize"), + (Symbol("index_path"), paths["mergedb"] + "/logorder"), + (Symbol("entry_root_path"), paths["mergedb"] + "/chains/"), + ] signingnodes = config["signingnodes"] signingnodeaddresses = ["https://%s/ct/signing/" % node["address"] for node in config["signingnodes"]] mergenodenames = [node["name"] for node in config["mergenodes"]] + primarymergenode = config["primarymergenode"] storagenodeaddresses = ["https://%s/ct/storage/" % node["address"] for node in config["storagenodes"]] frontendnodenames = [node["name"] for node in config["frontendnodes"]] @@ -249,15 +275,21 @@ def gen_config(nodename, config, localconfig): plopconfig.append((Symbol("storage_nodes"), storagenodeaddresses)) plopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"])) services = [Symbol("ht")] - allowed_clients += allowed_clients_frontend(mergenodenames) + allowed_clients += allowed_clients_frontend(mergenodenames, primarymergenode) allowed_clients += allowed_clients_public() allowed_servers += allowed_servers_frontend([node["name"] for node in signingnodes], storagenodenames) elif nodetype == "storagenodes": allowed_clients += allowed_clients_storage(frontendnodenames, mergenodenames) services = [] elif nodetype == "signingnodes": - allowed_clients += allowed_clients_signing(frontendnodenames, mergenodenames) + allowed_clients += allowed_clients_signing(frontendnodenames, primarymergenode) services = [Symbol("sign")] + elif nodetype == "mergenodes": + storagenodenames = [node["name"] for node in config["storagenodes"]] + plopconfig.append((Symbol("storage_nodes"), storagenodeaddresses)) + plopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"])) + services = [Symbol("ht")] + allowed_clients += allowed_clients_mergesecondary(primarymergenode) plopconfig += [ (Symbol("publickey_path"), paths["publickeys"]), @@ -299,15 +331,17 @@ def gen_testmakefile(config, testmakefile, machines): configfile = open(testmakefile, "w") frontendnodenames = [node["name"] for node in config["frontendnodes"]] storagenodenames = [node["name"] for node in config["storagenodes"]] - signingnodename = [node["name"] for node in config["signingnodes"]] + signingnodenames = [node["name"] for node in config["signingnodes"]] + mergenodenames = [node["name"] for node in config["mergenodes"]] frontendnodeaddresses = [node["publicaddress"] for node in config["frontendnodes"]] storagenodeaddresses = [node["address"] for node in config["storagenodes"]] signingnodeaddresses = [node["address"] for node in config["signingnodes"]] + mergenodeaddresses = [node["address"] for node in config["mergenodes"] if node["name"] != config["primarymergenode"]] - print >>configfile, "NODES=" + " ".join(frontendnodenames+storagenodenames+signingnodename) + print >>configfile, "NODES=" + " ".join(frontendnodenames+storagenodenames+signingnodenames+mergenodenames) print >>configfile, "MACHINES=" + " ".join([str(e) for e in range(1, machines+1)]) - print >>configfile, "TESTURLS=" + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses) + print >>configfile, "TESTURLS=" + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses+mergenodeaddresses) print >>configfile, "BASEURL=" + config["baseurl"] configfile.close() diff --git a/tools/merge.py b/tools/merge.py index 9904b84..34d1697 100755 --- a/tools/merge.py +++ b/tools/merge.py @@ -38,7 +38,7 @@ localconfig = yaml.load(open(args.localconfig)) ctbaseurl = config["baseurl"] frontendnodes = config["frontendnodes"] storagenodes = config["storagenodes"] -secondaries = localconfig.get("secondary", []) +secondaries = config.get("mergenodes", []) paths = localconfig["paths"] mergedb = paths["mergedb"] @@ -54,8 +54,11 @@ logpublickey = get_public_key_from_file(paths["logpublickey"]) hashed_dir = True +def hexencode(key): + return base64.b16encode(key).lower() + def write_chain(key, value): - filename = base64.b16encode(key) + filename = hexencode(key) if hashed_dir: path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] try: @@ -70,7 +73,7 @@ def write_chain(key, value): def add_to_logorder(key): f = open(logorderfile, "a") - f.write(base64.b16encode(key) + "\n") + f.write(hexencode(key) + "\n") f.close() def fsync_logorder(): @@ -118,6 +121,20 @@ def get_curpos(node, baseurl): print >>sys.stderr, "ERROR: currentposition", e.read() sys.exit(1) +def get_verifiedsize(node, baseurl): + try: + result = http_request(baseurl + "catlfish/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"size"] + print >>sys.stderr, "ERROR: verifiedsize", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: verifiedsize", e.read() + sys.exit(1) + + + def sendlog(node, baseurl, submission): try: result = http_request(baseurl + "ct/frontend/sendlog", @@ -136,6 +153,24 @@ def sendlog(node, baseurl, submission): sys.stderr.flush() raise e +def backup_sendlog(node, baseurl, submission): + try: + result = http_request(baseurl + "catlfish/merge/sendlog", + json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendlog", e.read() + sys.stderr.flush() + return None + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + def sendentry(node, baseurl, entry, hash): try: result = http_request(baseurl + "ct/frontend/sendentry", @@ -154,6 +189,24 @@ def sendentry(node, baseurl, entry, hash): sys.stderr.flush() raise e +def sendentry_merge(node, baseurl, entry, hash): + try: + result = http_request(baseurl + "catlfish/merge/sendentry", + json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, + verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendentry", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, hash + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + def sendsth(node, baseurl, submission): try: result = http_request(baseurl + "ct/frontend/sendsth", @@ -171,6 +224,23 @@ def sendsth(node, baseurl, submission): sys.stderr.flush() raise e +def verifyroot(node, baseurl, treesize): + try: + result = http_request(baseurl + "catlfish/merge/verifyroot", + json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: verifyroot", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + def get_missingentries(node, baseurl): try: result = http_request(baseurl + "ct/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) @@ -183,6 +253,18 @@ def get_missingentries(node, baseurl): print >>sys.stderr, "ERROR: missingentries", e.read() sys.exit(1) +def get_missingentriesforbackup(node, baseurl): + try: + result = http_request(baseurl + "catlfish/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"entries"] + print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: missingentriesforbackup", e.read() + sys.exit(1) + def chunks(l, n): return [l[i:i+n] for i in range(0, len(l), n)] @@ -257,30 +339,75 @@ root_hash = tree[-1][0] timestamp = int(time.time() * 1000) for secondary in secondaries: - remotehost = secondary["host"] - remotedir = remotehost + ":" + secondary["mergedir"] - localdir = mergedb - if localdir[:-1] != '/': - localdir = localdir + "/" - - print >>sys.stderr, "copying database to secondary:", remotehost + if secondary["name"] == config["primarymergenode"]: + continue + nodeaddress = "https://%s/" % secondary["address"] + nodename = secondary["name"] + timing = timing_point() + print >>sys.stderr, "backing up to node", nodename sys.stderr.flush() - rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir]) - if rsyncstatus: - print >>sys.stderr, "rsync failed:", rsyncstatus - sys.exit(1) - - print >>sys.stderr, "verifying database at secondary:", remotehost + verifiedsize = get_verifiedsize(nodename, nodeaddress) + timing_point(timing, "get verified size") + print >>sys.stderr, "verified size", verifiedsize sys.stderr.flush() - verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]], - stdout=subprocess.PIPE) - - (verifysecondaryresult, _) = verifysecondary.communicate() - - if root_hash != base64.b16decode(verifysecondaryresult.strip()): - print >>sys.stderr, "secondary root hash was", verifysecondaryresult.strip() - print >>sys.stderr, " expected", base64.b16encode(root_hash) + entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + for trynumber in range(5, 0, -1): + sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk}) + if sendlogresult == None: + if trynumber == 1: + sys.exit(1) + select.select([], [], [], 10.0) + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() + continue + break + if sendlogresult["result"] != "ok": + print >>sys.stderr, "sendlog:", sendlogresult + sys.exit(1) + verifiedsize += len(chunk) + print >>sys.stderr, verifiedsize, + sys.stderr.flush() + print >>sys.stderr + timing_point(timing, "sendlog") + print >>sys.stderr, "log sent" + sys.stderr.flush() + missingentries = get_missingentriesforbackup(nodename, nodeaddress) + timing_point(timing, "get missing") + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + fetched_entries = 0 + print >>sys.stderr, "fetching missing entries", + sys.stderr.flush() + for missingentry in missingentries: + hash = base64.b64decode(missingentry) + sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "send sth:", sendentryresult + sys.exit(1) + fetched_entries += 1 + if added_entries % 1000 == 0: + print >>sys.stderr, fetched_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + verifyrootresult = verifyroot(nodename, nodeaddress, tree_size) + if verifyrootresult["result"] != "ok": + print >>sys.stderr, "verifyroot:", verifyrootresult + sys.exit(1) + secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) + if root_hash != secondary_root_hash: + print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash) + print >>sys.stderr, " expected", hexencode(root_hash) sys.exit(1) + timing_point(timing, "verifyroot") + # XXX: set verifiedsize + if args.timing: + print >>sys.stderr, timing["deltatimes"] + sys.stderr.flush() tree_head_signature = None for signingnode in signingnodes: @@ -307,7 +434,7 @@ if args.timing: print >>sys.stderr, timing["deltatimes"] sys.stderr.flush() -print base64.b16encode(root_hash) +print hexencode(root_hash) sys.stdout.flush() for frontendnode in frontendnodes: diff --git a/tools/mergetools.py b/tools/mergetools.py index 5cb36c4..9e84038 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -6,19 +6,23 @@ import struct from certtools import get_leaf_hash def parselogrow(row): - return base64.b16decode(row) + return base64.b16decode(row, casefold=True) def get_logorder(filename): f = open(filename, "r") return [parselogrow(row.rstrip()) for row in f] -def read_chain(chainsdir, key): - filename = base64.b16encode(key) +def read_chain_open(chainsdir, filename): path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] + f = open(path + "/" + filename, "r") + return f + +def read_chain(chainsdir, key): + filename = base64.b16encode(key).upper() try: - f = open(path + "/" + filename, "r") + f = read_chain_open(chainsdir, filename) except IOError, e: - f = open(chainsdir + "/" + filename, "r") + f = read_chain_open(chainsdir, filename.lower()) value = f.read() f.close() return value -- cgit v1.1