#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
#
# Copy entries indicated by file 'fetched' to all secondary merge nodes.
# See catlfish/doc/merge.txt for more about the merge process.
#
import sys
import select
import requests
import errno
import logging
from time import sleep
from base64 import b64encode, b64decode
from os import stat
from certtools import timing_point, build_merkle_tree, write_file, \
    create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
    get_verifiedsize, get_missingentriesforbackup, \
    hexencode, setverifiedsize, sendentries_merge, verifyroot, \
    get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status

def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
    # Try to send one chunk of the log, retrying up to five times with a
    # ten second pause between attempts (select() on no fds acts as a sleep).
    for trynumber in range(5, 0, -1):
        sendlogresult = \
            backup_sendlog(nodename, nodeaddress, own_key, paths,
                           {"start": verifiedsize, "hashes": chunk})
        if sendlogresult is None:
            if trynumber == 1:
                return None
            select.select([], [], [], 10.0)
            logging.info("tries left: %d", trynumber)
            continue
        return sendlogresult

sendlog_discover_chunksize = 100000

def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key,
                   paths, statusupdates):
    logging.info("sending log")
    for chunk in chunks(entries, 1000):
        sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                    verifiedsize, chunk)
        if sendlogresult is None:
            sys.exit(1)
        if sendlogresult["result"] != "ok":
            logging.error("backup_sendlog: %s", sendlogresult)
            sys.exit(1)
        verifiedsize += len(chunk)
        statusupdates.status("PROG sending log: %d" % verifiedsize)
    logging.info("log sent")

def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb,
                            timing, statusupdates):
    # Keep asking the secondary which entries it lacks and upload them in
    # batches of 100 until it reports nothing missing.
    missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                 own_key, paths)
    timing_point(timing, "get missing")

    while missingentries:
        logging.info("about to send %d missing entries", len(missingentries))

        fetched_entries = 0
        with requests.sessions.Session() as session:
            for missingentry_chunk in chunks(missingentries, 100):
                missingentry_hashes = [b64decode(missingentry)
                                       for missingentry in missingentry_chunk]
                hashes_and_entries = [(ehash, chainsdb.get(ehash))
                                      for ehash in missingentry_hashes]
                sendentryresult = sendentries_merge(nodename, nodeaddress,
                                                    own_key, paths,
                                                    hashes_and_entries,
                                                    session)
                if sendentryresult["result"] != "ok":
                    logging.error("sendentries_merge: %s", sendentryresult)
                    sys.exit(1)
                fetched_entries += len(missingentry_hashes)
                statusupdates.status("PROG sending missing entries: %d" %
                                     fetched_entries)
        timing_point(timing, "send missing")

        missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                     own_key, paths)
        timing_point(timing, "get missing")

def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size,
               timing):
    # Build the Merkle tree locally over the first tree_size entries and
    # require that the secondary reports the same root hash.
    tree = build_merkle_tree(logorder[:tree_size])
    root_hash = tree[-1][0]
    timing_point(timing, "build tree")
    verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                  tree_size)
    if verifyrootresult["result"] != "ok":
        logging.error("verifyroot: %s", verifyrootresult)
        sys.exit(1)
    secondary_root_hash = b64decode(verifyrootresult["root_hash"])
    if root_hash != secondary_root_hash:
        logging.error("secondary root hash was %s while expected was %s",
                      hexencode(secondary_root_hash), hexencode(root_hash))
        sys.exit(1)
    timing_point(timing, "verifyroot")
    return root_hash

def merge_backup(args, config, localconfig, secondaries):
    maxwindow = localconfig.get("maxwindow", 1000)
    paths = localconfig["paths"]
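    # Overview of the per-secondary backup flow below: entries past the
    # secondary's verified size are sent in windows of at most maxwindow
    # hashes, chain data the secondary reports missing is filled in from
    # chainsdb, the secondary's root hash is checked against a locally
    # built Merkle tree, and only then is its verified size advanced and
    # the tree head recorded in mergedb/verified.<nodename>.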
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))
    mergedb = paths["mergedb"]
    chainsdb = perm(localconfig.get("dbbackend", "filedb"),
                    mergedb + "/chains")
    logorderfile = mergedb + "/logorder"
    currentsizefile = mergedb + "/fetched"
    statusfile = mergedb + "/merge_backup.status"
    s = Status(statusfile)
    timing = timing_point()

    nfetched = get_nfetched(currentsizefile, logorderfile)
    timing_point(timing, "get nfetched")
    logorder = get_logorder(logorderfile, nfetched)
    tree_size = len(logorder)
    timing_point(timing, "get logorder")

    for secondary in secondaries:
        if secondary["name"] == config["primarymergenode"]:
            continue
        nodeaddress = "https://%s/" % secondary["address"]
        nodename = secondary["name"]
        timing = timing_point()
        logging.info("backing up to node %s", nodename)
        verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
        timing_point(timing, "get verified size")
        logging.info("verified size %d", verifiedsize)

        if verifiedsize == tree_size:
            root_hash = check_root(logorder, nodename, nodeaddress, own_key,
                                   paths, tree_size, timing)
        else:
            while verifiedsize < tree_size:
                uptopos = min(verifiedsize + maxwindow, tree_size)

                entries = [b64encode(entry)
                           for entry in logorder[verifiedsize:uptopos]]
                sendlog_helper(entries, verifiedsize, nodename, nodeaddress,
                               own_key, paths, s)
                timing_point(timing, "sendlog")

                fill_in_missing_entries(nodename, nodeaddress, own_key, paths,
                                        chainsdb, timing, s)

                root_hash = check_root(logorder, nodename, nodeaddress,
                                       own_key, paths, uptopos, timing)

                verifiedsize = uptopos
                setverifiedsize(nodename, nodeaddress, own_key, paths,
                                verifiedsize)

        backuppath = mergedb + "/verified." + nodename
        backupdata = {"tree_size": tree_size,
                      "sha256_root_hash": hexencode(root_hash)}
        logging.debug("writing to %s: %s", backuppath, backupdata)
        write_file(backuppath, backupdata)

        if args.timing:
            logging.debug("timing: merge_backup: %s", timing["deltatimes"])

    return 0

def main():
    """
    Wait until 'fetched' exists and read it.

    Read 'logorder' up until what's indicated by 'fetched' and build the
    tree.

    Distribute entries to all secondaries, write tree size and tree head
    to per-node 'verified.<nodename>' files as each secondary is verified
    to have the entries.

    If '--mergeinterval', wait until 'fetched' is updated and read it and
    start over from the point where 'logorder' is read.
    """
    args, config, localconfig = parse_args()
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    lockfile = mergedb + "/.merge_backup.lock"
    fetched_path = mergedb + "/fetched"

    loglevel = getattr(logging, args.loglevel.upper())
    if args.mergeinterval is None:
        logging.basicConfig(level=loglevel)
    else:
        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
                            level=loglevel)

    if not flock_ex_or_fail(lockfile):
        logging.critical("unable to take lock %s", lockfile)
        return 1

    all_secondaries = [n for n in config.get('mergenodes', [])
                       if n['name'] != config['primarymergenode']]
    create_ssl_context(cafile=paths["https_cacertfile"])

    if len(args.node) == 0:
        nodes = all_secondaries
    else:
        nodes = [n for n in all_secondaries if n["name"] in args.node]

    if args.mergeinterval is None:
        return merge_backup(args, config, localconfig, nodes)

    fetched_statinfo = waitforfile(fetched_path)

    while True:
        err = merge_backup(args, config, localconfig, nodes)
        if err:
            return err
        fetched_statinfo_old = fetched_statinfo
        while fetched_statinfo == fetched_statinfo_old:
            sleep(max(3, args.mergeinterval / 10))
            fetched_statinfo = stat(fetched_path)

    return 0

if __name__ == '__main__':
    sys.exit(main())
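# For reference: a successful run leaves one file per secondary under mergedb,
# named verified.<nodename>, recording {"tree_size": ..., "sha256_root_hash": ...}
# for the tree that secondary has been verified to hold (the exact on-disk
# serialization is whatever certtools.write_file produces).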