Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x  tools/merge_backup.py  121
1 file changed, 71 insertions(+), 50 deletions(-)
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index e7cce26..f25b22a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -8,17 +8,19 @@
 # See catlfish/doc/merge.txt for more about the merge process.
 #
 import sys
-import base64
 import select
 import requests
+import errno
+import logging
 from time import sleep
 from base64 import b64encode, b64decode
+from os import stat
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
 from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
-    get_nfetched, parse_args, perm
+    get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
 
 def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
     for trynumber in range(5, 0, -1):
@@ -29,57 +31,49 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
             if trynumber == 1:
                 return None
             select.select([], [], [], 10.0)
-            print >>sys.stderr, "tries left:", trynumber
-            sys.stderr.flush()
+            logging.info("tries left: %d", trynumber)
             continue
         return sendlogresult
 
 sendlog_discover_chunksize = 100000
 
-def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths):
-    print >>sys.stderr, "sending log:",
-    sys.stderr.flush()
+def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths,
+                   statusupdates):
+    logging.info("sending log")
     for chunk in chunks(entries, 1000):
         sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
         if sendlogresult == None:
             sys.exit(1)
         if sendlogresult["result"] != "ok":
-            print >>sys.stderr, "backup_sendlog:", sendlogresult
+            logging.error("backup_sendlog: %s", sendlogresult)
             sys.exit(1)
         verifiedsize += len(chunk)
-        print >>sys.stderr, verifiedsize,
-        sys.stderr.flush()
-    print >>sys.stderr
-    print >>sys.stderr, "log sent"
-    sys.stderr.flush()
+        statusupdates.status("PROG sending log: %d" % verifiedsize)
+    logging.info("log sent")
 
-def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb,
+                            timing, statusupdates):
     missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                  own_key, paths)
     timing_point(timing, "get missing")
 
     while missingentries:
-        print >>sys.stderr, "missing entries:", len(missingentries)
-        sys.stderr.flush()
+        logging.info("about to send %d missing entries", len(missingentries))
 
         fetched_entries = 0
-        print >>sys.stderr, "sending missing entries",
-        sys.stderr.flush()
         with requests.sessions.Session() as session:
             for missingentry_chunk in chunks(missingentries, 100):
-                missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
-                hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+                missingentry_hashes = [b64decode(missingentry) for missingentry in missingentry_chunk]
+                hashes_and_entries = [(ehash, chainsdb.get(ehash)) for ehash in missingentry_hashes]
                 sendentryresult = sendentries_merge(nodename, nodeaddress,
                                                     own_key, paths,
                                                     hashes_and_entries, session)
                 if sendentryresult["result"] != "ok":
-                    print >>sys.stderr, "sendentries_merge:", sendentryresult
+                    logging.error("sendentries_merge: %s", sendentryresult)
                     sys.exit(1)
                 fetched_entries += len(missingentry_hashes)
-                #print >>sys.stderr, fetched_entries,
-                #sys.stderr.flush()
-        print >>sys.stderr
-        sys.stderr.flush()
+                statusupdates.status("PROG sending missing entries: %d" %
+                                     fetched_entries)
         timing_point(timing, "send missing")
 
         missingentries = get_missingentriesforbackup(nodename, nodeaddress,
@@ -93,17 +87,16 @@ def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timin
     verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                   tree_size)
     if verifyrootresult["result"] != "ok":
-        print >>sys.stderr, "verifyroot:", verifyrootresult
+        logging.error("verifyroot: %s", verifyrootresult)
         sys.exit(1)
-    secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+    secondary_root_hash = b64decode(verifyrootresult["root_hash"])
     if root_hash != secondary_root_hash:
-        print >>sys.stderr, "secondary root hash was", \
-            hexencode(secondary_root_hash)
-        print >>sys.stderr, "  expected", hexencode(root_hash)
+        logging.error("secondary root hash was %s while expected was %s",
+                      hexencode(secondary_root_hash), hexencode(root_hash))
         sys.exit(1)
     timing_point(timing, "verifyroot")
     return root_hash
-    
+
 def merge_backup(args, config, localconfig, secondaries):
     maxwindow = localconfig.get("maxwindow", 1000)
     paths = localconfig["paths"]
@@ -114,6 +107,8 @@ def merge_backup(args, config, localconfig, secondaries):
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     logorderfile = mergedb + "/logorder"
     currentsizefile = mergedb + "/fetched"
+    statusfile = mergedb + "/merge_backup.status"
+    s = Status(statusfile)
     timing = timing_point()
 
     nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -128,12 +123,10 @@ def merge_backup(args, config, localconfig, secondaries):
         nodeaddress = "https://%s/" % secondary["address"]
         nodename = secondary["name"]
         timing = timing_point()
-        print >>sys.stderr, "backing up to node", nodename
-        sys.stderr.flush()
+        logging.info("backing up to node %s", nodename)
         verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
         timing_point(timing, "get verified size")
-        print >>sys.stderr, "verified size", verifiedsize
-        sys.stderr.flush()
+        logging.info("verified size %d", verifiedsize)
         if verifiedsize == tree_size:
             root_hash = check_root(logorder, nodename, nodeaddress, own_key,
                                    paths, tree_size, timing)
@@ -142,10 +135,10 @@ def merge_backup(args, config, localconfig, secondaries):
                 uptopos = min(verifiedsize + maxwindow, tree_size)
 
                 entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
-                sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+                sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s)
                 timing_point(timing, "sendlog")
 
-                fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
+                fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s)
 
                 root_hash = check_root(logorder, nodename, nodeaddress, own_key,
                                        paths, uptopos, timing)
@@ -155,29 +148,48 @@ def merge_backup(args, config, localconfig, secondaries):
         backuppath = mergedb + "/verified." + nodename
         backupdata = {"tree_size": tree_size,
                       "sha256_root_hash": hexencode(root_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+        logging.debug("writing to %s: %s", backuppath, backupdata)
         write_file(backuppath, backupdata)
 
     if args.timing:
-        print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+    return 0
 
 def main():
     """
-    Read logorder file up until what's indicated by fetched file and
-    build the tree.
+    Wait until 'fetched' exists and read it.
+
+    Read 'logorder' up until what's indicated by 'fetched' and build the
+    tree.
 
     Distribute entries to all secondaries, write tree size and tree head
-    to backup.<secondary> files as each secondary is verified to have
+    to 'backup.<secondary>' files as each secondary is verified to have
     the entries.
 
-    Sleep some and start over.
+    If `--mergeinterval', wait until 'fetched' is updated and read it
+    and start over from the point where 'logorder' is read.
     """
     args, config, localconfig = parse_args()
+    paths = localconfig["paths"]
+    mergedb = paths["mergedb"]
+    lockfile = mergedb + "/.merge_backup.lock"
+    fetched_path = mergedb + "/fetched"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     all_secondaries = \
       [n for n in config.get('mergenodes', []) if \
       n['name'] != config['primarymergenode']]
-    paths = localconfig["paths"]
     create_ssl_context(cafile=paths["https_cacertfile"])
 
     if len(args.node) == 0:
@@ -185,12 +197,21 @@ def main():
     else:
        nodes = [n for n in all_secondaries if n["name"] in args.node]
 
+    if args.mergeinterval is None:
+        return merge_backup(args, config, localconfig, nodes)
+
+    fetched_statinfo = waitforfile(fetched_path)
+
     while True:
-        merge_backup(args, config, localconfig, nodes)
-        if args.interval is None:
-            break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        err = merge_backup(args, config, localconfig, nodes)
+        if err:
+            return err
+        fetched_statinfo_old = fetched_statinfo
+        while fetched_statinfo == fetched_statinfo_old:
+            sleep(max(3, args.mergeinterval / 10))
+            fetched_statinfo = stat(fetched_path)
+
+    return 0
 
 if __name__ == '__main__':
     sys.exit(main())
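A note on the retry loop in backup_loop(), which this change touches only to swap the stderr prints for logging.info: select.select([], [], [], 10.0) with three empty descriptor lists is simply a portable 10-second pause. The control flow reduces to the following sketch; retry() is a hypothetical name used here for illustration, not part of mergetools:

import logging
from time import sleep

def retry(request_once, tries=5, pause=10.0):
    # Shape of backup_loop(): attempt the request up to 'tries' times,
    # pausing between attempts, and give up with None after the last one.
    for trynumber in range(tries, 0, -1):
        result = request_once()
        if result is not None:
            return result
        if trynumber == 1:
            return None
        sleep(pause)  # the original spells this select.select([], [], [], 10.0)
        logging.info("tries left: %d", trynumber)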
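sendlog_helper() ships the log in slices of 1000 entries and fill_in_missing_entries() asks for entries 100 hashes at a time, both via chunks() from mergetools. The helper's definition is not part of this diff; the usual list-slicing version would be:

def chunks(l, n):
    # Split list 'l' into consecutive slices of at most 'n' items.
    return [l[i:i + n] for i in range(0, len(l), n)]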
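The statusupdates parameter threaded through sendlog_helper() and fill_in_missing_entries() is a Status object bound to mergedb/merge_backup.status; the per-chunk counters that used to trickle onto stderr become "PROG ..." lines written through it. Status itself is imported, not shown. A minimal sketch consistent with the calls in the diff (the real mergetools class may buffer or rate-limit writes):

class Status:
    # Keep a status file holding the single most recent progress line.
    def __init__(self, path):
        self.path = path

    def status(self, line):
        # Overwrite rather than append: readers only care about the
        # latest "PROG ..." message.
        with open(self.path, "w") as f:
            f.write(line + "\n")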
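main() now refuses to run alongside another instance by taking .merge_backup.lock through flock_ex_or_fail(), also imported from mergetools and not shown here. Assuming it follows the common flock(2) idiom (which would also explain the new "import errno"), it looks roughly like this sketch, not necessarily like the actual mergetools code:

import errno
import fcntl
import os

def flock_ex_or_fail(path):
    # Try to take an exclusive, non-blocking lock on 'path'; return True
    # on success, False if another process already holds the lock.
    try:
        # The descriptor is deliberately never closed, so the lock is
        # held for the lifetime of the process.
        fcntl.flock(os.open(path, os.O_CREAT | os.O_WRONLY),
                    fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError as e:
        if e.errno not in (errno.EACCES, errno.EAGAIN):
            raise
        return False
    return True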
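Finally, the daemon path replaces the fixed "sleeping args.interval seconds" loop with change detection on the 'fetched' file: waitforfile() blocks until the file exists and returns its stat info, and the inner while in main() compares successive os.stat() results. Stat results compare field by field, so rewriting 'fetched' changes its size or mtime and breaks the equality, waking up the next backup round. A sketch of waitforfile() under that reading:

import errno
from os import stat
from time import sleep

def waitforfile(path):
    # Block until 'path' exists, then hand back its stat info so the
    # caller can notice later modifications by comparing stat results.
    while True:
        try:
            return stat(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
        sleep(1)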