path: root/tools/merge.py
author    Magnus Ahltorp <map@kth.se>    2015-06-18 09:20:14 +0100
committer Linus Nordberg <linus@nordu.net>    2015-07-31 15:51:29 +0200
commit    803adf14ec4116d2997feca38c4c259d5d47f71a (patch)
tree      310a31650464d3eb69792f2b45ecf74b042ac615 /tools/merge.py
parent    b6af0ebacb8cda356edc4769c2751eea06d43aea (diff)
Preliminary merge secondary support. Change merge db to lowercase.
Diffstat (limited to 'tools/merge.py')
-rwxr-xr-x  tools/merge.py  177
1 file changed, 152 insertions(+), 25 deletions(-)
diff --git a/tools/merge.py b/tools/merge.py
index 9904b84..34d1697 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -38,7 +38,7 @@ localconfig = yaml.load(open(args.localconfig))
ctbaseurl = config["baseurl"]
frontendnodes = config["frontendnodes"]
storagenodes = config["storagenodes"]
-secondaries = localconfig.get("secondary", [])
+secondaries = config.get("mergenodes", [])
paths = localconfig["paths"]
mergedb = paths["mergedb"]
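
The secondary list now comes from the shared config (mergenodes) instead of each node's local config, so every merge node sees the same list. A hedged sketch of the structure the code above expects (node names and addresses are invented):

# Hypothetical parsed config; the keys are what merge.py reads,
# the values are made up for illustration.
config = {
    "primarymergenode": "merge-1",
    "mergenodes": [
        {"name": "merge-1", "address": "merge-1.example.com"},
        {"name": "merge-2", "address": "merge-2.example.com"},
    ],
}
secondaries = config.get("mergenodes", [])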
@@ -54,8 +54,11 @@ logpublickey = get_public_key_from_file(paths["logpublickey"])
hashed_dir = True
+def hexencode(key):
+ return base64.b16encode(key).lower()
+
def write_chain(key, value):
- filename = base64.b16encode(key)
+ filename = hexencode(key)
if hashed_dir:
path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
try:
@@ -70,7 +73,7 @@ def write_chain(key, value):
def add_to_logorder(key):
f = open(logorderfile, "a")
- f.write(base64.b16encode(key) + "\n")
+ f.write(hexencode(key) + "\n")
f.close()
def fsync_logorder():
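
Note that base64.b16encode() returns uppercase hex digits, so the .lower() in the new hexencode() helper is what implements the "change merge db to lowercase" part of this commit. A quick interpreter illustration:

>>> import base64
>>> base64.b16encode("\x0a\xff")
'0AFF'
>>> base64.b16encode("\x0a\xff").lower()   # what hexencode() now returns
'0aff'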
@@ -118,6 +121,20 @@ def get_curpos(node, baseurl):
print >>sys.stderr, "ERROR: currentposition", e.read()
sys.exit(1)
+def get_verifiedsize(node, baseurl):
+ try:
+ result = http_request(baseurl + "catlfish/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"size"]
+ print >>sys.stderr, "ERROR: verifiedsize", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: verifiedsize", e.read()
+ sys.exit(1)
+
+
+
def sendlog(node, baseurl, submission):
try:
result = http_request(baseurl + "ct/frontend/sendlog",
@@ -136,6 +153,24 @@ def sendlog(node, baseurl, submission):
sys.stderr.flush()
raise e
+def backup_sendlog(node, baseurl, submission):
+ try:
+ result = http_request(baseurl + "catlfish/merge/sendlog",
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendlog", e.read()
+ sys.stderr.flush()
+ return None
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def sendentry(node, baseurl, entry, hash):
try:
result = http_request(baseurl + "ct/frontend/sendentry",
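
Unlike the other helpers, backup_sendlog() returns None on an HTTP error rather than exiting; that None is the signal the retry loop later in this patch keys on. A minimal sketch of the contract, assuming the helpers and variables above are in scope:

# None means "transient failure, retry"; a dict is a parsed JSON answer.
for trynumber in range(5, 0, -1):
    sendlogresult = backup_sendlog(nodename, nodeaddress,
                                   {"start": verifiedsize, "hashes": chunk})
    if sendlogresult is not None:
        break                        # got an answer, stop retrying
    if trynumber == 1:
        sys.exit(1)                  # out of attempts
    select.select([], [], [], 10.0)  # sleep ~10 s before the next attempt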
@@ -154,6 +189,24 @@ def sendentry(node, baseurl, entry, hash):
sys.stderr.flush()
raise e
+def sendentry_merge(node, baseurl, entry, hash):
+ try:
+ result = http_request(baseurl + "catlfish/merge/sendentry",
+ json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+ verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendentry", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, hash
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def sendsth(node, baseurl, submission):
try:
result = http_request(baseurl + "ct/frontend/sendsth",
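
sendentry_merge() ships a single entry to the secondary's merge interface, with both the entry body and its Merkle tree leaf hash base64-wrapped in a JSON object. The payload, with invented stand-in bytes:

import base64, json
entry = "(DER certificate chain bytes)"   # invented stand-in
leafhash = "(32-byte tree leaf hash)"     # invented stand-in
payload = json.dumps({"entry": base64.b64encode(entry),
                      "treeleafhash": base64.b64encode(leafhash)})
# POSTed to <baseurl>catlfish/merge/sendentry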
@@ -171,6 +224,23 @@ def sendsth(node, baseurl, submission):
sys.stderr.flush()
raise e
+def verifyroot(node, baseurl, treesize):
+ try:
+ result = http_request(baseurl + "catlfish/merge/verifyroot",
+ json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: verifyroot", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, treesize
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def get_missingentries(node, baseurl):
try:
result = http_request(baseurl + "ct/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
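
verifyroot() closes the loop: rather than trusting that the copy succeeded, the primary asks the secondary to compute its own root hash at the primary's tree size and compares the two values. The request and response shapes, as implied by the code:

# Request body:   {"tree_size": <int>}
# Response body:  {"result": "ok", "root_hash": "<base64 root hash>"}
reply = verifyroot(nodename, nodeaddress, tree_size)
secondary_root_hash = base64.b64decode(reply[u"root_hash"])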
@@ -183,6 +253,18 @@ def get_missingentries(node, baseurl):
print >>sys.stderr, "ERROR: missingentries", e.read()
sys.exit(1)
+def get_missingentriesforbackup(node, baseurl):
+ try:
+ result = http_request(baseurl + "catlfish/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"entries"]
+ print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
+ sys.exit(1)
+
def chunks(l, n):
return [l[i:i+n] for i in range(0, len(l), n)]
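
get_missingentriesforbackup() returns the base64 leaf hashes that the secondary has accepted into its log order but does not yet hold entries for, and chunks() is the batching helper that slices the log into the 1000-hash sendlog requests used below. For instance:

>>> chunks([1, 2, 3, 4, 5], 2)
[[1, 2], [3, 4], [5]]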
@@ -257,30 +339,75 @@ root_hash = tree[-1][0]
timestamp = int(time.time() * 1000)
for secondary in secondaries:
- remotehost = secondary["host"]
- remotedir = remotehost + ":" + secondary["mergedir"]
- localdir = mergedb
- if localdir[:-1] != '/':
- localdir = localdir + "/"
-
- print >>sys.stderr, "copying database to secondary:", remotehost
+ if secondary["name"] == config["primarymergenode"]:
+ continue
+ nodeaddress = "https://%s/" % secondary["address"]
+ nodename = secondary["name"]
+ timing = timing_point()
+ print >>sys.stderr, "backing up to node", nodename
sys.stderr.flush()
- rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir])
- if rsyncstatus:
- print >>sys.stderr, "rsync failed:", rsyncstatus
- sys.exit(1)
-
- print >>sys.stderr, "verifying database at secondary:", remotehost
+ verifiedsize = get_verifiedsize(nodename, nodeaddress)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
sys.stderr.flush()
- verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]],
- stdout=subprocess.PIPE)
-
- (verifysecondaryresult, _) = verifysecondary.communicate()
-
- if root_hash != base64.b16decode(verifysecondaryresult.strip()):
- print >>sys.stderr, "secondary root hash was", verifysecondaryresult.strip()
- print >>sys.stderr, " expected", base64.b16encode(root_hash)
+ entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
+ if sendlogresult is None:
+ if trynumber == 1:
+ sys.exit(1)
+ select.select([], [], [], 10.0)  # sleep ~10 s between retries
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ verifiedsize += len(chunk)
+ print >>sys.stderr, verifiedsize,
+ sys.stderr.flush()
+ print >>sys.stderr
+ timing_point(timing, "sendlog")
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress)
+ timing_point(timing, "get missing")
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+ fetched_entries = 0
+ print >>sys.stderr, "fetching missing entries",
+ sys.stderr.flush()
+ for missingentry in missingentries:
+ hash = base64.b64decode(missingentry)
+ sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentry:", sendentryresult
+ sys.exit(1)
+ fetched_entries += 1
+ if fetched_entries % 1000 == 0:
+ print >>sys.stderr, fetched_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+ verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
+ if verifyrootresult["result"] != "ok":
+ print >>sys.stderr, "verifyroot:", verifyrootresult
+ sys.exit(1)
+ secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+ if root_hash != secondary_root_hash:
+ print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
+ print >>sys.stderr, " expected", hexencode(root_hash)
sys.exit(1)
+ timing_point(timing, "verifyroot")
+ # XXX: set verifiedsize
+ if args.timing:
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()
tree_head_signature = None
for signingnode in signingnodes:
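
Taken together, the new per-secondary backup runs in four phases: fetch the resume point, replay the log order in chunks, fill in the missing entries, and cross-check the root hash. A condensed sketch of the loop above, with retries, progress output, and error handling stripped:

for secondary in secondaries:
    if secondary["name"] == config["primarymergenode"]:
        continue                                    # never back up to ourselves
    nodename = secondary["name"]
    nodeaddress = "https://%s/" % secondary["address"]
    verifiedsize = get_verifiedsize(nodename, nodeaddress)    # resume point
    hashes = [base64.b64encode(h) for h in logorder[verifiedsize:]]
    for chunk in chunks(hashes, 1000):                        # replay log order
        backup_sendlog(nodename, nodeaddress,
                       {"start": verifiedsize, "hashes": chunk})
        verifiedsize += len(chunk)
    for e in get_missingentriesforbackup(nodename, nodeaddress):
        h = base64.b64decode(e)                               # fill entry store
        sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, h), h)
    reply = verifyroot(nodename, nodeaddress, tree_size)      # cross-check root
    assert base64.b64decode(reply["root_hash"]) == root_hash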
@@ -307,7 +434,7 @@ if args.timing:
print >>sys.stderr, timing["deltatimes"]
sys.stderr.flush()
-print base64.b16encode(root_hash)
+print hexencode(root_hash)
sys.stdout.flush()
for frontendnode in frontendnodes: