diff options
Diffstat (limited to 'tools/merge.py')
-rwxr-xr-x | tools/merge.py | 177 |
1 files changed, 152 insertions, 25 deletions
diff --git a/tools/merge.py b/tools/merge.py
index 9904b84..34d1697 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -38,7 +38,7 @@ localconfig = yaml.load(open(args.localconfig))
 ctbaseurl = config["baseurl"]
 frontendnodes = config["frontendnodes"]
 storagenodes = config["storagenodes"]
-secondaries = localconfig.get("secondary", [])
+secondaries = config.get("mergenodes", [])
 paths = localconfig["paths"]
 mergedb = paths["mergedb"]
 
@@ -54,8 +54,11 @@ logpublickey = get_public_key_from_file(paths["logpublickey"])
 
 hashed_dir = True
 
+def hexencode(key):
+    return base64.b16encode(key).lower()
+
 def write_chain(key, value):
-    filename = base64.b16encode(key)
+    filename = hexencode(key)
     if hashed_dir:
         path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
         try:
@@ -70,7 +73,7 @@ def write_chain(key, value):
 
 def add_to_logorder(key):
     f = open(logorderfile, "a")
-    f.write(base64.b16encode(key) + "\n")
+    f.write(hexencode(key) + "\n")
     f.close()
 
 def fsync_logorder():
@@ -118,6 +121,20 @@ def get_curpos(node, baseurl):
         print >>sys.stderr, "ERROR: currentposition", e.read()
         sys.exit(1)
 
