From 06c6290ac4f0507374dfbf703e6577dfe48dfae7 Mon Sep 17 00:00:00 2001
From: Magnus Ahltorp
Date: Tue, 15 Mar 2016 12:52:51 +0100
Subject: Do detection of where the log ends before sending new hashes

Try to send entries until the list of missing entries is empty
---
 tools/merge_backup.py | 100 +++++++++++++++++++++++++++++++++-----------------
 tools/mergetools.py   |   5 ++-
 2 files changed, 70 insertions(+), 35 deletions(-)

diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 123347a..0c283e5 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -7,6 +7,7 @@
 import sys
 import base64
 import select
+import requests
 from time import sleep
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
@@ -15,6 +16,21 @@ from mergetools import chunks, backup_sendlog, get_logorder, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
     get_nfetched, parse_args
 
+def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
+    for trynumber in range(5, 0, -1):
+        sendlogresult = \
+          backup_sendlog(nodename, nodeaddress, own_key, paths,
+                         {"start": verifiedsize, "hashes": chunk})
+        if sendlogresult == None:
+            if trynumber == 1:
+                return None
+            select.select([], [], [], 10.0)
+            print >>sys.stderr, "tries left:", trynumber
+            sys.stderr.flush()
+            continue
+        return sendlogresult
+
+
 def merge_backup(args, config, localconfig, secondaries):
     paths = localconfig["paths"]
     own_key = (localconfig["nodename"],
@@ -27,6 +43,7 @@ def merge_backup(args, config, localconfig, secondaries):
     timing = timing_point()
 
     nfetched = get_nfetched(currentsizefile, logorderfile)
+    timing_point(timing, "get nfetched")
     logorder = get_logorder(logorderfile, nfetched)
     tree_size = len(logorder)
     timing_point(timing, "get logorder")
@@ -49,21 +66,34 @@ def merge_backup(args, config, localconfig, secondaries):
         sys.stderr.flush()
         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+
+        print >>sys.stderr, "determining end of log:",
+        for chunk in chunks(entries, 100000):
+            sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
+            if sendlogresult == None:
+                print >>sys.stderr, "sendlog result was None"
+                sys.exit(1)
+            if sendlogresult["result"] != "ok":
+                print >>sys.stderr, "backup_sendlog:", sendlogresult
+                sys.exit(1)
+            verifiedsize += len(chunk)
+            print >>sys.stderr, verifiedsize,
+            sys.stderr.flush()
+
+        if verifiedsize > 100000:
+            verifiedsize -= 100000
+        else:
+            verifiedsize = 0
+
+        timing_point(timing, "checklog")
+
+        entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
         print >>sys.stderr, "sending log:",
         sys.stderr.flush()
         for chunk in chunks(entries, 1000):
-            for trynumber in range(5, 0, -1):
-                sendlogresult = \
-                  backup_sendlog(nodename, nodeaddress, own_key, paths,
-                                 {"start": verifiedsize, "hashes": chunk})
-                if sendlogresult == None:
-                    if trynumber == 1:
-                        sys.exit(1)
-                    select.select([], [], [], 10.0)
-                    print >>sys.stderr, "tries left:", trynumber
-                    sys.stderr.flush()
-                    continue
-                break
+            sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
+            if sendlogresult == None:
+                sys.exit(1)
             if sendlogresult["result"] != "ok":
                 print >>sys.stderr, "backup_sendlog:", sendlogresult
                 sys.exit(1)
@@ -78,28 +108,32 @@ def merge_backup(args, config, localconfig, secondaries):
         missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                      own_key, paths)
         timing_point(timing, "get missing")
-        print >>sys.stderr, "missing entries:", len(missingentries)
-        sys.stderr.flush()
-
-        fetched_entries = 0
-        print >>sys.stderr, "fetching missing entries",
-        sys.stderr.flush()
-        for missingentry_chunk in chunks(missingentries, 100):
-            missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
-            hashes_and_entries = [(hash, read_chain(chainsdir, hash)) for hash in missingentry_hashes]
-            sendentryresult = sendentries_merge(nodename, nodeaddress,
-                                                own_key, paths,
-                                                hashes_and_entries)
-            if sendentryresult["result"] != "ok":
-                print >>sys.stderr, "sendentry_merge:", sendentryresult
-                sys.exit(1)
-            fetched_entries += 1
-            if fetched_entries % 1000 == 0:
-                print >>sys.stderr, fetched_entries,
-                sys.stderr.flush()
-        print >>sys.stderr
-        sys.stderr.flush()
-        timing_point(timing, "send missing")
+        while missingentries:
+            print >>sys.stderr, "missing entries:", len(missingentries)
+            sys.stderr.flush()
+
+            fetched_entries = 0
+            print >>sys.stderr, "fetching missing entries",
+            sys.stderr.flush()
+            with requests.sessions.Session() as session:
+                for missingentry_chunk in chunks(missingentries, 100):
+                    missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+                    hashes_and_entries = [(hash, read_chain(chainsdir, hash)) for hash in missingentry_hashes]
+                    sendentryresult = sendentries_merge(nodename, nodeaddress,
+                                                        own_key, paths,
+                                                        hashes_and_entries, session)
+                    if sendentryresult["result"] != "ok":
+                        print >>sys.stderr, "sendentry_merge:", sendentryresult
+                        sys.exit(1)
+                    fetched_entries += len(missingentry_chunk)
+                    print >>sys.stderr, fetched_entries,
+                    sys.stderr.flush()
+            print >>sys.stderr
+            sys.stderr.flush()
+            timing_point(timing, "send missing")
+
+            missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+                                                         own_key, paths)
+            timing_point(timing, "get missing")
 
         verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                       tree_size)
diff --git a/tools/mergetools.py b/tools/mergetools.py
index f6e8bd5..ec4fd2a 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -286,13 +286,14 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash):
 def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
     return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)])
 
-def sendentries_merge(node, baseurl, own_key, paths, entries):
+def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
     try:
         json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
         result = http_request(
             baseurl + "plop/v1/merge/sendentry", json.dumps(json_entries),
-            key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+            key=own_key, verifynode=node, publickeydir=paths["publickeys"],
+            session=session)
         return json.loads(result)
     except requests.exceptions.HTTPError, e:
         print >>sys.stderr, "ERROR: sendentry_merge", e.response
-- 
cgit v1.1
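
A sketch of the resulting flow, for readers tracing the patch: after this change,
merge_backup pushes a backup in three phases. It first probes each 100000-hash
chunk with only its first 10 hashes to detect how much of the log the secondary
already has, then backs verifiedsize off by one whole chunk and resends the
hashes from there (the last accepted probe only proves the secondary has the
first 10 hashes of that chunk, so the whole chunk is resent for real), and
finally loops on get-missing/send-entries until the secondary reports no missing
entries. The stand-alone sketch below restates that flow against an in-memory
stand-in. chunks() mirrors the mergetools helper; sendlog(), missing_entries(),
merge_backup_flow() and the assumed accept/refuse behaviour of the node are
illustrative assumptions for this sketch, not code taken from this repository.

#!/usr/bin/env python2
# Stand-alone sketch of the backup flow merge_backup.py has after this
# patch. The "secondary" here is an in-memory stand-in; the real code
# calls backup_sendlog, get_missingentriesforbackup and
# sendentries_merge over HTTPS instead.
import sys

def chunks(l, n):
    # Same idea as mergetools.chunks: split l into n-sized pieces.
    return [l[i:i + n] for i in range(0, len(l), n)]

def sendlog(secondary, start, hashes):
    # Stand-in for backup_sendlog. Assumed semantics for this sketch:
    # the node accepts hashes that match or contiguously extend its
    # copy of the log, and refuses a write that would leave a gap.
    log = secondary["log"]
    if start > len(log):
        return None
    log[start:start + len(hashes)] = hashes
    return {"result": "ok"}

def missing_entries(secondary):
    # Stand-in for get_missingentriesforbackup: hashes with no entry.
    return [h for h in secondary["log"] if h not in secondary["entries"]]

def merge_backup_flow(entries_by_hash, logorder, secondary,
                      verifiedsize, bigchunk=100000, sendchunk=1000):
    # Phase 1: walk the log in big chunks, sending only the first few
    # hashes of each as a probe, to find how much the secondary
    # already has without resending all of it.
    for chunk in chunks(logorder[verifiedsize:], bigchunk):
        if sendlog(secondary, verifiedsize, chunk[:10]) is None:
            sys.exit(1)
        verifiedsize += len(chunk)
    # Back off one whole chunk: everything past the last fully
    # confirmed chunk must be sent for real.
    if verifiedsize > bigchunk:
        verifiedsize -= bigchunk
    else:
        verifiedsize = 0
    # Phase 2: send the remaining hashes in smaller chunks.
    for chunk in chunks(logorder[verifiedsize:], sendchunk):
        if sendlog(secondary, verifiedsize, chunk) is None:
            sys.exit(1)
        verifiedsize += len(chunk)
    # Phase 3: push entry data until the secondary reports nothing
    # missing (the patch reuses one requests.Session for this part).
    m = missing_entries(secondary)
    while m:
        print >>sys.stderr, "missing entries:", len(m)
        for h in m:
            secondary["entries"][h] = entries_by_hash[h]
        m = missing_entries(secondary)

if __name__ == '__main__':
    # A secondary that already has most hashes (say, from an earlier
    # interrupted run) but none of the entry data yet.
    entries = dict(("h%d" % i, "entry %d" % i) for i in range(10))
    order = ["h%d" % i for i in range(10)]
    sec = {"log": order[:9], "entries": {}}
    merge_backup_flow(entries, order, sec, verifiedsize=3,
                      bigchunk=4, sendchunk=2)
    assert sec["log"] == order and len(sec["entries"]) == 10
    print >>sys.stderr, "backup of", len(order), "entries complete"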