diff options
author | Magnus Ahltorp <map@kth.se> | 2016-11-02 13:10:37 +0100 |
---|---|---|
committer | Magnus Ahltorp <map@kth.se> | 2016-11-02 13:10:37 +0100 |
commit | b48c1689e36bdcc65a34b4ab12763478b072a716 (patch) | |
tree | f812f21492567b97aecba2c80dc0ce97a8e6eb6a /tools/merge_backup.py | |
parent | 6659a1c08dda0d6ec20f945e135b23b544db55a4 (diff) |
Change algorithm for merge backup and merge dist
Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x | tools/merge_backup.py | 166 |
1 files changed, 82 insertions, 84 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py index 05679a1..2c17d90 100755 --- a/tools/merge_backup.py +++ b/tools/merge_backup.py @@ -9,6 +9,7 @@ import base64 import select import requests from time import sleep +from base64 import b64encode, b64decode from certtools import timing_point, build_merkle_tree, write_file, \ create_ssl_context from mergetools import chunks, backup_sendlog, get_logorder, \ @@ -30,8 +31,78 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk): continue return sendlogresult +sendlog_discover_chunksize = 100000 +def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths): + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) + if sendlogresult == None: + sys.exit(1) + if sendlogresult["result"] != "ok": + print >>sys.stderr, "backup_sendlog:", sendlogresult + sys.exit(1) + verifiedsize += len(chunk) + print >>sys.stderr, verifiedsize, + sys.stderr.flush() + print >>sys.stderr + print >>sys.stderr, "log sent" + sys.stderr.flush() + +def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing): + missingentries = get_missingentriesforbackup(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + + while missingentries: + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + + fetched_entries = 0 + print >>sys.stderr, "sending missing entries", + sys.stderr.flush() + with requests.sessions.Session() as session: + for missingentry_chunk in chunks(missingentries, 100): + missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] + hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes] + sendentryresult = sendentries_merge(nodename, nodeaddress, + own_key, paths, + hashes_and_entries, session) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "sendentries_merge:", sendentryresult + sys.exit(1) + fetched_entries += len(missingentry_hashes) + #print >>sys.stderr, fetched_entries, + #sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + + missingentries = get_missingentriesforbackup(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + +def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing): + tree = build_merkle_tree(logorder[:tree_size]) + root_hash = tree[-1][0] + timing_point(timing, "build tree") + verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, + tree_size) + if verifyrootresult["result"] != "ok": + print >>sys.stderr, "verifyroot:", verifyrootresult + sys.exit(1) + secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) + if root_hash != secondary_root_hash: + print >>sys.stderr, "secondary root hash was", \ + hexencode(secondary_root_hash) + print >>sys.stderr, " expected", hexencode(root_hash) + sys.exit(1) + timing_point(timing, "verifyroot") + return root_hash + def merge_backup(args, config, localconfig, secondaries): + maxwindow = localconfig.get("maxwindow", 1000) paths = localconfig["paths"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -48,10 +119,6 @@ def merge_backup(args, config, localconfig, secondaries): tree_size = len(logorder) timing_point(timing, "get logorder") - tree = build_merkle_tree(logorder) - root_hash = tree[-1][0] - timing_point(timing, "build tree") - for secondary in secondaries: if secondary["name"] == config["primarymergenode"]: continue @@ -65,92 +132,23 @@ def merge_backup(args, config, localconfig, secondaries): print >>sys.stderr, "verified size", verifiedsize sys.stderr.flush() - entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - - print >>sys.stderr, "determining end of log:", - for chunk in chunks(entries, 100000): - sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10]) - if sendlogresult == None: - print >>sys.stderr, "sendlog result was None" - sys.exit(1) - if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) - verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - - if verifiedsize > 100000: - verifiedsize -= 100000 + if verifiedsize == tree_size: + root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing) else: - verifiedsize = 0 + while verifiedsize < tree_size: + uptopos = min(verifiedsize + maxwindow, tree_size) - timing_point(timing, "checklog") + entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] + sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths) + timing_point(timing, "sendlog") - entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() - for chunk in chunks(entries, 1000): - sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) - if sendlogresult == None: - sys.exit(1) - if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) - verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - print >>sys.stderr - timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing) - missingentries = get_missingentriesforbackup(nodename, nodeaddress, - own_key, paths) - timing_point(timing, "get missing") - - while missingentries: - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() + root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing) - fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() - with requests.sessions.Session() as session: - for missingentry_chunk in chunks(missingentries, 100): - missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] - hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes] - sendentryresult = sendentries_merge(nodename, nodeaddress, - own_key, paths, - hashes_and_entries, session) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentries_merge:", sendentryresult - sys.exit(1) - fetched_entries += len(missingentry_hashes) - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") - - missingentries = get_missingentriesforbackup(nodename, nodeaddress, - own_key, paths) - timing_point(timing, "get missing") - - verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, - tree_size) - if verifyrootresult["result"] != "ok": - print >>sys.stderr, "verifyroot:", verifyrootresult - sys.exit(1) - secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) - if root_hash != secondary_root_hash: - print >>sys.stderr, "secondary root hash was", \ - hexencode(secondary_root_hash) - print >>sys.stderr, " expected", hexencode(root_hash) - sys.exit(1) - timing_point(timing, "verifyroot") + verifiedsize = uptopos + setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize) - setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size) backuppath = mergedb + "/verified." + nodename backupdata = {"tree_size": tree_size, "sha256_root_hash": hexencode(root_hash)} |