#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
#
# Copy entries indicated by file 'fetched' to all secondary merge nodes.
# See catlfish/doc/merge.txt for more about the merge process.
#
import sys
import base64
import select
import requests
import errno
import logging
from time import sleep
from os import stat
from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
     get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status

def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
    """Send one chunk of the log order to a secondary, retrying up to five
    times with a ten second pause between attempts before giving up."""
    for trynumber in range(5, 0, -1):
        sendlogresult = \
            backup_sendlog(nodename, nodeaddress, own_key, paths,
                           {"start": verifiedsize, "hashes": chunk})
        if sendlogresult is None:
            if trynumber == 1:
                return None
            select.select([], [], [], 10.0)
            logging.info("tries left: %d", trynumber)
            continue
        return sendlogresult

def merge_backup(args, config, localconfig, secondaries):
    """Push the current log order and chain data to each secondary merge
    node and verify that the secondary's root hash matches our own."""
    paths = localconfig["paths"]
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))
    mergedb = paths["mergedb"]
    chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
    logorderfile = mergedb + "/logorder"
    currentsizefile = mergedb + "/fetched"
    statusfile = mergedb + "/merge_backup.status"
    s = Status(statusfile)
    timing = timing_point()

    nfetched = get_nfetched(currentsizefile, logorderfile)
    timing_point(timing, "get nfetched")
    logorder = get_logorder(logorderfile, nfetched)
    tree_size = len(logorder)
    timing_point(timing, "get logorder")
    tree = build_merkle_tree(logorder)
    root_hash = tree[-1][0]
    timing_point(timing, "build tree")

    for secondary in secondaries:
        if secondary["name"] == config["primarymergenode"]:
            continue
        nodeaddress = "https://%s/" % secondary["address"]
        nodename = secondary["name"]
        timing = timing_point()
        logging.info("backing up to node %s", nodename)
        try:
            verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key,
                                            paths)
        except requests.exceptions.ConnectionError as e:
            logging.error("connection error when getting verified size from %s",
                          nodename)
            return 1
        timing_point(timing, "get verified size")
        logging.info("verified size %d", verifiedsize)

        entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]

        # Probe with the first few hashes of each 100k chunk to find out
        # roughly how much of the log the secondary already has, then back
        # off by one chunk before sending for real.
        logging.info("determining end of log")
        for chunk in chunks(entries, 100000):
            sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                        verifiedsize, chunk[:10])
            if sendlogresult is None:
                logging.error("sendlog result was None")
                return 1
            if sendlogresult["result"] != "ok":
                logging.error("backup_sendlog: %s", sendlogresult)
                return 1
            verifiedsize += len(chunk)
            s.status("INFO: determining end of log: %d" % verifiedsize)

        if verifiedsize > 100000:
            verifiedsize -= 100000
        else:
            verifiedsize = 0
        logging.info("end of log determined")
        timing_point(timing, "checklog")

        # Send the remaining part of the log order in chunks of 1000 hashes.
        entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
        logging.info("sending log")
        for chunk in chunks(entries, 1000):
            sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                        verifiedsize, chunk)
            if sendlogresult is None:
                return 1
            if sendlogresult["result"] != "ok":
                logging.error("backup_sendlog: %s", sendlogresult)
                return 1
            verifiedsize += len(chunk)
            s.status("INFO: sending log: %d" % verifiedsize)
        timing_point(timing, "sendlog")
        logging.info("log sent")
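
        # Reconciliation phase: the sendlog calls above only transferred the
        # ordered list of entry hashes.  Ask the secondary which entries it
        # is still missing and send the corresponding chain data from the
        # local chains database, repeating until nothing is reported missing.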
        missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                     own_key, paths)
        timing_point(timing, "get missing")

        while missingentries:
            logging.info("missing entries: %d", len(missingentries))
            fetched_entries = 0
            logging.info("fetching missing entries")
            with requests.sessions.Session() as session:
                for missingentry_chunk in chunks(missingentries, 100):
                    missingentry_hashes = [base64.b64decode(missingentry)
                                           for missingentry in missingentry_chunk]
                    hashes_and_entries = [(hash, chainsdb.get(hash))
                                          for hash in missingentry_hashes]
                    sendentryresult = sendentries_merge(nodename, nodeaddress,
                                                        own_key, paths,
                                                        hashes_and_entries,
                                                        session)
                    if sendentryresult["result"] != "ok":
                        logging.error("sendentry_merge: %s", sendentryresult)
                        return 1
                    fetched_entries += len(missingentry_hashes)
                    s.status("INFO: fetching missing entries: %d" %
                             fetched_entries)
            timing_point(timing, "send missing")

            missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                         own_key, paths)
            timing_point(timing, "get missing")

        # Verify that the secondary's root hash for tree_size matches ours
        # before advancing its verified size and recording the backup.
        try:
            verifyrootresult = verifyroot(nodename, nodeaddress, own_key,
                                          paths, tree_size)
        except requests.exceptions.ConnectionError as e:
            logging.error("connection error when verifying root at %s",
                          nodename)
            return 1
        if verifyrootresult["result"] != "ok":
            logging.error("verifyroot: %s", verifyrootresult)
            return 1
        secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
        if root_hash != secondary_root_hash:
            logging.error("secondary root hash was %s, expected %s",
                          hexencode(secondary_root_hash),
                          hexencode(root_hash))
            return 1
        timing_point(timing, "verifyroot")

        setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
        backuppath = mergedb + "/verified." + nodename
        backupdata = {"tree_size": tree_size,
                      "sha256_root_hash": hexencode(root_hash)}
        logging.debug("writing to %s: %s", backuppath, backupdata)
        write_file(backuppath, backupdata)

    if args.timing:
        logging.debug("timing: merge_backup: %s", timing["deltatimes"])
    return 0

def main():
    """
    Wait until 'fetched' exists and read it.

    Read 'logorder' up until what's indicated by 'fetched' and build the
    tree.

    Distribute entries to all secondaries, write tree size and tree head
    to 'backup.' files as each secondary is verified to have the entries.

    If `--mergeinterval', wait until 'fetched' is updated and read it and
    start over from the point where 'logorder' is read.
    """
    args, config, localconfig = parse_args()
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    lockfile = mergedb + "/.merge_backup.lock"
    fetched_path = mergedb + "/fetched"

    loglevel = getattr(logging, args.loglevel.upper())
    if args.mergeinterval is None:
        logging.basicConfig(level=loglevel)
    else:
        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
                            level=loglevel)

    if not flock_ex_or_fail(lockfile):
        logging.critical("unable to take lock %s", lockfile)
        return 1

    all_secondaries = \
        [n for n in config.get('mergenodes', []) if \
         n['name'] != config['primarymergenode']]
    create_ssl_context(cafile=paths["https_cacertfile"])

    if len(args.node) == 0:
        nodes = all_secondaries
    else:
        nodes = [n for n in all_secondaries if n["name"] in args.node]

    if args.mergeinterval is None:
        return merge_backup(args, config, localconfig, nodes)

    fetched_statinfo = waitforfile(fetched_path)
    while True:
        err = merge_backup(args, config, localconfig, nodes)
        if err != 0:
            return err
        # Wait for 'fetched' to change before starting another backup round.
        fetched_statinfo_old = fetched_statinfo
        while fetched_statinfo == fetched_statinfo_old:
            sleep(max(3, args.mergeinterval / 10))
            fetched_statinfo = stat(fetched_path)

    return 0

if __name__ == '__main__':
    sys.exit(main())
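
# Exit status: 0 when all requested secondaries have been backed up and their
# root hashes verified, 1 on any error (connection failure, rejected
# sendlog/sendentry, or root hash mismatch).
#
# Example invocation (a sketch only: the actual option names are defined by
# mergetools.parse_args(), which is not shown here, so the flags below are
# assumptions inferred from the attributes read in main() -- args.node,
# args.mergeinterval, args.loglevel, args.logdir -- plus whatever config-file
# options parse_args() adds):
#
#   python merge_backup.py --mergeinterval 60 --loglevel INFO \
#       --logdir /var/log/merge secondary-1 secondary-2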