#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2016, NORDUnet A/S.
# See LICENSE for licensing information.
#
# Fetch new entries from all storage nodes.
# See catlfish/doc/merge.txt for more about the merge process.
#
import sys
import struct
import subprocess
import requests
from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
     chunks, fsync_logorder, get_entries, add_to_logorder, \
     hexencode, parse_args, perm
from certtools import timing_point, write_file, create_ssl_context, http_request
import json


def reportstatus(statusservers, own_key, target, variable, status):
    """Report one status value to every configured status server.

    statusservers -- iterable of dicts with "name" and "address" keys, or
                     None (the value config.get() yields when the setting is
                     absent), in which case nothing is reported.
    own_key       -- key material passed through to http_request().
    target, variable, status -- sent as the "target", "key" and "value"
                     fields of the status message.
    """
    # Treat a missing "statusservers" setting as "no servers" rather than
    # failing with a TypeError when iterating over None.
    for statusserver in statusservers or []:
        do_reportstatus(statusserver["name"],
                        "https://%s/" % statusserver["address"],
                        own_key, target, variable, status)


def do_reportstatus(node, baseurl, own_key, target, variable, status):
    """POST a single status value to one status server.

    Returns the JSON-decoded response body.  Exits the process with status 1
    on an HTTP error.  On ValueError (e.g. a response body that is not valid
    JSON) the request and the raw response are dumped to stderr and the
    exception is re-raised.
    """
    # Bound before the try block so the ValueError handler can always
    # refer to it, even if the exception was raised before assignment.
    result = None
    try:
        result = http_request(baseurl + "plop/v1/status/merge_fetch",
                              json.dumps([{"target": target,
                                           "key": variable,
                                           "value": status}]),
                              key=own_key, verifynode=node)
        return json.loads(result)
    except requests.exceptions.HTTPError as e:
        print >>sys.stderr, "ERROR: reportstatus", e.response
        sys.exit(1)
    except ValueError:
        print >>sys.stderr, "==== FAILED REQUEST ===="
        print >>sys.stderr, target, variable, status
        print >>sys.stderr, "======= RESPONSE ======="
        # ValueError carries no .response attribute; show the raw body
        # that could not be handled instead.
        print >>sys.stderr, result
        print >>sys.stderr, "========================"
        sys.stderr.flush()
        # Bare raise keeps the original traceback.
        raise


