From b13432988901f1419a04a0f2f4c411bdd3267999 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Sat, 3 Dec 2016 14:34:15 +0100 Subject: merge_dist: Distribute independently to each frontend node. --- tools/merge_dist.py | 125 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 77 insertions(+), 48 deletions(-) diff --git a/tools/merge_dist.py b/tools/merge_dist.py index bc9c676..d81f0a1 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -114,79 +114,112 @@ def do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s): if args.timing: logging.debug("timing: merge_dist: %s", timing["deltatimes"]) -def merge_dist(args, localconfig, frontendnodes, timestamp): +def merge_dist_sequenced(args, localconfig, frontendnodes, chainsdb, s): paths = localconfig["paths"] mergedb = paths["mergedb"] - chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" sthfile = mergedb + "/sth" - statusfile = mergedb + "/merge_dist.status" - s = Status(statusfile) - create_ssl_context(cafile=paths["https_cacertfile"]) + timestamp = 0 timing = timing_point() try: sth = json.loads(open(sthfile, 'r').read()) except (IOError, ValueError): logging.warning("No valid STH file found in %s", sthfile) - return timestamp, 0 + return timestamp if sth['timestamp'] < timestamp: logging.warning("New STH file older than the previous one: %d < %d", sth['timestamp'], timestamp) - return timestamp, 0 + return timestamp if sth['timestamp'] == timestamp: - return timestamp, 0 + return timestamp timestamp = sth['timestamp'] logorder = get_logorder(logorderfile, sth['tree_size']) timing_point(timing, "get logorder") - procs = {} for frontendnode in frontendnodes: - nodename = frontendnode["name"] + do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s) - if args.mergeinterval: - name = 'dist_%s' % nodename - p, pipe = start_worker(name, - lambda _, argv: do_send(*(argv)), - (args, localconfig, frontendnode, logorder, sth, chainsdb, s)) - procs[p] = (nodename, pipe) - else: - do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s) + return timestamp - if not args.mergeinterval: - return timestamp, 0 +def dist_worker(_, argv): + args, localconfig, frontendnode, chainsdb, s = argv + paths = localconfig["paths"] + mergedb = paths["mergedb"] + sthfile = mergedb + "/sth" + logorderfile = mergedb + "/logorder" + nodename = frontendnode["name"] + + wait = max(3, args.mergeinterval / 10) + timestamp = 0 + while True: + try: + sth = json.loads(open(sthfile, 'r').read()) + except (IOError, ValueError): + logging.error("%s: No valid STH file found in %s", nodename, sthfile) + sleep(wait) + continue + + if sth['timestamp'] < timestamp: + logging.error( + "%s: New STH file older than the previous one: %d < %d", + nodename, sth['timestamp'], timestamp) + sleep(wait) + continue + + if sth['timestamp'] == timestamp: + logging.info( + "%s: sth still at %d (%d), sleeping %s seconds", + nodename, sth['tree_size'], timestamp, wait) + sleep(wait) + continue + + timestamp = sth['timestamp'] + logorder = get_logorder(logorderfile, sth['tree_size']) + do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s) + +def merge_dist_parallel(args, localconfig, frontendnodes, chainsdb, s): + procs = {} + for frontendnode in frontendnodes: + nodename = frontendnode["name"] + procname = 'dist_%s' % nodename + p, pipe = start_worker(procname, dist_worker, + (args, localconfig, frontendnode, + chainsdb, s)) + procs[p] = (frontendnode, pipe) - failures = 0 while True: for p in list(procs): if not p.is_alive(): p.join() - nodename, _ = procs[p] - if p.exitcode != 0: - logging.warning("%s failure: %d", nodename, p.exitcode) - failures += 1 - del procs[p] - if not procs: - break + frontendnode, _ = procs[p] + nodename = frontendnode["name"] + logging.warning("%s exited with %d, restarting", nodename, + p.exitcode) + procname = 'dist_%s' % nodename + newproc, pipe = \ + start_worker(procname, dist_worker, + (args, localconfig, frontendnode, + chainsdb, s)) + procs[p] = (frontendnode, pipe) sleep(1) - return timestamp, failures + return -1 def main(): """ - Wait until 'sth' exists and read it. + Distribute missing entries and the STH to all frontend nodes, in + parallel if `--mergeinterval'. - Distribute missing entries and the STH to all frontend nodes. - - If `--mergeinterval', start over again. + If `--mergeinterval', re-read 'sth' when it changes and keep + distributing. """ args, config, localconfig = parse_args() paths = localconfig["paths"] mergedb = paths["mergedb"] - sth_path = localconfig["paths"]["mergedb"] + "/sth" + chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") lockfile = mergedb + "/.merge_dist.lock" - timestamp = 0 loginit(args, "merge_dist.log") @@ -194,24 +227,20 @@ def main(): logging.critical("unable to take lock %s", lockfile) return 1 + statusfile = mergedb + "/merge_dist.status" + s = Status(statusfile) + + create_ssl_context(cafile=paths["https_cacertfile"]) + if len(args.node) == 0: nodes = config["frontendnodes"] else: nodes = [n for n in config["frontendnodes"] if n["name"] in args.node] - sth_statinfo = waitforfile(sth_path) - while True: - timestamp, failures = merge_dist(args, localconfig, nodes, timestamp) - if not args.mergeinterval: - break - sth_statinfo_old = sth_statinfo - while sth_statinfo == sth_statinfo_old: - sleep(max(3, args.mergeinterval / 10)) - if failures > 0: - break - sth_statinfo = stat(sth_path) - - return 0 + if args.mergeinterval: + return merge_dist_parallel(args, localconfig, nodes, chainsdb, s) + else: + merge_dist_sequenced(args, localconfig, nodes, chainsdb, s) if __name__ == '__main__': sys.exit(main()) -- cgit v1.1