summaryrefslogtreecommitdiff
path: root/tools/merge_dist.py
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2016-12-03 14:34:15 +0100
committerLinus Nordberg <linus@nordu.net>2016-12-03 14:40:51 +0100
commitb13432988901f1419a04a0f2f4c411bdd3267999 (patch)
treebf9887ae2a053c33f2a6d2bcedfcb482819c5a00 /tools/merge_dist.py
parent29f42488a1376b25e162da8fbaf500955cd4dcfe (diff)
merge_dist: Distribute independently to each frontend node.
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-xtools/merge_dist.py125
1 files changed, 77 insertions, 48 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index bc9c676..d81f0a1 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -114,79 +114,112 @@ def do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s):
if args.timing:
logging.debug("timing: merge_dist: %s", timing["deltatimes"])
-def merge_dist(args, localconfig, frontendnodes, timestamp):
+def merge_dist_sequenced(args, localconfig, frontendnodes, chainsdb, s):
paths = localconfig["paths"]
mergedb = paths["mergedb"]
- chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
sthfile = mergedb + "/sth"
- statusfile = mergedb + "/merge_dist.status"
- s = Status(statusfile)
- create_ssl_context(cafile=paths["https_cacertfile"])
+ timestamp = 0
timing = timing_point()
try:
sth = json.loads(open(sthfile, 'r').read())
except (IOError, ValueError):
logging.warning("No valid STH file found in %s", sthfile)
- return timestamp, 0
+ return timestamp
if sth['timestamp'] < timestamp:
logging.warning("New STH file older than the previous one: %d < %d",
sth['timestamp'], timestamp)
- return timestamp, 0
+ return timestamp
if sth['timestamp'] == timestamp:
- return timestamp, 0
+ return timestamp
timestamp = sth['timestamp']
logorder = get_logorder(logorderfile, sth['tree_size'])
timing_point(timing, "get logorder")
- procs = {}
for frontendnode in frontendnodes:
- nodename = frontendnode["name"]
+ do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s)
- if args.mergeinterval:
- name = 'dist_%s' % nodename
- p, pipe = start_worker(name,
- lambda _, argv: do_send(*(argv)),
- (args, localconfig, frontendnode, logorder, sth, chainsdb, s))
- procs[p] = (nodename, pipe)
- else:
- do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s)
+ return timestamp
- if not args.mergeinterval:
- return timestamp, 0
+def dist_worker(_, argv):
+ args, localconfig, frontendnode, chainsdb, s = argv
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ sthfile = mergedb + "/sth"
+ logorderfile = mergedb + "/logorder"
+ nodename = frontendnode["name"]
+
+ wait = max(3, args.mergeinterval / 10)
+ timestamp = 0
+ while True:
+ try:
+ sth = json.loads(open(sthfile, 'r').read())
+ except (IOError, ValueError):
+ logging.error("%s: No valid STH file found in %s", nodename, sthfile)
+ sleep(wait)
+ continue
+
+ if sth['timestamp'] < timestamp:
+ logging.error(
+ "%s: New STH file older than the previous one: %d < %d",
+ nodename, sth['timestamp'], timestamp)
+ sleep(wait)
+ continue
+
+ if sth['timestamp'] == timestamp:
+ logging.info(
+ "%s: sth still at %d (%d), sleeping %s seconds",
+ nodename, sth['tree_size'], timestamp, wait)
+ sleep(wait)
+ continue
+
+ timestamp = sth['timestamp']
+ logorder = get_logorder(logorderfile, sth['tree_size'])
+ do_send(args, localconfig, frontendnode, logorder, sth, chainsdb, s)
+
+def merge_dist_parallel(args, localconfig, frontendnodes, chainsdb, s):
+ procs = {}
+ for frontendnode in frontendnodes:
+ nodename = frontendnode["name"]
+ procname = 'dist_%s' % nodename
+ p, pipe = start_worker(procname, dist_worker,
+ (args, localconfig, frontendnode,
+ chainsdb, s))
+ procs[p] = (frontendnode, pipe)
- failures = 0
while True:
for p in list(procs):
if not p.is_alive():
p.join()
- nodename, _ = procs[p]
- if p.exitcode != 0:
- logging.warning("%s failure: %d", nodename, p.exitcode)
- failures += 1
- del procs[p]
- if not procs:
- break
+ frontendnode, _ = procs[p]
+ nodename = frontendnode["name"]
+ logging.warning("%s exited with %d, restarting", nodename,
+ p.exitcode)
+ procname = 'dist_%s' % nodename
+ newproc, pipe = \
+ start_worker(procname, dist_worker,
+ (args, localconfig, frontendnode,
+ chainsdb, s))
+ procs[p] = (frontendnode, pipe)
sleep(1)
- return timestamp, failures
+ return -1
def main():
"""
- Wait until 'sth' exists and read it.
+ Distribute missing entries and the STH to all frontend nodes, in
+ parallel if `--mergeinterval'.
- Distribute missing entries and the STH to all frontend nodes.
-
- If `--mergeinterval', start over again.
+ If `--mergeinterval', re-read 'sth' when it changes and keep
+ distributing.
"""
args, config, localconfig = parse_args()
paths = localconfig["paths"]
mergedb = paths["mergedb"]
- sth_path = localconfig["paths"]["mergedb"] + "/sth"
+ chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
lockfile = mergedb + "/.merge_dist.lock"
- timestamp = 0
loginit(args, "merge_dist.log")
@@ -194,24 +227,20 @@ def main():
logging.critical("unable to take lock %s", lockfile)
return 1
+ statusfile = mergedb + "/merge_dist.status"
+ s = Status(statusfile)
+
+ create_ssl_context(cafile=paths["https_cacertfile"])
+
if len(args.node) == 0:
nodes = config["frontendnodes"]
else:
nodes = [n for n in config["frontendnodes"] if n["name"] in args.node]
- sth_statinfo = waitforfile(sth_path)
- while True:
- timestamp, failures = merge_dist(args, localconfig, nodes, timestamp)
- if not args.mergeinterval:
- break
- sth_statinfo_old = sth_statinfo
- while sth_statinfo == sth_statinfo_old:
- sleep(max(3, args.mergeinterval / 10))
- if failures > 0:
- break
- sth_statinfo = stat(sth_path)
-
- return 0
+ if args.mergeinterval:
+ return merge_dist_parallel(args, localconfig, nodes, chainsdb, s)
+ else:
+ merge_dist_sequenced(args, localconfig, nodes, chainsdb, s)
if __name__ == '__main__':
sys.exit(main())