#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
#
# Copy entries indicated by file 'fetched' to all secondary merge nodes.
# See catlfish/doc/merge.txt for more about the merge process.
#
import sys
import select
import requests
import errno
import logging
import signal
from time import sleep
from base64 import b64encode, b64decode
from os import stat
from multiprocessing import Process, Pipe, active_children
from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
     get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, \
     Status, loginit, start_worker, terminate_child_procs

def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
    # Send one chunk of the log, retrying up to five times before giving up.
    for trynumber in range(5, 0, -1):
        sendlogresult = \
            backup_sendlog(nodename, nodeaddress, own_key, paths,
                           {"start": verifiedsize, "hashes": chunk})
        if sendlogresult is None:
            if trynumber == 1:
                return None
            select.select([], [], [], 10.0)  # Pause 10 s before retrying.
            logging.info("%s: tries left: %d", nodename, trynumber)
            continue
        return sendlogresult

sendlog_discover_chunksize = 100000

def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key,
                   paths, statusupdates):
    # Send the given entry hashes to the secondary, in chunks of 1000.
    logging.info("%s: sending log", nodename)
    for chunk in chunks(entries, 1000):
        sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                    verifiedsize, chunk)
        if sendlogresult is None:
            sys.exit(1)
        if sendlogresult["result"] != "ok":
            logging.error("%s: backup_sendlog: %s", nodename, sendlogresult)
            sys.exit(1)
        verifiedsize += len(chunk)
        statusupdates.status("PROG sending log: %d" % verifiedsize)
    logging.info("%s: log sent", nodename)

def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb,
                            timing, statusupdates):
    # Ask the secondary which entries it lacks and send them, in chunks of
    # 100, until nothing is reported missing.
    missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                 own_key, paths)
    timing_point(timing, "get missing")

    while missingentries:
        logging.info("%s: about to send %d missing entries", nodename,
                     len(missingentries))

        fetched_entries = 0
        with requests.sessions.Session() as session:
            for missingentry_chunk in chunks(missingentries, 100):
                missingentry_hashes = [b64decode(missingentry)
                                       for missingentry in missingentry_chunk]
                hashes_and_entries = [(ehash, chainsdb.get(ehash))
                                      for ehash in missingentry_hashes]
                sendentryresult = sendentries_merge(nodename, nodeaddress,
                                                    own_key, paths,
                                                    hashes_and_entries,
                                                    session)
                if sendentryresult["result"] != "ok":
                    logging.error("%s: sendentries_merge: %s", nodename,
                                  sendentryresult)
                    sys.exit(1)
                fetched_entries += len(missingentry_hashes)
                statusupdates.status("PROG sending missing entries: %d" %
                                     fetched_entries)
        timing_point(timing, "send missing")

        missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                     own_key, paths)
        timing_point(timing, "get missing")

def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size,
               timing):
    # Build the Merkle tree locally and verify that the secondary reports
    # the same root hash for the same tree size.
    tree = build_merkle_tree(logorder[:tree_size])
    root_hash = tree[-1][0]
    timing_point(timing, "build tree")
    verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                  tree_size)
    if verifyrootresult["result"] != "ok":
        logging.error("%s: verifyroot: %s", nodename, verifyrootresult)
        sys.exit(1)
    secondary_root_hash = b64decode(verifyrootresult["root_hash"])
    if root_hash != secondary_root_hash:
        logging.error("%s: secondary root hash was %s while expected was %s",
                      nodename,
                      hexencode(secondary_root_hash),
                      hexencode(root_hash))
        sys.exit(1)
    timing_point(timing, "verifyroot")
    return root_hash

def do_send(backupargs):
    args, secondary, localconfig, chainsdb, logorder, s, timing = backupargs
    maxwindow = localconfig.get("maxwindow", 1000)
    paths = localconfig["paths"]
    nodename = secondary["name"]
    nodeaddress = "https://%s/" % secondary["address"]
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))
    tree_size = len(logorder)
    logging.info("%s: backing up", nodename)
    verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
    timing_point(timing, "get verified size")
    logging.info("%s: verified size %d", nodename, verifiedsize)
    if verifiedsize == tree_size:
        root_hash = check_root(logorder, nodename, nodeaddress, own_key,
                               paths, tree_size, timing)
    else:
        # Send the log in windows of at most 'maxwindow' entries, filling in
        # missing entries and verifying the root hash after each window.
        while verifiedsize < tree_size:
            uptopos = min(verifiedsize + maxwindow, tree_size)
            entries = [b64encode(entry)
                       for entry in logorder[verifiedsize:uptopos]]
            sendlog_helper(entries, verifiedsize, nodename, nodeaddress,
                           own_key, paths, s)
            timing_point(timing, "sendlog")
            fill_in_missing_entries(nodename, nodeaddress, own_key, paths,
                                    chainsdb, timing, s)
            root_hash = check_root(logorder, nodename, nodeaddress, own_key,
                                   paths, uptopos, timing)
            verifiedsize = uptopos
            setverifiedsize(nodename, nodeaddress, own_key, paths,
                            verifiedsize)
    if args.timing:
        logging.debug("%s: timing: merge_backup: %s", nodename,
                      timing["deltatimes"])
    return root_hash

def update_backupfile(mergedb, nodename, tree_size, root_hash):
    # Record how far this secondary has been verified.
    backuppath = mergedb + "/verified." + nodename
    backupdata = {"tree_size": tree_size,
                  "sha256_root_hash": hexencode(root_hash)}
    logging.debug("%s: writing to %s: %s", nodename, backuppath, backupdata)
    write_file(backuppath, backupdata)
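
# Illustration only: the exact on-disk format of the per-node file written
# above is determined by certtools.write_file (not shown here).  Assuming a
# JSON-style serialization, mergedb/verified.<nodename> would hold something
# roughly like
#
#   {"tree_size": 12345, "sha256_root_hash": "9b1f..."}
#
# i.e. the tree size the secondary has been verified up to and the
# hex-encoded Merkle tree head covering the first tree_size entries of
# 'logorder'.  The values shown are placeholders.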

def merge_backup(args, config, localconfig, secondaries):
    failures = 0
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    chainsdb = perm(localconfig.get("dbbackend", "filedb"),
                    mergedb + "/chains", write_enabled=False)
    logorderfile = mergedb + "/logorder"
    currentsizefile = mergedb + "/fetched"
    statusfile = mergedb + "/merge_backup.status"
    s = Status(statusfile)
    timing = timing_point()
    nfetched = get_nfetched(currentsizefile, logorderfile)
    timing_point(timing, "get nfetched")
    logorder = get_logorder(logorderfile, nfetched)
    tree_size = len(logorder)
    timing_point(timing, "get logorder")

    procs = {}
    for secondary in secondaries:
        if secondary["name"] == config["primarymergenode"]:
            continue
        nodename = secondary["name"]
        timing = timing_point()
        backupargs = (args, secondary, localconfig, chainsdb, logorder, s,
                      timing)
        if args.mergeinterval:
            # Back up each secondary in its own worker process.
            name = 'backup_%s' % nodename
            p, pipe = start_worker(name,
                                   lambda cpipe, argv: cpipe.send(do_send(argv)),
                                   backupargs)
            procs[p] = (nodename, pipe)
        else:
            root_hash = do_send(backupargs)
            update_backupfile(mergedb, nodename, tree_size, root_hash)

    if args.mergeinterval:
        # Wait for the workers and record the result of each one.
        while True:
            for p in list(procs):
                if not p.is_alive():
                    p.join()
                    nodename, pipe = procs[p]
                    if p.exitcode == 0:
                        root_hash = pipe.recv()
                        update_backupfile(mergedb, nodename, tree_size,
                                          root_hash)
                    else:
                        logging.warning("%s failure: %d", nodename,
                                        p.exitcode)
                        failures += 1
                    del procs[p]
            if not procs:
                break
            sleep(1)

    return failures

def term(signum, frame):
    # SIGTERM handler: stop all worker processes before exiting.
    terminate_child_procs()
    sys.exit(1)

def main():
    """
    Wait until 'fetched' exists and read it.

    Read 'logorder' up until what's indicated by 'fetched' and build
    the tree.

    Distribute entries to all secondaries, in parallel if
    `--mergeinterval'.

    Write tree size and tree head to 'verified.<nodename>' files as each
    secondary is verified to have the entries.

    If `--mergeinterval', wait until 'fetched' is updated and read it
    and start over from the point where 'logorder' is read.
    """
    args, config, localconfig = parse_args()
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    lockfile = mergedb + "/.merge_backup.lock"
    fetched_path = mergedb + "/fetched"

    loginit(args, "merge_backup.log")

    if not flock_ex_or_fail(lockfile):
        logging.critical("unable to take lock %s", lockfile)
        return 1

    all_secondaries = [n for n in config.get('mergenodes', [])
                       if n['name'] != config['primarymergenode']]
    if len(args.node) == 0:
        nodes = all_secondaries
    else:
        nodes = [n for n in all_secondaries if n["name"] in args.node]

    signal.signal(signal.SIGTERM, term)
    create_ssl_context(cafile=paths["https_cacertfile"])

    fetched_statinfo = waitforfile(fetched_path)

    while True:
        failures = merge_backup(args, config, localconfig, nodes)
        if not args.mergeinterval:
            break
        fetched_statinfo_old = fetched_statinfo
        while fetched_statinfo == fetched_statinfo_old:
            sleep(max(3, args.mergeinterval / 10))
            if failures > 0:
                break
            fetched_statinfo = stat(fetched_path)

    return 0

if __name__ == '__main__':
    sys.exit(main())