diff options
author | Linus Nordberg <linus@nordu.net> | 2016-07-11 15:00:29 +0200 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-07-11 15:00:29 +0200 |
commit | 3d4a9fdd338713c2f63da2b92940904762878d98 (patch) | |
tree | 2e8ee7375619d507f0f206be2c713aa12d17f048 /tools/merge_backup.py | |
parent | 1a36628401658def9ab9595f7cbcf72b8cb4eb6a (diff) | |
parent | bbf254d6d7f1708503f425c0eb8926af1b715b9c (diff) |
Merge remote-tracking branch 'refs/remotes/map/python-requests-chunked'
Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x | tools/merge_backup.py | 102 |
1 files changed, 69 insertions, 33 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py index 4fa4225..ac16a6a 100755 --- a/tools/merge_backup.py +++ b/tools/merge_backup.py @@ -7,6 +7,7 @@ import sys import base64 import select +import requests from time import sleep from certtools import timing_point, build_merkle_tree, write_file, \ create_ssl_context @@ -15,6 +16,21 @@ from mergetools import chunks, backup_sendlog, get_logorder, \ hexencode, setverifiedsize, sendentries_merge, verifyroot, \ get_nfetched, parse_args +def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk): + for trynumber in range(5, 0, -1): + sendlogresult = \ + backup_sendlog(nodename, nodeaddress, own_key, paths, + {"start": verifiedsize, "hashes": chunk}) + if sendlogresult == None: + if trynumber == 1: + return None + select.select([], [], [], 10.0) + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() + continue + return sendlogresult + + def merge_backup(args, config, localconfig, secondaries): paths = localconfig["paths"] own_key = (localconfig["nodename"], @@ -27,6 +43,7 @@ def merge_backup(args, config, localconfig, secondaries): timing = timing_point() nfetched = get_nfetched(currentsizefile, logorderfile) + timing_point(timing, "get nfetched") logorder = get_logorder(logorderfile, nfetched) tree_size = len(logorder) timing_point(timing, "get logorder") @@ -49,21 +66,34 @@ def merge_backup(args, config, localconfig, secondaries): sys.stderr.flush() entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] + + print >>sys.stderr, "determining end of log:", + for chunk in chunks(entries, 100000): + sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10]) + if sendlogresult == None: + print >>sys.stderr, "sendlog result was None" + sys.exit(1) + if sendlogresult["result"] != "ok": + print >>sys.stderr, "backup_sendlog:", sendlogresult + sys.exit(1) + verifiedsize += len(chunk) + print >>sys.stderr, verifiedsize, + sys.stderr.flush() + + if verifiedsize > 100000: + verifiedsize -= 100000 + else: + verifiedsize = 0 + + timing_point(timing, "checklog") + + entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] print >>sys.stderr, "sending log:", sys.stderr.flush() for chunk in chunks(entries, 1000): - for trynumber in range(5, 0, -1): - sendlogresult = \ - backup_sendlog(nodename, nodeaddress, own_key, paths, - {"start": verifiedsize, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - select.select([], [], [], 10.0) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue - break + sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) + if sendlogresult == None: + sys.exit(1) if sendlogresult["result"] != "ok": print >>sys.stderr, "backup_sendlog:", sendlogresult sys.exit(1) @@ -78,28 +108,34 @@ def merge_backup(args, config, localconfig, secondaries): missingentries = get_missingentriesforbackup(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() - fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() - for missingentry_chunk in chunks(missingentries, 100): - missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] - hashes_and_entries = [(hash, read_chain(chainsdir, hash)) for hash in missingentry_hashes] - sendentryresult = sendentries_merge(nodename, nodeaddress, - own_key, paths, - hashes_and_entries) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry_merge:", sendentryresult - sys.exit(1) - fetched_entries += 1 - if fetched_entries % 1000 == 0: - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") + while missingentries: + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + + fetched_entries = 0 + print >>sys.stderr, "fetching missing entries", + sys.stderr.flush() + with requests.sessions.Session() as session: + for missingentry_chunk in chunks(missingentries, 100): + missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] + hashes_and_entries = [(hash, read_chain(chainsdir, hash)) for hash in missingentry_hashes] + sendentryresult = sendentries_merge(nodename, nodeaddress, + own_key, paths, + hashes_and_entries, session) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "sendentry_merge:", sendentryresult + sys.exit(1) + fetched_entries += len(missingentry_hashes) + print >>sys.stderr, fetched_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + + missingentries = get_missingentriesforbackup(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, tree_size) |