#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
#
# Copy entries indicated by file 'fetched' to all secondary merge nodes.
# See catlfish/doc/merge.txt for more about the merge process.
#
import sys
import select
import requests
import errno
import logging
from time import sleep
from base64 import b64encode, b64decode
from os import stat
from multiprocessing import Process, Pipe, active_children
from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
     get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, \
     Status, loginit, start_worker


def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
    """
    Send one chunk of hashes to a secondary, retrying on failure.

    Calls backup_sendlog() at most five times, pausing ten seconds
    after each attempt that returns None (except the last one).

    Returns the first non-None result from backup_sendlog(), or None
    if every attempt returned None.
    """
    for trynumber in range(5, 0, -1):
        sendlogresult = \
          backup_sendlog(nodename, nodeaddress, own_key, paths,
                         {"start": verifiedsize, "hashes": chunk})
        if sendlogresult is not None:
            return sendlogresult
        if trynumber == 1:
            return None
        # select() with no file descriptors is used as a plain delay.
        select.select([], [], [], 10.0)
        # trynumber counts the attempt that just failed, so one fewer
        # attempt remains.
        logging.info("tries left: %d", trynumber - 1)
    return None


sendlog_discover_chunksize = 100000


def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key,
                   paths, statusupdates):
    """
    Send `entries` to a secondary in chunks of 1000 using backup_loop().

    `verifiedsize` is the start position passed along with the first
    chunk; it is advanced by the length of each chunk sent.  Progress
    is reported through statusupdates.status().

    Exits the process with status 1 if a chunk could not be sent or if
    the secondary answers with a result other than "ok".
    """
    logging.info("sending log")
    for chunk in chunks(entries, 1000):
        sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths,
                                    verifiedsize, chunk)
        if sendlogresult is None:
            sys.exit(1)
        if sendlogresult["result"] != "ok":
            logging.error("backup_sendlog: %s", sendlogresult)
            sys.exit(1)
        verifiedsize += len(chunk)
        statusupdates.status("PROG sending log: %d" % verifiedsize)
    logging.info("log sent")


def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb,
                            timing, statusupdates):
    """
    Send entries to a secondary until it reports none missing.

    The list from get_missingentriesforbackup() is base64-decoded into
    hashes, each hash is looked up with chainsdb.get() and the
    (hash, entry) pairs are sent in chunks of 100 over one HTTP session
    per round.  The list is then fetched again and the process repeats
    while it is non-empty.

    Exits the process with status 1 if sendentries_merge() answers with
    a result other than "ok".
    """
    missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                 own_key, paths)
    timing_point(timing, "get missing")

    while missingentries:
        logging.info("about to send %d missing entries", len(missingentries))

        fetched_entries = 0
        with requests.sessions.Session() as session:
            for missingentry_chunk in chunks(missingentries, 100):
                missingentry_hashes = [b64decode(missingentry)
                                       for missingentry in missingentry_chunk]
                hashes_and_entries = [(ehash, chainsdb.get(ehash))
                                      for ehash in missingentry_hashes]
                sendentryresult = sendentries_merge(nodename, nodeaddress,
                                                    own_key, paths,
                                                    hashes_and_entries,
                                                    session)
                if sendentryresult["result"] != "ok":
                    logging.error("sendentries_merge: %s", sendentryresult)
                    sys.exit(1)
                fetched_entries += len(missingentry_hashes)
                statusupdates.status("PROG sending missing entries: %d" %
                                     fetched_entries)
        timing_point(timing, "send missing")

        missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                     own_key, paths)
        timing_point(timing, "get missing")


def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size,
               timing):
    """
    Verify that a secondary has the same root hash as we do.

    Builds a tree over the first `tree_size` entries of `logorder` and
    compares its root with the "root_hash" returned by verifyroot().

    Returns the locally computed root hash.  Exits the process with
    status 1 if verifyroot() does not answer "ok" or the hashes differ.
    """
    tree = build_merkle_tree(logorder[:tree_size])
    root_hash = tree[-1][0]
    timing_point(timing, "build tree")

    verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                  tree_size)
    if verifyrootresult["result"] != "ok":
        logging.error("verifyroot: %s", verifyrootresult)
        sys.exit(1)

    secondary_root_hash = b64decode(verifyrootresult["root_hash"])
    if root_hash != secondary_root_hash:
        logging.error("secondary root hash was %s while expected was %s",
                      hexencode(secondary_root_hash), hexencode(root_hash))
        sys.exit(1)
    timing_point(timing, "verifyroot")
    return root_hash


def do_send(backupargs):
    """
    Bring one secondary up to date and return the verified root hash.

    `backupargs` is the tuple
    (secondary, localconfig, chainsdb, logorder, statusupdates, timing).

    Entries are sent in windows of at most localconfig["maxwindow"]
    (default 1000); after each window the missing entries are filled
    in, the root is checked and the secondary's verified size is set.

    Exits the process with status 1 on any failure, including a
    secondary that reports a verified size beyond our own tree size.
    """
    secondary, localconfig, chainsdb, logorder, s, timing = backupargs
    maxwindow = localconfig.get("maxwindow", 1000)
    paths = localconfig["paths"]
    nodename = secondary["name"]
    nodeaddress = "https://%s/" % secondary["address"]
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))
    tree_size = len(logorder)

    logging.info("backing up to node %s", nodename)
    verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
    timing_point(timing, "get verified size")
    logging.info("verified size %d", verifiedsize)

    if verifiedsize > tree_size:
        # Previously this fell through both branches and failed with an
        # UnboundLocalError on `root_hash`; fail explicitly instead.
        logging.error("verified size %d on node %s is larger than "
                      "tree size %d", verifiedsize, nodename, tree_size)
        sys.exit(1)

    if verifiedsize == tree_size:
        return check_root(logorder, nodename, nodeaddress, own_key, paths,
                          tree_size, timing)

    while verifiedsize < tree_size:
        uptopos = min(verifiedsize + maxwindow, tree_size)

        entries = [b64encode(entry)
                   for entry in logorder[verifiedsize:uptopos]]
        sendlog_helper(entries, verifiedsize, nodename, nodeaddress,
                       own_key, paths, s)
        timing_point(timing, "sendlog")

        fill_in_missing_entries(nodename, nodeaddress, own_key, paths,
                                chainsdb, timing, s)

        root_hash = check_root(logorder, nodename, nodeaddress, own_key,
                               paths, uptopos, timing)

        verifiedsize = uptopos
        setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize)
    return root_hash


