From 19a2a611a839c0318f58347e2d93943c8e2401a5 Mon Sep 17 00:00:00 2001
From: Linus Nordberg
Date: Wed, 23 Nov 2016 17:09:48 +0100
Subject: WIP

Merge can run as four separate processes, plus a fifth controlling
process, 'merge'.

Tests are limited to testcase1.py and they're failing because of the
test with the dead merge secondary. Tests are also time-consuming
because they wait 60 s each time a merge needs to be verified. This
could be improved by peeking at the control files, for example.
---
 tools/merge_fetch.py | 59 +++++++++++++++++++++++++++++++---------------------
 1 file changed, 35 insertions(+), 24 deletions(-)

(limited to 'tools/merge_fetch.py')

diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index db274a3..7973fae 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,10 +10,11 @@ import sys
 import struct
 import subprocess
+import logging
 from time import sleep
 from mergetools import get_logorder, verify_entry, get_new_entries, \
     chunks, fsync_logorder, get_entries, add_to_logorder, \
-    hexencode, parse_args, perm
+    hexencode, parse_args, perm, flock_ex_or_fail, Status
 from certtools import timing_point, write_file, create_ssl_context
 
 def merge_fetch(args, config, localconfig):
@@ -21,6 +22,8 @@
     storagenodes = config["storagenodes"]
     mergedb = paths["mergedb"]
     logorderfile = mergedb + "/logorder"
+    statusfile = mergedb + "/merge_fetch.status"
+    s = Status(statusfile)
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     own_key = (localconfig["nodename"],
                "%s/%s-private.pem" % (paths["privatekeys"],
@@ -37,8 +40,7 @@
     entries_to_fetch = {}
 
     for storagenode in storagenodes:
-        print >>sys.stderr, "getting new entries from", storagenode["name"]
-        sys.stderr.flush()
+        logging.info("getting new entries from %s", storagenode["name"])
         new_entries_per_node[storagenode["name"]] = \
             set(get_new_entries(storagenode["name"],
                                 "https://%s/" % storagenode["address"],
@@ -48,8 +50,7 @@
     timing_point(timing, "get new entries")
 
     new_entries -= certsinlog
-    print >>sys.stderr, "adding", len(new_entries), "entries"
-    sys.stderr.flush()
+    logging.info("adding %d entries", len(new_entries))
 
     for ehash in new_entries:
         for storagenode in storagenodes:
@@ -63,9 +64,8 @@
 
     added_entries = 0
     for storagenode in storagenodes:
-        print >>sys.stderr, "getting %d entries from %s:" % \
-            (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
-        sys.stderr.flush()
+        nentries = len(entries_to_fetch[storagenode["name"]])
+        logging.info("getting %d entries from %s", nentries, storagenode["name"])
         for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
             entries = get_entries(storagenode["name"],
                                   "https://%s/" % storagenode["address"],
@@ -78,21 +78,17 @@
                 logorder.append(ehash)
                 certsinlog.add(ehash)
                 added_entries += 1
-            print >>sys.stderr, added_entries,
-            sys.stderr.flush()
-    print >>sys.stderr
-    sys.stderr.flush()
+            s.status("INFO: getting %d entries from %s: %d" %
+                     (nentries, storagenode["name"], added_entries))
     chainsdb.commit()
     fsync_logorder(logorderfile)
     timing_point(timing, "add entries")
-    print >>sys.stderr, "added", added_entries, "entries"
-    sys.stderr.flush()
+    logging.info("added %d entries", added_entries)
     verifycert.communicate(struct.pack("I", 0))
     if args.timing:
-        print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_fetch: %s", timing["deltatimes"])
 
     tree_size = len(logorder)
     if tree_size == 0:
@@ -102,28 +98,43 @@
 
 def main():
     """
-    Fetch new entries from all storage nodes.
+    Fetch new entries from all storage nodes, in sequence.
 
-    Indicate current position by writing the index in the logorder file
-    (0-based) to the 'fetched' file.
+    Indicate the current position by writing the hash and its 'logorder'
+    index, 0-based, to 'fetched'.
 
-    Sleep some and start over.
+    Sleep some and start over, or exit if there's no `--mergeinterval'.
     """
     args, config, localconfig = parse_args()
     paths = localconfig["paths"]
     mergedb = paths["mergedb"]
     currentsizefile = mergedb + "/fetched"
+    lockfile = mergedb + "/.merge_fetch.lock"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_fetch.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     create_ssl_context(cafile=paths["https_cacertfile"])
 
     while True:
         logsize, last_hash = merge_fetch(args, config, localconfig)
         currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
+        logging.debug("writing to %s: %s", currentsizefile, currentsize)
         write_file(currentsizefile, currentsize)
-        if args.interval is None:
+        if args.mergeinterval is None:
             break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        logging.debug("sleeping %d seconds", args.mergeinterval / 10)
+        sleep(args.mergeinterval / 10)
+
+    return 0
 
 if __name__ == '__main__':
     sys.exit(main())
-- 
cgit v1.1