author    | Linus Nordberg <linus@nordu.net> | 2016-11-23 17:09:48 +0100
committer | Linus Nordberg <linus@nordu.net> | 2016-11-23 17:09:48 +0100
commit    | 19a2a611a839c0318f58347e2d93943c8e2401a5 (patch)
tree      | 18cd302161a88d4546b39792a4bff6b1ade95c77 /tools/merge_backup.py
parent    | 27e368196ce65e109c027987c706a697356f7bc5 (diff)
WIP
Merge can run as four separate processes, plus a fifth controlling
process, 'merge'.
Tests are limited to testcase1.py and they're failing because of the
test with the dead merge secondary. Tests are also time-consuming
because they wait 60 s each time a merge needs to be verified. This
could be improved by peeking at the control files, for example.
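
One way to do that peeking, rather than sleeping a fixed 60 s, is for a test to
poll a control file that merge_backup touches as it makes progress, e.g.
'merge_backup.status' or 'verified.<nodename>' under mergedb, and return as soon
as it changes. A rough sketch of such a test helper in Python; the helper name,
its parameters and the polling policy are illustrative and not part of this
commit:

    import os
    import time

    def wait_for_control_file(path, old_mtime=None, timeout=60.0, poll=0.5):
        """Poll path until its mtime differs from old_mtime or timeout expires.

        Returns the new mtime, or None on timeout.  A missing file counts
        as 'not updated yet'.
        """
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                mtime = os.stat(path).st_mtime
            except OSError:
                mtime = None  # control file not written yet
            if mtime is not None and mtime != old_mtime:
                return mtime
            time.sleep(poll)
        return None

A test could call this between merge rounds instead of a fixed sleep, so a
verified merge is detected within one polling interval rather than after a full
60 s wait.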
Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x | tools/merge_backup.py | 141 |
1 files changed, 84 insertions, 57 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
 import base64
 import select
 import requests
+import errno
+import logging
 from time import sleep
+from os import stat
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
 from mergetools import chunks, backup_sendlog, get_logorder, \
      get_verifiedsize, get_missingentriesforbackup, \
      hexencode, setverifiedsize, sendentries_merge, verifyroot, \
-     get_nfetched, parse_args, perm
+     get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status

 def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
     for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
             if trynumber == 1:
                 return None
             select.select([], [], [], 10.0)
-            print >>sys.stderr, "tries left:", trynumber
-            sys.stderr.flush()
+            logging.info("tries left: %d", trynumber)
             continue
         return sendlogresult

-
 def merge_backup(args, config, localconfig, secondaries):
     paths = localconfig["paths"]
     own_key = (localconfig["nodename"],
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     logorderfile = mergedb + "/logorder"
     currentsizefile = mergedb + "/fetched"
+    statusfile = mergedb + "/merge_backup.status"
+    s = Status(statusfile)
     timing = timing_point()

     nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
         nodeaddress = "https://%s/" % secondary["address"]
         nodename = secondary["name"]
         timing = timing_point()
-        print >>sys.stderr, "backing up to node", nodename
-        sys.stderr.flush()
-        verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        logging.info("backing up to node %s", nodename)
+        try:
+            verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when getting verified size from %s", nodename)
+            return 1
         timing_point(timing, "get verified size")
-        print >>sys.stderr, "verified size", verifiedsize
-        sys.stderr.flush()
+        logging.info("verified size %d", verifiedsize)
         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-        print >>sys.stderr, "determining end of log:",
+        logging.info("determining end of log")
         for chunk in chunks(entries, 100000):
             sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                         verifiedsize, chunk[:10])
             if sendlogresult == None:
-                print >>sys.stderr, "sendlog result was None"
-                sys.exit(1)
+                logging.error("sendlog result was None")
+                return 1
             if sendlogresult["result"] != "ok":
-                print >>sys.stderr, "backup_sendlog:", sendlogresult
-                sys.exit(1)
+                logging.error("backup_sendlog: %s", sendlogresult)
+                return 1
             verifiedsize += len(chunk)
-            print >>sys.stderr, verifiedsize,
-            sys.stderr.flush()
+            s.status("INFO: determining end of log: %d" % verifiedsize)
         if verifiedsize > 100000:
             verifiedsize -= 100000
         else:
             verifiedsize = 0
+        logging.info("end of log determined")
         timing_point(timing, "checklog")

         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-        print >>sys.stderr, "sending log:",
-        sys.stderr.flush()
+        logging.info("sending log")
         for chunk in chunks(entries, 1000):
             sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                         verifiedsize, chunk)
             if sendlogresult == None:
-                sys.exit(1)
+                return 1
             if sendlogresult["result"] != "ok":
-                print >>sys.stderr, "backup_sendlog:", sendlogresult
-                sys.exit(1)
+                logging.error("backup_sendlog: %s", sendlogresult)
+                return 1
             verifiedsize += len(chunk)
-            print >>sys.stderr, verifiedsize,
-            sys.stderr.flush()
-        print >>sys.stderr
+            s.status("INFO: sending log: %d" % verifiedsize)
         timing_point(timing, "sendlog")
-        print >>sys.stderr, "log sent"
-        sys.stderr.flush()
+        logging.info("log sent")

         missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                      own_key, paths)
         timing_point(timing, "get missing")

         while missingentries:
-            print >>sys.stderr, "missing entries:", len(missingentries)
-            sys.stderr.flush()
+            logging.info("missing entries: %d", len(missingentries))
             fetched_entries = 0
-            print >>sys.stderr, "fetching missing entries",
-            sys.stderr.flush()
+            logging.info("fetching missing entries")
             with requests.sessions.Session() as session:
                 for missingentry_chunk in chunks(missingentries, 100):
                     missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
@@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries):
                                                         own_key, paths,
                                                         hashes_and_entries, session)
                     if sendentryresult["result"] != "ok":
-                        print >>sys.stderr, "sendentry_merge:", sendentryresult
-                        sys.exit(1)
+                        logging.error("sendentry_merge: %s", sendentryresult)
+                        return 1
                     fetched_entries += len(missingentry_hashes)
-                    print >>sys.stderr, fetched_entries,
-                    sys.stderr.flush()
-            print >>sys.stderr
-            sys.stderr.flush()
+                    s.status("INFO: fetching missing entries: %d" % fetched_entries)
             timing_point(timing, "send missing")

             missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                          own_key, paths)
             timing_point(timing, "get missing")

-        verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
-                                      tree_size)
+        try:
+            verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+                                          tree_size)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when verifying root at %s", nodename)
+            return 1
         if verifyrootresult["result"] != "ok":
-            print >>sys.stderr, "verifyroot:", verifyrootresult
-            sys.exit(1)
+            logging.error("verifyroot: %s", verifyrootresult)
+            return 1
         secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
         if root_hash != secondary_root_hash:
-            print >>sys.stderr, "secondary root hash was", \
-                hexencode(secondary_root_hash)
-            print >>sys.stderr, " expected", hexencode(root_hash)
-            sys.exit(1)
+            logging.error("secondary root hash was %s, expected %s",
+                          hexencode(secondary_root_hash),
+                          hexencode(root_hash))
+            return 1
         timing_point(timing, "verifyroot")
         setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
         backuppath = mergedb + "/verified." + nodename
         backupdata = {"tree_size": tree_size,
                       "sha256_root_hash": hexencode(root_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+        logging.debug("writing to %s: %s", backuppath, backupdata)
         write_file(backuppath, backupdata)

     if args.timing:
-        print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+    return 0

 def main():
     """
-    Read logorder file up until what's indicated by fetched file and
-    build the tree.
+    Wait until 'fetched' exists and read it.
+
+    Read 'logorder' up until what's indicated by 'fetched' and build the
+    tree.

     Distribute entries to all secondaries, write tree size and tree head
-    to backup.<secondary> files as each secondary is verified to have
+    to 'backup.<secondary>' files as each secondary is verified to have
     the entries.

-    Sleep some and start over.
+    If `--mergeinterval', wait until 'fetched' is updated and read it
+    and start over from the point where 'logorder' is read.
     """
     args, config, localconfig = parse_args()
+    paths = localconfig["paths"]
+    mergedb = paths["mergedb"]
+    lockfile = mergedb + "/.merge_backup.lock"
+    fetched_path = mergedb + "/fetched"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     all_secondaries = \
       [n for n in config.get('mergenodes', []) if \
        n['name'] != config['primarymergenode']]
-    paths = localconfig["paths"]
     create_ssl_context(cafile=paths["https_cacertfile"])

     if len(args.node) == 0:
@@ -187,12 +206,20 @@ def main():
     else:
         nodes = [n for n in all_secondaries if n["name"] in args.node]

+    if args.mergeinterval is None:
+        return merge_backup(args, config, localconfig, nodes)
+
+    fetched_statinfo = waitforfile(fetched_path)
+
     while True:
-        merge_backup(args, config, localconfig, nodes)
-        if args.interval is None:
-            break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        err = merge_backup(args, config, localconfig, nodes)
+        if err != 0:
+            return err
+        fetched_statinfo_old = fetched_statinfo
+        while fetched_statinfo == fetched_statinfo_old:
+            sleep(args.mergeinterval / 30)
+            fetched_statinfo = stat(fetched_path)
+    return 0

 if __name__ == '__main__':
     sys.exit(main())
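
Note on locking: the diff starts calling flock_ex_or_fail() from mergetools so that
only one merge_backup process runs against a given mergedb; the helper's
implementation is not part of this diff. As an assumption about how such a helper
typically works, here is a minimal non-blocking exclusive-lock sketch built on
fcntl.flock (the function name and details are illustrative, and the real
mergetools helper may differ):

    import fcntl
    import os

    def take_exclusive_lock(lockfile):
        """Try to take an exclusive, non-blocking flock on lockfile.

        Returns True if the lock was taken, False if another process holds it.
        The file descriptor is deliberately left open so the lock stays held
        for the lifetime of the process.
        """
        fd = os.open(lockfile, os.O_CREAT | os.O_WRONLY, 0o644)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            os.close(fd)
            return False
        return True

With a lock like this in place, a second merge_backup started against the same
mergedb logs the "unable to take lock" message and exits with status 1 instead of
racing the first one.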