summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2015-06-11 17:45:14 +0200
committerMagnus Ahltorp <map@kth.se>2015-06-11 17:45:14 +0200
commitb8c86cf28520b7125aeda20adeee27f3a036055e (patch)
tree1ec0afdcb971ad79d29ef549535b289a8638b91f
parente109e12abb56414cbf64aad7a7e38c19062cdbb9 (diff)
Improve merge progress reportingmergeprogress
-rwxr-xr-xtools/merge.py123
1 files changed, 79 insertions, 44 deletions
diff --git a/tools/merge.py b/tools/merge.py
index 8766491..9904b84 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -84,10 +84,10 @@ def get_new_entries(node, baseurl):
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
- print "ERROR: fetchnewentries", parsed_result
+ print >>sys.stderr, "ERROR: fetchnewentries", parsed_result
sys.exit(1)
except urllib2.HTTPError, e:
- print "ERROR: fetchnewentries", e.read()
+ print >>sys.stderr, "ERROR: fetchnewentries", e.read()
sys.exit(1)
def get_entries(node, baseurl, hashes):
@@ -100,10 +100,10 @@ def get_entries(node, baseurl, hashes):
assert len(entries) == len(hashes)
assert set(entries.keys()) == set(hashes)
return entries
- print "ERROR: getentry", parsed_result
+ print >>sys.stderr, "ERROR: getentry", parsed_result
sys.exit(1)
except urllib2.HTTPError, e:
- print "ERROR: getentry", e.read()
+ print >>sys.stderr, "ERROR: getentry", e.read()
sys.exit(1)
def get_curpos(node, baseurl):
@@ -112,10 +112,10 @@ def get_curpos(node, baseurl):
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
return parsed_result[u"position"]
- print "ERROR: currentposition", parsed_result
+ print >>sys.stderr, "ERROR: currentposition", parsed_result
sys.exit(1)
except urllib2.HTTPError, e:
- print "ERROR: currentposition", e.read()
+ print >>sys.stderr, "ERROR: currentposition", e.read()
sys.exit(1)
def sendlog(node, baseurl, submission):
@@ -124,14 +124,16 @@ def sendlog(node, baseurl, submission):
json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except urllib2.HTTPError, e:
- print "ERROR: sendlog", e.read()
+ print >>sys.stderr, "ERROR: sendlog", e.read()
+ sys.stderr.flush()
return None
except ValueError, e:
- print "==== FAILED REQUEST ===="
- print submission
- print "======= RESPONSE ======="
- print result
- print "========================"
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
raise e
def sendentry(node, baseurl, entry, hash):
@@ -141,14 +143,15 @@ def sendentry(node, baseurl, entry, hash):
verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except urllib2.HTTPError, e:
- print "ERROR: sendentry", e.read()
+ print >>sys.stderr, "ERROR: sendentry", e.read()
sys.exit(1)
except ValueError, e:
- print "==== FAILED REQUEST ===="
- print hash
- print "======= RESPONSE ======="
- print result
- print "========================"
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, hash
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
raise e
def sendsth(node, baseurl, submission):
@@ -157,14 +160,15 @@ def sendsth(node, baseurl, submission):
json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except urllib2.HTTPError, e:
- print "ERROR: sendsth", e.read()
+ print >>sys.stderr, "ERROR: sendsth", e.read()
sys.exit(1)
except ValueError, e:
- print "==== FAILED REQUEST ===="
- print submission
- print "======= RESPONSE ======="
- print result
- print "========================"
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
raise e
def get_missingentries(node, baseurl):
@@ -173,10 +177,10 @@ def get_missingentries(node, baseurl):
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
return parsed_result[u"entries"]
- print "ERROR: missingentries", parsed_result
+ print >>sys.stderr, "ERROR: missingentries", parsed_result
sys.exit(1)
except urllib2.HTTPError, e:
- print "ERROR: missingentries", e.read()
+ print >>sys.stderr, "ERROR: missingentries", e.read()
sys.exit(1)
def chunks(l, n):
@@ -195,7 +199,8 @@ new_entries = set()
entries_to_fetch = {}
for storagenode in storagenodes:
- print "getting new entries from", storagenode["name"]
+ print >>sys.stderr, "getting new entries from", storagenode["name"]
+ sys.stderr.flush()
new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"]))
new_entries.update(new_entries_per_node[storagenode["name"]])
entries_to_fetch[storagenode["name"]] = []
@@ -206,7 +211,8 @@ timing_point(timing, "get new entries")
new_entries -= certsinlog
-print "adding", len(new_entries), "entries"
+print >>sys.stderr, "adding", len(new_entries), "entries"
+sys.stderr.flush()
if args.nomerge:
sys.exit(0)
@@ -222,7 +228,8 @@ verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
added_entries = 0
for storagenode in storagenodes:
- print "getting", len(entries_to_fetch[storagenode["name"]]), "entries from", storagenode["name"]
+ print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
+ sys.stderr.flush()
for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk)
for hash in chunk:
@@ -233,9 +240,14 @@ for storagenode in storagenodes:
logorder.append(hash)
certsinlog.add(hash)
added_entries += 1
+ print >>sys.stderr, added_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
fsync_logorder()
timing_point(timing, "add entries")
-print "added", added_entries, "entries"
+print >>sys.stderr, "added", added_entries, "entries"
+sys.stderr.flush()
verifycert.communicate(struct.pack("I", 0))
@@ -252,12 +264,14 @@ for secondary in secondaries:
localdir = localdir + "/"
print >>sys.stderr, "copying database to secondary:", remotehost
+ sys.stderr.flush()
rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir])
if rsyncstatus:
print >>sys.stderr, "rsync failed:", rsyncstatus
sys.exit(1)
print >>sys.stderr, "verifying database at secondary:", remotehost
+ sys.stderr.flush()
verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]],
stdout=subprocess.PIPE)
@@ -275,7 +289,8 @@ for signingnode in signingnodes:
root_hash, "https://%s/" % signingnode["address"], key=own_key)
break
except urllib2.URLError, e:
- print e
+ print >>sys.stderr, e
+ sys.stderr.flush()
if tree_head_signature == None:
print >>sys.stderr, "Could not contact any signing nodes"
sys.exit(1)
@@ -289,19 +304,25 @@ check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
timing_point(timing, "build sth")
if args.timing:
- print timing["deltatimes"]
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()
-print "root hash", base64.b16encode(root_hash)
+print base64.b16encode(root_hash)
+sys.stdout.flush()
for frontendnode in frontendnodes:
nodeaddress = "https://%s/" % frontendnode["address"]
nodename = frontendnode["name"]
timing = timing_point()
- print "distributing for node", nodename
+ print >>sys.stderr, "distributing for node", nodename
+ sys.stderr.flush()
curpos = get_curpos(nodename, nodeaddress)
timing_point(timing, "get curpos")
- print "current position", curpos
+ print >>sys.stderr, "current position", curpos
+ sys.stderr.flush()
entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
for chunk in chunks(entries, 1000):
for trynumber in range(5, 0, -1):
sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk})
@@ -309,31 +330,45 @@ for frontendnode in frontendnodes:
if trynumber == 1:
sys.exit(1)
select.select([], [], [], 10.0)
- print "tries left:", trynumber
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
continue
break
if sendlogresult["result"] != "ok":
- print "sendlog:", sendlogresult
+ print >>sys.stderr, "sendlog:", sendlogresult
sys.exit(1)
curpos += len(chunk)
- print curpos,
- sys.stdout.flush()
+ print >>sys.stderr, curpos,
+ sys.stderr.flush()
+ print >>sys.stderr
timing_point(timing, "sendlog")
- print "log sent"
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
missingentries = get_missingentries(nodename, nodeaddress)
timing_point(timing, "get missing")
- print "missing entries:", len(missingentries)
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+ fetched_entries = 0
+ print >>sys.stderr, "fetching missing entries",
+ sys.stderr.flush()
for missingentry in missingentries:
hash = base64.b64decode(missingentry)
sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
if sendentryresult["result"] != "ok":
- print "send sth:", sendentryresult
+ print >>sys.stderr, "send sth:", sendentryresult
sys.exit(1)
+ fetched_entries += 1
+ if added_entries % 1000 == 0:
+ print >>sys.stderr, fetched_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
timing_point(timing, "send missing")
sendsthresult = sendsth(nodename, nodeaddress, sth)
if sendsthresult["result"] != "ok":
- print "send sth:", sendsthresult
+ print >>sys.stderr, "send sth:", sendsthresult
sys.exit(1)
timing_point(timing, "send sth")
if args.timing:
- print timing["deltatimes"]
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()