diff options
Diffstat (limited to 'tools/merge_fetch.py')
-rw-r--r-- | tools/merge_fetch.py | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py new file mode 100644 index 0000000..a0a0396 --- /dev/null +++ b/tools/merge_fetch.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. + +import sys +import struct +import time +import subprocess +from mergetools import get_logorder, verify_entry, get_new_entries, \ + chunks, fsync_logorder, get_entries, write_chain, add_to_logorder +from certtools import timing_point, build_merkle_tree + +def merge_fetch(args, config, localconfig): + paths = localconfig["paths"] + storagenodes = config["storagenodes"] + mergedb = paths["mergedb"] + logorderfile = mergedb + "/logorder" + chainsdir = mergedb + "/chains" + own_key = (localconfig["nodename"], + "%s/%s-private.pem" % (paths["privatekeys"], + localconfig["nodename"])) + + timing = timing_point() + + logorder = get_logorder(logorderfile) + timing_point(timing, "get logorder") + + certsinlog = set(logorder) + + new_entries_per_node = {} + new_entries = set() + entries_to_fetch = {} + + for storagenode in storagenodes: + print >>sys.stderr, "getting new entries from", storagenode["name"] + sys.stderr.flush() + new_entries_per_node[storagenode["name"]] = \ + set(get_new_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths)) + new_entries.update(new_entries_per_node[storagenode["name"]]) + entries_to_fetch[storagenode["name"]] = [] + timing_point(timing, "get new entries") + + new_entries -= certsinlog + print >>sys.stderr, "adding", len(new_entries), "entries" + sys.stderr.flush() + + if args.nomerge: + sys.exit(0) + + for ehash in new_entries: + for storagenode in storagenodes: + if ehash in new_entries_per_node[storagenode["name"]]: + entries_to_fetch[storagenode["name"]].append(ehash) + break + + verifycert = subprocess.Popen( + [paths["verifycert_bin"], paths["known_roots"]], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + added_entries = 0 + for storagenode in storagenodes: + print >>sys.stderr, "getting %d entries from %s:" % \ + (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), + sys.stderr.flush() + for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): + entries = get_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths, chunk) + for ehash in chunk: + entry = entries[ehash] + verify_entry(verifycert, entry, ehash) + write_chain(ehash, entry, chainsdir) + add_to_logorder(logorderfile, ehash) + logorder.append(ehash) + certsinlog.add(ehash) + added_entries += 1 + print >>sys.stderr, added_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + fsync_logorder(logorderfile) + timing_point(timing, "add entries") + print >>sys.stderr, "added", added_entries, "entries" + sys.stderr.flush() + + verifycert.communicate(struct.pack("I", 0)) + + tree = build_merkle_tree(logorder) + tree_size = len(logorder) + root_hash = tree[-1][0] + timestamp = int(time.time() * 1000) + + return (tree_size, root_hash, timestamp) |