def update_backupfile(mergedb, nodename, tree_size, root_hash):
    """
    Record the verified tree size and root hash for a secondary.

    Writes {"tree_size", "sha256_root_hash"} to the file
    <mergedb>/verified.<nodename> using write_file().
    """
    backuppath = mergedb + "/verified." + nodename
    backupdata = {"tree_size": tree_size,
                  "sha256_root_hash": hexencode(root_hash)}
    logging.debug("writing to %s: %s", backuppath, backupdata)
    write_file(backuppath, backupdata)


def merge_backup(args, config, localconfig, secondaries):
    """
    Run one backup round against all given secondaries.

    Reads 'logorder' up to the size indicated by 'fetched' and calls
    do_send() for every secondary except the primary merge node.  With
    args.mergeinterval set, each secondary is handled by a worker
    created with start_worker() and the results are collected as the
    workers finish; otherwise the secondaries are handled one by one.

    Returns the number of workers that ended with a non-zero exit code.
    """
    failures = 0
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    chainsdb = perm(localconfig.get("dbbackend", "filedb"),
                    mergedb + "/chains")
    logorderfile = mergedb + "/logorder"
    currentsizefile = mergedb + "/fetched"
    statusfile = mergedb + "/merge_backup.status"
    s = Status(statusfile)
    timing = timing_point()

    nfetched = get_nfetched(currentsizefile, logorderfile)
    timing_point(timing, "get nfetched")

    logorder = get_logorder(logorderfile, nfetched)
    tree_size = len(logorder)
    timing_point(timing, "get logorder")

    procs = {}
    for secondary in secondaries:
        if secondary["name"] == config["primarymergenode"]:
            continue

        nodename = secondary["name"]
        # NOTE: this rebinds `timing`, so the debug output at the end
        # of this function covers only the last secondary handled.
        timing = timing_point()

        backupargs = (secondary, localconfig, chainsdb, logorder, s, timing)
        if args.mergeinterval:
            name = 'backup_%s' % nodename
            p, pipe = start_worker(
                name,
                lambda cpipe, argv: cpipe.send(do_send(argv)),
                backupargs)
            procs[p] = (nodename, pipe)
        else:
            root_hash = do_send(backupargs)
            update_backupfile(mergedb, nodename, tree_size, root_hash)

    if args.mergeinterval:
        while True:
            # Iterate over a copy since finished workers are removed.
            for p in list(procs):
                if not p.is_alive():
                    p.join()
                    nodename, pipe = procs[p]
                    if p.exitcode == 0:
                        root_hash = pipe.recv()
                        update_backupfile(mergedb, nodename, tree_size,
                                          root_hash)
                    else:
                        logging.warning("%s failure: %d", nodename,
                                        p.exitcode)
                        failures += 1
                    del procs[p]

            if not procs:
                break
            sleep(1)

    if args.timing:
        logging.debug("timing: merge_backup: %s", timing["deltatimes"])

    return failures


def main():
    """
    Wait until 'fetched' exists and read it.

    Read 'logorder' up until what's indicated by 'fetched' and build the
    tree.

    Distribute entries to all secondaries, in parallel if `--mergeinterval'.

    Write tree size and tree head to 'verified.<nodename>' files as each
    secondary is verified to have the entries.

    If `--mergeinterval', wait until 'fetched' is updated and read it
    and start over from the point where 'logorder' is read.  If the
    previous round had failures, start over after a single wait even
    if 'fetched' has not changed.
    """
    args, config, localconfig = parse_args()
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    lockfile = mergedb + "/.merge_backup.lock"
    fetched_path = mergedb + "/fetched"

    loginit(args, "merge_backup.log")

    if not flock_ex_or_fail(lockfile):
        logging.critical("unable to take lock %s", lockfile)
        return 1

    all_secondaries = \
      [n for n in config.get('mergenodes', []) if \
       n['name'] != config['primarymergenode']]
    if not args.node:
        nodes = all_secondaries
    else:
        nodes = [n for n in all_secondaries if n["name"] in args.node]

    create_ssl_context(cafile=paths["https_cacertfile"])
    fetched_statinfo = waitforfile(fetched_path)

    while True:
        failures = merge_backup(args, config, localconfig, nodes)
        if not args.mergeinterval:
            break
        fetched_statinfo_old = fetched_statinfo
        while fetched_statinfo == fetched_statinfo_old:
            sleep(max(3, args.mergeinterval / 10))
            if failures > 0:
                # Retry the failed round without waiting for new data.
                break
            fetched_statinfo = stat(fetched_path)

    return 0


if __name__ == '__main__':
    sys.exit(main())