diff options
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-x | tools/merge_fetch.py | 59 |
1 files changed, 35 insertions, 24 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index db274a3..7973fae 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -10,10 +10,11 @@ import sys import struct import subprocess +import logging from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ - hexencode, parse_args, perm + hexencode, parse_args, perm, flock_ex_or_fail, Status from certtools import timing_point, write_file, create_ssl_context def merge_fetch(args, config, localconfig): @@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig): storagenodes = config["storagenodes"] mergedb = paths["mergedb"] logorderfile = mergedb + "/logorder" + statusfile = mergedb + "/merge_fetch.status" + s = Status(statusfile) chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig): entries_to_fetch = {} for storagenode in storagenodes: - print >>sys.stderr, "getting new entries from", storagenode["name"] - sys.stderr.flush() + logging.info("getting new entries from %s", storagenode["name"]) new_entries_per_node[storagenode["name"]] = \ set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"], @@ -48,8 +50,7 @@ def merge_fetch(args, config, localconfig): timing_point(timing, "get new entries") new_entries -= certsinlog - print >>sys.stderr, "adding", len(new_entries), "entries" - sys.stderr.flush() + logging.info("adding %d entries", len(new_entries)) for ehash in new_entries: for storagenode in storagenodes: @@ -63,9 +64,8 @@ def merge_fetch(args, config, localconfig): added_entries = 0 for storagenode in storagenodes: - print >>sys.stderr, "getting %d entries from %s:" % \ - (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), - sys.stderr.flush() + nentries = len(entries_to_fetch[storagenode["name"]]) + logging.info("getting %d entries from %s", nentries, storagenode["name"]) for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], @@ -78,21 +78,17 @@ def merge_fetch(args, config, localconfig): logorder.append(ehash) certsinlog.add(ehash) added_entries += 1 - print >>sys.stderr, added_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: getting %d entries from %s: %d" % + (nentries, storagenode["name"], added_entries)) chainsdb.commit() fsync_logorder(logorderfile) timing_point(timing, "add entries") - print >>sys.stderr, "added", added_entries, "entries" - sys.stderr.flush() + logging.info("added %d entries", added_entries) verifycert.communicate(struct.pack("I", 0)) if args.timing: - print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_fetch: %s", timing["deltatimes"]) tree_size = len(logorder) if tree_size == 0: @@ -102,28 +98,43 @@ def merge_fetch(args, config, localconfig): def main(): """ - Fetch new entries from all storage nodes. + Fetch new entries from all storage nodes, in sequence. - Indicate current position by writing the index in the logorder file - (0-based) to the 'fetched' file. + Indicate the current position by writing the hash and its 'logorder' + index, 0-based, to 'fetched'. - Sleep some and start over. + Sleep some and start over, or exit if there's no `--mergeinterval'. """ args, config, localconfig = parse_args() paths = localconfig["paths"] mergedb = paths["mergedb"] currentsizefile = mergedb + "/fetched" + lockfile = mergedb + "/.merge_fetch.lock" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_fetch.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + create_ssl_context(cafile=paths["https_cacertfile"]) while True: logsize, last_hash = merge_fetch(args, config, localconfig) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} - #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize + logging.debug("writing to %s: %s", currentsizefile, currentsize) write_file(currentsizefile, currentsize) - if args.interval is None: + if args.mergeinterval is None: break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + logging.debug("sleeping %d seconds", args.mergeinterval / 10) + sleep(args.mergeinterval / 10) + + return 0 if __name__ == '__main__': sys.exit(main()) |