#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
#
# Fetch new entries from all storage nodes.
# See catlfish/doc/merge.txt for more about the merge process.
#
import sys
import struct
import subprocess
import logging
from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
     chunks, fsync_logorder, get_entries, add_to_logorder, \
     hexencode, parse_args, perm, flock_ex_or_fail, Status
from certtools import timing_point, write_file, create_ssl_context


def merge_fetch(args, config, localconfig):
    """Fetch all new entries from every storage node and append them to the log.

    For each configured storage node, ask for the hashes of entries we
    don't yet have, then download the entry data in chunks of 100,
    verify each entry with the external `verifycert` helper process,
    store it in the chains database and append its hash to 'logorder'.

    Returns a tuple (tree_size, last_hash) where last_hash is the hash
    of the last entry in the log order, or (0, '') for an empty log.
    """
    paths = localconfig["paths"]
    storagenodes = config["storagenodes"]
    mergedb = paths["mergedb"]
    logorderfile = mergedb + "/logorder"
    statusfile = mergedb + "/merge_fetch.status"
    s = Status(statusfile)
    chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
    # Key pair used to authenticate ourselves to the storage nodes.
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))

    timing = timing_point()

    logorder = get_logorder(logorderfile)
    timing_point(timing, "get logorder")
    # Set view of the log for O(1) membership tests below.
    certsinlog = set(logorder)

    new_entries_per_node = {}
    new_entries = set()
    entries_to_fetch = {}

    # Collect, per node, the set of entry hashes it can serve us.
    for storagenode in storagenodes:
        logging.info("getting new entries from %s", storagenode["name"])
        new_entries_per_node[storagenode["name"]] = \
            set(get_new_entries(storagenode["name"],
                                "https://%s/" % storagenode["address"],
                                own_key, paths))
        new_entries.update(new_entries_per_node[storagenode["name"]])
        entries_to_fetch[storagenode["name"]] = []
    timing_point(timing, "get new entries")

    # Drop anything we already have in the log.
    new_entries -= certsinlog
    logging.info("adding %d entries", len(new_entries))

    # Assign each new entry to the first node that has it; `break`
    # ensures every entry is fetched from exactly one node.
    for ehash in new_entries:
        for storagenode in storagenodes:
            if ehash in new_entries_per_node[storagenode["name"]]:
                entries_to_fetch[storagenode["name"]].append(ehash)
                break

    # External verifier process; entries are piped to it one at a time
    # by verify_entry().
    verifycert = subprocess.Popen(
        [paths["verifycert_bin"], paths["known_roots"]],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    added_entries = 0
    for storagenode in storagenodes:
        nentries = len(entries_to_fetch[storagenode["name"]])
        logging.info("getting %d entries from %s", nentries,
                     storagenode["name"])
        # Fetch in chunks of 100 to bound request size and memory use.
        for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
            entries = get_entries(storagenode["name"],
                                  "https://%s/" % storagenode["address"],
                                  own_key, paths, chunk)
            for ehash in chunk:
                entry = entries[ehash]
                verify_entry(verifycert, entry, ehash)
                chainsdb.add(ehash, entry)
                add_to_logorder(logorderfile, ehash)
                logorder.append(ehash)
                certsinlog.add(ehash)
                added_entries += 1
            s.status("INFO: getting %d entries from %s: %d" %
                     (nentries, storagenode["name"], added_entries))
    chainsdb.commit()
    # Make sure the logorder file is durable before reporting progress.
    fsync_logorder(logorderfile)
    timing_point(timing, "add entries")
    logging.info("added %d entries", added_entries)
    # A zero-length message tells verifycert to exit cleanly.
    verifycert.communicate(struct.pack("I", 0))

    if args.timing:
        logging.debug("timing: merge_fetch: %s", timing["deltatimes"])

    tree_size = len(logorder)
    if tree_size == 0:
        return (0, '')
    else:
        return (tree_size, logorder[tree_size-1])


def main():
    """
    Fetch new entries from all storage nodes, in sequence.

    Indicate the current position by writing the hash and its
    'logorder' index, 0-based, to 'fetched'.

    Sleep some and start over, or exit if there's no `--mergeinterval'.
    """
    args, config, localconfig = parse_args()
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    currentsizefile = mergedb + "/fetched"
    lockfile = mergedb + "/.merge_fetch.lock"
    loglevel = getattr(logging, args.loglevel.upper())
    # Without --mergeinterval we run once in the foreground and log to
    # stderr; otherwise we loop and log to a file.
    if args.mergeinterval is None:
        logging.basicConfig(level=loglevel)
    else:
        logging.basicConfig(filename=args.logdir + "/merge_fetch.log",
                            level=loglevel)

    # Guard against concurrent merge_fetch runs on the same mergedb.
    if not flock_ex_or_fail(lockfile):
        logging.critical("unable to take lock %s", lockfile)
        return 1

    create_ssl_context(cafile=paths["https_cacertfile"])

    while True:
        logsize, last_hash = merge_fetch(args, config, localconfig)
        # 'index' is 0-based, hence -1 for an empty log.
        currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
        logging.debug("writing to %s: %s", currentsizefile, currentsize)
        write_file(currentsizefile, currentsize)

        if args.mergeinterval is None:
            break
        # NOTE(review): sleeps a tenth of --mergeinterval per iteration —
        # presumably intentional polling granularity; confirm.
        logging.debug("sleeping %d seconds", args.mergeinterval / 10)
        sleep(args.mergeinterval / 10)

    return 0

if __name__ == '__main__':
    sys.exit(main())