diff options
author | Linus Nordberg <linus@nordu.net> | 2016-11-25 10:54:17 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-11-25 10:54:17 +0100 |
commit | 9f436ce835aba793cc06525e160bd2e07dd8f7cd (patch) | |
tree | 4a6a0a74cd6b054b51a6ab8f1767caf820970d59 /tools/merge_dist.py | |
parent | 034c40cc84f28fd970fc649ffe7eb7fe797479a6 (diff) | |
parent | ccfe7c55a5d1658c0f98aac2c45e76444dcd0bc2 (diff) |
Merge remote-tracking branch 'refs/remotes/map/robust-distribution'
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-x | tools/merge_dist.py | 134 |
1 files changed, 82 insertions, 52 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py index 2af1d6c..ded25a1 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -6,14 +6,78 @@ import sys import json +import base64 +import requests from time import sleep from base64 import b64encode, b64decode from certtools import timing_point, \ create_ssl_context from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ - sendsth, sendlog, sendentry, parse_args, perm + publish_sth, sendlog, sendentries, parse_args, perm, get_frontend_verifiedsize, \ + frontend_verify_entries + +def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths): + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + for trynumber in range(5, 0, -1): + sendlogresult = sendlog(nodename, nodeaddress, + own_key, paths, + {"start": curpos, "hashes": chunk}) + if sendlogresult == None: + if trynumber == 1: + sys.exit(1) + sleep(10) + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() + continue + break + if sendlogresult["result"] != "ok": + print >>sys.stderr, "sendlog:", sendlogresult + sys.exit(1) + curpos += len(chunk) + print >>sys.stderr, curpos, + sys.stderr.flush() + print >>sys.stderr + print >>sys.stderr, "log sent" + sys.stderr.flush() + +def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing): + missingentries = get_missingentries(nodename, nodeaddress, own_key, + paths) + timing_point(timing, "get missing") + + while missingentries: + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + + sent_entries = 0 + print >>sys.stderr, "sending missing entries", + sys.stderr.flush() + with requests.sessions.Session() as session: + for missingentry_chunk in chunks(missingentries, 100): + missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] + hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes] + sendentryresult = sendentries(nodename, nodeaddress, + own_key, paths, + hashes_and_entries, session) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "sendentries:", sendentryresult + sys.exit(1) + sent_entries += len(missingentry_hashes) + print >>sys.stderr, sent_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + + missingentries = get_missingentries(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + def merge_dist(args, localconfig, frontendnodes, timestamp): + maxwindow = localconfig.get("maxwindow", 1000) paths = localconfig["paths"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -53,62 +117,28 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): print >>sys.stderr, "current position", curpos sys.stderr.flush() - entries = [b64encode(entry) for entry in logorder[curpos:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() - for chunk in chunks(entries, 1000): - for trynumber in range(5, 0, -1): - sendlogresult = sendlog(nodename, nodeaddress, - own_key, paths, - {"start": curpos, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - sleep(10) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue - break - if sendlogresult["result"] != "ok": - print >>sys.stderr, "sendlog:", sendlogresult - sys.exit(1) - curpos += len(chunk) - print >>sys.stderr, curpos, - sys.stderr.flush() - print >>sys.stderr - timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get verified size") + print >>sys.stderr, "verified size", verifiedsize - missingentries = get_missingentries(nodename, nodeaddress, own_key, - paths) - timing_point(timing, "get missing") + assert verifiedsize >= curpos - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() - sent_entries = 0 - print >>sys.stderr, "send missing entries", - sys.stderr.flush() - for missingentry in missingentries: - ehash = b64decode(missingentry) - sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, - chainsdb.get(ehash), ehash) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry:", sendentryresult - sys.exit(1) - sent_entries += 1 - if sent_entries % 1000 == 0: - print >>sys.stderr, sent_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") + while verifiedsize < len(logorder): + uptopos = min(verifiedsize + maxwindow, len(logorder)) + + entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] + sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths) + timing_point(timing, "sendlog") + + fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing) + verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos) + print >>sys.stderr, "sending sth to node", nodename sys.stderr.flush() - sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) - if sendsthresult["result"] != "ok": - print >>sys.stderr, "sendsth:", sendsthresult + publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth) + if publishsthresult["result"] != "ok": + print >>sys.stderr, "publishsth:", publishsthresult sys.exit(1) timing_point(timing, "send sth") |