def merge_fetch(args, config, localconfig, currentsizefile):
    """Fetch entries not yet in the local logorder from the storage nodes.

    Asks every storage node for its new entry hashes, drops those already
    present in the logorder file, and fetches each remaining entry from the
    first storage node (in configuration order) that listed it.  Every
    fetched entry is passed to verify_entry(), stored in the chains
    database and appended to the logorder.  Progress is committed, written
    to currentsizefile and reported to the status servers after every 1000
    added entries.

    Returns (tree_size, last_hash) where tree_size is the number of entries
    in the logorder and last_hash is its last element, or (0, '') when the
    logorder is empty.
    """
    paths = localconfig["paths"]
    storagenodes = config["storagenodes"]
    mergedb = paths["mergedb"]
    logorderfile = mergedb + "/logorder"
    chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))
    assert(localconfig["nodename"] == config["primarymergenode"])
    statusservers = config.get("statusservers")
    timing = timing_point()

    logorder = get_logorder(logorderfile)
    timing_point(timing, "get logorder")

    certsinlog = set(logorder)

    new_entries_per_node = {}
    new_entries = set()
    entries_to_fetch = {}

    for storagenode in storagenodes:
        nodename = storagenode["name"]
        print >>sys.stderr, "getting new entries from", nodename
        sys.stderr.flush()
        try:
            node_entries = set(get_new_entries(
                nodename, "https://%s/" % storagenode["address"],
                own_key, paths))
        except requests.exceptions.ConnectionError:
            # Best effort: an unreachable node is left out of this round
            # (it gets no slot in entries_to_fetch), but say so.
            print >>sys.stderr, "WARNING: could not connect to", nodename
            sys.stderr.flush()
            continue
        new_entries_per_node[nodename] = node_entries
        new_entries.update(node_entries)
        entries_to_fetch[nodename] = []

    timing_point(timing, "get new entries")

    new_entries -= certsinlog
    print >>sys.stderr, "adding", len(new_entries), "entries"
    sys.stderr.flush()

    reportstatus(statusservers, own_key, "fetch", "total",
                 len(certsinlog) + len(new_entries))

    # Assign each new entry to the first storage node that announced it.
    for ehash in new_entries:
        for storagenode in storagenodes:
            nodename = storagenode["name"]
            if nodename not in new_entries_per_node:
                continue
            if ehash in new_entries_per_node[nodename]:
                entries_to_fetch[nodename].append(ehash)
                break

    verifycert = subprocess.Popen(
        [paths["verifycert_bin"], paths["knownroots"]],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    noncommitted = 0
    added_entries = 0
    for storagenode in storagenodes:
        nodename = storagenode["name"]
        if nodename not in entries_to_fetch:
            continue
        # Trailing comma: the running totals below continue on this line.
        print >>sys.stderr, "getting %d entries from %s:" % \
            (len(entries_to_fetch[nodename]), nodename),
        sys.stderr.flush()
        with requests.sessions.Session() as session:
            for chunk in chunks(entries_to_fetch[nodename], 100):
                entries = get_entries(nodename,
                                      "https://%s/" % storagenode["address"],
                                      own_key, paths, chunk, session=session)
                for ehash in chunk:
                    entry = entries[ehash]
                    verify_entry(verifycert, entry, ehash)
                    chainsdb.add(ehash, entry)
                    noncommitted += 1
                    add_to_logorder(logorderfile, ehash)
                    logorder.append(ehash)
                    certsinlog.add(ehash)
                    added_entries += 1
                    if noncommitted >= 1000:
                        # Periodic checkpoint: commit what has been added
                        # so far, then publish the new position.
                        chainsdb.commit()
                        fsync_logorder(logorderfile)
                        noncommitted = 0
                        tree_size = len(logorder)
                        currentsize = {"index": tree_size - 1,
                                       "hash": hexencode(logorder[tree_size-1])}
                        write_file(currentsizefile, currentsize)
                        reportstatus(statusservers, own_key, "fetch",
                                     "fetched", tree_size)
                print >>sys.stderr, added_entries,
                sys.stderr.flush()
        print >>sys.stderr
        sys.stderr.flush()
    chainsdb.commit()
    fsync_logorder(logorderfile)
    timing_point(timing, "add entries")
    print >>sys.stderr, "added", added_entries, "entries"
    sys.stderr.flush()

    # Send a packed unsigned int 0 to the verifier and wait for it to exit.
    # NOTE(review): presumably 0 is the verifier's end-of-input marker --
    # confirm against the verifycert protocol.
    verifycert.communicate(struct.pack("I", 0))

    if args.timing:
        print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"]
        sys.stderr.flush()

    tree_size = len(logorder)
    reportstatus(statusservers, own_key, "fetch", "fetched", tree_size)
    if tree_size == 0:
        return (0, '')
    else:
        return (tree_size, logorder[tree_size-1])


def main():
    """
    Fetch new entries from all storage nodes.

    Indicate current position by writing the index in the logorder file
    (0-based) to the 'fetched' file.

    Sleep some and start over.
    """
    args, config, localconfig = parse_args()
    paths = localconfig["paths"]
    mergedb = paths["mergedb"]
    currentsizefile = mergedb + "/fetched"
    create_ssl_context(cafile=paths["https_cacertfile"])

    while True:
        logsize, last_hash = merge_fetch(args, config, localconfig,
                                         currentsizefile)
        currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
        #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
        write_file(currentsizefile, currentsize)
        # Without an interval this is a one-shot run.
        if args.interval is None:
            break
        print >>sys.stderr, "sleeping", args.interval, "seconds"
        sleep(args.interval)


if __name__ == '__main__':
    sys.exit(main())