Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x | tools/merge_backup.py | 141
1 file changed, 84 insertions, 57 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
 import base64
 import select
 import requests
+import errno
+import logging
 from time import sleep
+from os import stat
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
 from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
-    get_nfetched, parse_args, perm
+    get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
 
 def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
     for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
         if trynumber == 1:
             return None
         select.select([], [], [], 10.0)
-        print >>sys.stderr, "tries left:", trynumber
-        sys.stderr.flush()
+        logging.info("tries left: %d", trynumber)
         continue
     return sendlogresult
 
-
 def merge_backup(args, config, localconfig, secondaries):
     paths = localconfig["paths"]
     own_key = (localconfig["nodename"],
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     logorderfile = mergedb + "/logorder"
     currentsizefile = mergedb + "/fetched"
+    statusfile = mergedb + "/merge_backup.status"
+    s = Status(statusfile)
     timing = timing_point()
 
     nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
         nodeaddress = "https://%s/" % secondary["address"]
         nodename = secondary["name"]
         timing = timing_point()
-        print >>sys.stderr, "backing up to node", nodename
-        sys.stderr.flush()
-        verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        logging.info("backing up to node %s", nodename)
+        try:
+            verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when getting verified size from %s", nodename)
+            return 1
         timing_point(timing, "get verified size")
-        print >>sys.stderr, "verified size", verifiedsize
-        sys.stderr.flush()
+        logging.info("verified size %d", verifiedsize)
         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-        print >>sys.stderr, "determining end of log:",
+        logging.info("determining end of log")
         for chunk in chunks(entries, 100000):
             sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
             if sendlogresult == None:
-                print >>sys.stderr, "sendlog result was None"
-                sys.exit(1)
+                logging.error("sendlog result was None")
+                return 1
             if sendlogresult["result"] != "ok":
-                print >>sys.stderr, "backup_sendlog:", sendlogresult
-                sys.exit(1)
+                logging.error("backup_sendlog: %s", sendlogresult)
+                return 1
             verifiedsize += len(chunk)
-            print >>sys.stderr, verifiedsize,
-            sys.stderr.flush()
+            s.status("INFO: determining end of log: %d" % verifiedsize)
         if verifiedsize > 100000:
             verifiedsize -= 100000
         else:
             verifiedsize = 0
+        logging.info("end of log determined")
         timing_point(timing, "checklog")
 
         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-        print >>sys.stderr, "sending log:",
-        sys.stderr.flush()
+        logging.info("sending log")
         for chunk in chunks(entries, 1000):
             sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
             if sendlogresult == None:
-                sys.exit(1)
+                return 1
             if sendlogresult["result"] != "ok":
-                print >>sys.stderr, "backup_sendlog:", sendlogresult
-                sys.exit(1)
+                logging.error("backup_sendlog: %s", sendlogresult)
+                return 1
             verifiedsize += len(chunk)
-            print >>sys.stderr, verifiedsize,
-            sys.stderr.flush()
-        print >>sys.stderr
+            s.status("INFO: sending log: %d" % verifiedsize)
         timing_point(timing, "sendlog")
-        print >>sys.stderr, "log sent"
-        sys.stderr.flush()
+        logging.info("log sent")
 
         missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                      own_key, paths)
         timing_point(timing, "get missing")
 
         while missingentries:
-            print >>sys.stderr, "missing entries:", len(missingentries)
-            sys.stderr.flush()
+            logging.info("missing entries: %d", len(missingentries))
             fetched_entries = 0
-            print >>sys.stderr, "fetching missing entries",
-            sys.stderr.flush()
+            logging.info("fetching missing entries")
             with requests.sessions.Session() as session:
                 for missingentry_chunk in chunks(missingentries, 100):
                     missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
@@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries):
                                                          own_key, paths,
                                                          hashes_and_entries, session)
                     if sendentryresult["result"] != "ok":
-                        print >>sys.stderr, "sendentry_merge:", sendentryresult
-                        sys.exit(1)
+                        logging.error("sendentry_merge: %s", sendentryresult)
+                        return 1
                     fetched_entries += len(missingentry_hashes)
-                    print >>sys.stderr, fetched_entries,
-                    sys.stderr.flush()
-            print >>sys.stderr
-            sys.stderr.flush()
+                    s.status("INFO: fetching missing entries: %d" % fetched_entries)
             timing_point(timing, "send missing")
 
             missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                          own_key, paths)
             timing_point(timing, "get missing")
 
-        verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
-                                      tree_size)
+        try:
+            verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+                                          tree_size)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when verifying root at %s", nodename)
+            return 1
         if verifyrootresult["result"] != "ok":
-            print >>sys.stderr, "verifyroot:", verifyrootresult
-            sys.exit(1)
+            logging.error("verifyroot: %s", verifyrootresult)
+            return 1
         secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
         if root_hash != secondary_root_hash:
-            print >>sys.stderr, "secondary root hash was", \
-                hexencode(secondary_root_hash)
-            print >>sys.stderr, " expected", hexencode(root_hash)
-            sys.exit(1)
+            logging.error("secondary root hash was %s, expected %s",
+                          hexencode(secondary_root_hash),
+                          hexencode(root_hash))
+            return 1
         timing_point(timing, "verifyroot")
         setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
         backuppath = mergedb + "/verified." + nodename
         backupdata = {"tree_size": tree_size,
                       "sha256_root_hash": hexencode(root_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+        logging.debug("writing to %s: %s", backuppath, backupdata)
         write_file(backuppath, backupdata)
 
     if args.timing:
-        print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+    return 0
 
 def main():
     """
-    Read logorder file up until what's indicated by fetched file and
-    build the tree.
+    Wait until 'fetched' exists and read it.
+
+    Read 'logorder' up until what's indicated by 'fetched' and build the
+    tree.
 
     Distribute entries to all secondaries, write tree size and tree head
-    to backup.<secondary> files as each secondary is verified to have
+    to 'backup.<secondary>' files as each secondary is verified to have
     the entries.
 
-    Sleep some and start over.
+    If `--mergeinterval', wait until 'fetched' is updated and read it
+    and start over from the point where 'logorder' is read.
     """
     args, config, localconfig = parse_args()
+    paths = localconfig["paths"]
+    mergedb = paths["mergedb"]
+    lockfile = mergedb + "/.merge_backup.lock"
+    fetched_path = mergedb + "/fetched"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     all_secondaries = \
       [n for n in config.get('mergenodes', []) if \
        n['name'] != config['primarymergenode']]
-    paths = localconfig["paths"]
     create_ssl_context(cafile=paths["https_cacertfile"])
 
     if len(args.node) == 0:
@@ -187,12 +206,20 @@ def main():
     else:
         nodes = [n for n in all_secondaries if n["name"] in args.node]
 
+    if args.mergeinterval is None:
+        return merge_backup(args, config, localconfig, nodes)
+
+    fetched_statinfo = waitforfile(fetched_path)
+
     while True:
-        merge_backup(args, config, localconfig, nodes)
-        if args.interval is None:
-            break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        err = merge_backup(args, config, localconfig, nodes)
+        if err != 0:
+            return err
+        fetched_statinfo_old = fetched_statinfo
+        while fetched_statinfo == fetched_statinfo_old:
+            sleep(args.mergeinterval / 30)
+            fetched_statinfo = stat(fetched_path)
+    return 0
 
 if __name__ == '__main__':
     sys.exit(main())
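
For context: this commit imports three mergetools helpers (waitforfile, flock_ex_or_fail, Status) whose definitions are not part of the diff; they live in tools/mergetools.py. The sketch below is a guess at minimal implementations, assuming flock(2)-based locking and a one-line status file, meant only to illustrate the semantics the new code in merge_backup.py relies on.

    # Hypothetical stand-ins for the mergetools helpers imported above;
    # the real implementations are in tools/mergetools.py.
    import errno
    import fcntl
    import os
    from time import sleep

    def flock_ex_or_fail(path):
        """Take an exclusive non-blocking lock on 'path'.

        Returns False if another process already holds the lock, so the
        caller can refuse to start a second merge_backup instance. The
        fd is deliberately left open: the lock lives as long as the
        process and is released automatically on exit.
        """
        try:
            fcntl.flock(os.open(path, os.O_CREAT),
                        fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as e:
            if e.errno in (errno.EACCES, errno.EAGAIN):
                return False
            raise
        return True

    def waitforfile(path):
        """Block until 'path' exists, then return its stat info."""
        while True:
            try:
                return os.stat(path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
                sleep(1)

    class Status(object):
        """Single-line status file, rewritten in place on every update."""
        def __init__(self, path):
            self.path = path

        def status(self, line):
            with open(self.path, "w") as f:
                f.write(line + "\n")

main() combines them: flock_ex_or_fail turns concurrent invocations into a clean startup failure, waitforfile delays the first run until 'fetched' has been produced, and Status gives the operator a progress indicator that is overwritten in place instead of scrolling past in a log.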