+def get_verifiedsize(node, baseurl):
+    try:
+        result = http_request(baseurl + "catlfish/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        parsed_result = json.loads(result)
+        if parsed_result.get(u"result") == u"ok":
+            return parsed_result[u"size"]
+        print >>sys.stderr, "ERROR: verifiedsize", parsed_result
+        sys.exit(1)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: verifiedsize", e.read()
+        sys.exit(1)
+
+
+
 def sendlog(node, baseurl, submission):
     try:
         result = http_request(baseurl + "ct/frontend/sendlog",
@@ -136,6 +153,24 @@ def sendlog(node, baseurl, submission):
         sys.stderr.flush()
         raise e
 
+def backup_sendlog(node, baseurl, submission):
+    try:
+        result = http_request(baseurl + "catlfish/merge/sendlog",
+            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        return json.loads(result)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: sendlog", e.read()
+        sys.stderr.flush()
+        return None
+    except ValueError, e:
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, submission
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
+        raise e
+
 def sendentry(node, baseurl, entry, hash):
     try:
         result = http_request(baseurl + "ct/frontend/sendentry",
@@ -154,6 +189,24 @@ def sendentry(node, baseurl, entry, hash):
         sys.stderr.flush()
         raise e
 
+def sendentry_merge(node, baseurl, entry, hash):
+    try:
+        result = http_request(baseurl + "catlfish/merge/sendentry",
+            json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+            verifynode=node, publickeydir=paths["publickeys"])
+        return json.loads(result)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: sendentry", e.read()
+        sys.exit(1)
+    except ValueError, e:
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, hash
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
+        raise e
+
 def sendsth(node, baseurl, submission):
     try:
         result = http_request(baseurl + "ct/frontend/sendsth",
@@ -171,6 +224,23 @@ def sendsth(node, baseurl, submission):
         sys.stderr.flush()
         raise e
 
+def verifyroot(node, baseurl, treesize):
+    try:
+        result = http_request(baseurl + "catlfish/merge/verifyroot",
+            json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        return json.loads(result)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: verifyroot", e.read()
+        sys.exit(1)
+    except ValueError, e:
+        print >>sys.stderr, "==== FAILED REQUEST ===="
+        print >>sys.stderr, treesize  # the request payload; there is no 'submission' in this function
+        print >>sys.stderr, "======= RESPONSE ======="
+        print >>sys.stderr, result
+        print >>sys.stderr, "========================"
+        sys.stderr.flush()
+        raise e
+
 def get_missingentries(node, baseurl):
     try:
         result = http_request(baseurl + "ct/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
@@ -183,6 +253,18 @@ def get_missingentries(node, baseurl):
         print >>sys.stderr, "ERROR: missingentries", e.read()
         sys.exit(1)
 
+def get_missingentriesforbackup(node, baseurl):
+    try:
+        result = http_request(baseurl + "catlfish/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+        parsed_result = json.loads(result)
+        if parsed_result.get(u"result") == u"ok":
+            return parsed_result[u"entries"]
+        print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
+        sys.exit(1)
+    except urllib2.HTTPError, e:
+        print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
+        sys.exit(1)
+
 def chunks(l, n):
     return [l[i:i+n] for i in range(0, len(l), n)]
 
@@ -257,30 +339,75 @@ root_hash = tree[-1][0]
 timestamp = int(time.time() * 1000)
 
 for secondary in secondaries:
-    remotehost = secondary["host"]
-    remotedir = remotehost + ":" + secondary["mergedir"]
-    localdir = mergedb
-    if localdir[:-1] != '/':
-        localdir = localdir + "/"
-
-    print >>sys.stderr, "copying database to secondary:", remotehost
+    if secondary["name"] == config["primarymergenode"]:
+        continue
+    nodeaddress = "https://%s/" % secondary["address"]
+    nodename = secondary["name"]
+    timing = timing_point()
+    print >>sys.stderr, "backing up to node", nodename
     sys.stderr.flush()
-    rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir])
-    if rsyncstatus:
-        print >>sys.stderr, "rsync failed:", rsyncstatus
-        sys.exit(1)
-
-    print >>sys.stderr, "verifying database at secondary:", remotehost
+    verifiedsize = get_verifiedsize(nodename, nodeaddress)
+    timing_point(timing, "get verified size")
+    print >>sys.stderr, "verified size", verifiedsize
     sys.stderr.flush()
-    verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]],
-                                       stdout=subprocess.PIPE)
-
-    (verifysecondaryresult, _) = verifysecondary.communicate()
-
-    if root_hash != base64.b16decode(verifysecondaryresult.strip()):
-        print >>sys.stderr, "secondary root hash was", verifysecondaryresult.strip()
-        print >>sys.stderr, " expected", base64.b16encode(root_hash)
+    entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+    print >>sys.stderr, "sending log:",
+    sys.stderr.flush()
+    for chunk in chunks(entries, 1000):
+        for trynumber in range(5, 0, -1):
+            sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
+            if sendlogresult is None:  # backup_sendlog returns None on HTTP error; retry after a pause
+                if trynumber == 1:
+                    sys.exit(1)
+                select.select([], [], [], 10.0)
+                print >>sys.stderr, "tries left:", trynumber
+                sys.stderr.flush()
+                continue
+            break
+        if sendlogresult["result"] != "ok":
+            print >>sys.stderr, "sendlog:", sendlogresult
+            sys.exit(1)
+        verifiedsize += len(chunk)
+        print >>sys.stderr, verifiedsize,
+        sys.stderr.flush()
+    print >>sys.stderr
+    timing_point(timing, "sendlog")
+    print >>sys.stderr, "log sent"
+    sys.stderr.flush()
+    missingentries = get_missingentriesforbackup(nodename, nodeaddress)
+    timing_point(timing, "get missing")
+    print >>sys.stderr, "missing entries:", len(missingentries)
+    sys.stderr.flush()
+    fetched_entries = 0
+    print >>sys.stderr, "fetching missing entries",
+    sys.stderr.flush()
+    for missingentry in missingentries:
+        hash = base64.b64decode(missingentry)
+        sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
+        if sendentryresult["result"] != "ok":
+            print >>sys.stderr, "send entry:", sendentryresult
+            sys.exit(1)
+        fetched_entries += 1
+        if fetched_entries % 1000 == 0:  # progress is keyed on this loop's own counter
+            print >>sys.stderr, fetched_entries,
+            sys.stderr.flush()
+    print >>sys.stderr
+    sys.stderr.flush()
+    timing_point(timing, "send missing")
+    verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
+    if verifyrootresult["result"] != "ok":
+        print >>sys.stderr, "verifyroot:", verifyrootresult
+        sys.exit(1)
+    secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+    if root_hash != secondary_root_hash:
+        print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
+        print >>sys.stderr, " expected", hexencode(root_hash)
         sys.exit(1)
+    timing_point(timing, "verifyroot")
+    # XXX: set verifiedsize
+    if args.timing:
+        print >>sys.stderr, timing["deltatimes"]
+        sys.stderr.flush()
 
 tree_head_signature = None
 for signingnode in signingnodes:
@@ -307,7 +434,7 @@ if args.timing:
     print >>sys.stderr, timing["deltatimes"]
     sys.stderr.flush()
 
-print base64.b16encode(root_hash)
+print hexencode(root_hash)
 sys.stdout.flush()
 
 for frontendnode in frontendnodes: