#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.

import sys
import struct
import time
import subprocess
from mergetools import get_logorder, verify_entry, get_new_entries, \
     chunks, fsync_logorder, get_entries, write_chain, add_to_logorder
from certtools import timing_point, build_merkle_tree


def _node_url(node):
    """Return the base HTTPS URL built from a storage node's "address"."""
    return "https://%s/" % node["address"]


def merge_fetch(args, config, localconfig):
    """Fetch entries missing from the local log order and append them.

    Every storage node listed in ``config["storagenodes"]`` is asked for
    its new entries.  Hashes that are not already in the logorder file are
    fetched in chunks of 100 from the first node (in configuration order)
    that reported them.  Each fetched entry is handed to ``verify_entry``
    together with the ``verifycert`` subprocess, written with
    ``write_chain`` and appended to the logorder file.

    Arguments:
      args        -- object with a ``nomerge`` attribute; when it is true
                     the process exits with status 0 right after the
                     number of new entries has been reported.
      config      -- mapping with a "storagenodes" sequence; each node is a
                     mapping with "name" and "address" keys.
      localconfig -- mapping with "nodename" and "paths"; the "paths"
                     keys read here are "mergedb", "privatekeys",
                     "verifycert_bin" and "known_roots".

    Returns a ``(tree_size, root_hash, timestamp)`` tuple, where
    ``tree_size`` is the length of the log order after the merge,
    ``root_hash`` is ``tree[-1][0]`` of the result of
    ``build_merkle_tree`` and ``timestamp`` is the current time in
    milliseconds.

    Progress is reported on stderr.
    """
    paths = localconfig["paths"]
    storagenodes = config["storagenodes"]
    mergedb = paths["mergedb"]
    logorderfile = mergedb + "/logorder"
    chainsdir = mergedb + "/chains"
    nodename = localconfig["nodename"]
    keyfile = "%s/%s-private.pem" % (paths["privatekeys"], nodename)
    own_key = (nodename, keyfile)

    timing = timing_point()

    logorder = get_logorder(logorderfile)
    timing_point(timing, "get logorder")

    certsinlog = set(logorder)

    # Per-node bookkeeping, keyed by storage node name.
    new_entries_per_node = {}
    entries_to_fetch = {}
    new_entries = set()

    for node in storagenodes:
        name = node["name"]
        print >>sys.stderr, "getting new entries from", name
        sys.stderr.flush()
        offered = set(get_new_entries(name, _node_url(node), own_key, paths))
        new_entries_per_node[name] = offered
        new_entries.update(offered)
        entries_to_fetch[name] = []
    timing_point(timing, "get new entries")

    # Only hashes that are not yet part of the log order need fetching.
    new_entries.difference_update(certsinlog)
    print >>sys.stderr, "adding", len(new_entries), "entries"
    sys.stderr.flush()

    if args.nomerge:
        sys.exit(0)

    # Assign every new hash to the first node that offered it.
    for ehash in new_entries:
        for node in storagenodes:
            name = node["name"]
            if ehash in new_entries_per_node[name]:
                entries_to_fetch[name].append(ehash)
                break

    verifycert = subprocess.Popen(
        [paths["verifycert_bin"], paths["known_roots"]],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    added_entries = 0
    for node in storagenodes:
        name = node["name"]
        wanted = entries_to_fetch[name]
        # The trailing commas keep the running totals on the same line.
        print >>sys.stderr, "getting %d entries from %s:" % \
            (len(wanted), name),
        sys.stderr.flush()
        for chunk in chunks(wanted, 100):
            entries = get_entries(name, _node_url(node), own_key, paths,
                                  chunk)
            for ehash in chunk:
                entry = entries[ehash]
                verify_entry(verifycert, entry, ehash)
                write_chain(ehash, entry, chainsdir)
                add_to_logorder(logorderfile, ehash)
                logorder.append(ehash)
                certsinlog.add(ehash)
                added_entries += 1
            print >>sys.stderr, added_entries,
            sys.stderr.flush()
        print >>sys.stderr
        sys.stderr.flush()
    fsync_logorder(logorderfile)
    timing_point(timing, "add entries")
    print >>sys.stderr, "added", added_entries, "entries"
    sys.stderr.flush()

    # Write a packed zero to the verifier's stdin and wait for it to exit
    # (presumably its end-of-input marker -- confirm against verifycert).
    verifycert.communicate(struct.pack("I", 0))

    tree = build_merkle_tree(logorder)
    tree_size = len(logorder)
    root_hash = tree[-1][0]
    timestamp = int(time.time() * 1000)

    return (tree_size, root_hash, timestamp)