diff options
author | Linus Nordberg <linus@nordu.net> | 2015-09-24 16:47:32 +0200 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2015-11-10 12:48:46 +0100 |
commit | aaed8aff7c425c8ac2a15e584e24317da327f5e4 (patch) | |
tree | 229a7e5718b0d4c9750918d654484dba354f5194 /tools/merge_dist.py | |
parent | ab327d88f7a8f458b6150efd6b21b5615210e571 (diff) |
Merge is now run by shell script tools/merge.
tools/merge run merge_fetch.py, merge_backup.py, merge_sth.py and
merge_dist.py sequentially.
TODO: test backupquorum != 0
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-x[-rw-r--r--] | tools/merge_dist.py | 102 |
1 files changed, 54 insertions, 48 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py index bfc0e61..0e85984 100644..100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -5,74 +5,56 @@ # See LICENSE for licensing information. import sys -import urllib2 -import base64 -import select -from certtools import timing_point, check_sth_signature, \ - create_sth_signature, get_public_key_from_file +import json +from time import sleep +from base64 import b64encode, b64decode +from certtools import timing_point, \ + create_ssl_context from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ - sendsth, hexencode, sendlog, sendentry, read_chain + sendsth, sendlog, sendentry, read_chain, parse_args -def merge_dist(args, config, localconfig, sth_in): +def merge_dist(args, config, localconfig, timestamp): paths = localconfig["paths"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) frontendnodes = config["frontendnodes"] - signingnodes = config["signingnodes"] - ctbaseurl = config["baseurl"] - logpublickey = get_public_key_from_file(paths["logpublickey"]) mergedb = paths["mergedb"] chainsdir = mergedb + "/chains" logorderfile = mergedb + "/logorder" + sthfile = mergedb + "/sth" + create_ssl_context(cafile=paths["https_cacertfile"]) timing = timing_point() - logorder = get_logorder(logorderfile) - timing_point(timing, "get logorder") - - (tree_size, root_hash, timestamp) = sth_in - tree_head_signature = None - for signingnode in signingnodes: - try: - tree_head_signature = \ - create_sth_signature(tree_size, timestamp, - root_hash, - "https://%s/" % signingnode["address"], - key=own_key) - break - except urllib2.URLError, err: - print >>sys.stderr, err - sys.stderr.flush() - if tree_head_signature == None: - print >>sys.stderr, "Could not contact any signing nodes" - sys.exit(1) - - sth = {"tree_size": tree_size, "timestamp": timestamp, - "sha256_root_hash": base64.b64encode(root_hash), - "tree_head_signature": base64.b64encode(tree_head_signature)} - - check_sth_signature(ctbaseurl, sth, publickey=logpublickey) + try: + sth = json.loads(open(sthfile, 'r').read()) + except (IOError, ValueError): + print >>sys.stderr, "No valid STH file found in", sthfile + return timestamp + if sth['timestamp'] < timestamp: + print >>sys.stderr, "New STH file older than the previous one:", \ + sth['timestamp'], "<", timestamp + return timestamp + if sth['timestamp'] == timestamp: + return timestamp + timestamp = sth['timestamp'] - timing_point(timing, "build sth") - - if args.timing: - print >>sys.stderr, timing["deltatimes"] - sys.stderr.flush() - - print hexencode(root_hash) - sys.stdout.flush() + logorder = get_logorder(logorderfile, sth['tree_size']) + timing_point(timing, "get logorder") for frontendnode in frontendnodes: nodeaddress = "https://%s/" % frontendnode["address"] nodename = frontendnode["name"] timing = timing_point() + print >>sys.stderr, "distributing for node", nodename sys.stderr.flush() curpos = get_curpos(nodename, nodeaddress, own_key, paths) timing_point(timing, "get curpos") print >>sys.stderr, "current position", curpos sys.stderr.flush() - entries = [base64.b64encode(entry) for entry in logorder[curpos:]] + + entries = [b64encode(entry) for entry in logorder[curpos:]] print >>sys.stderr, "sending log:", sys.stderr.flush() for chunk in chunks(entries, 1000): @@ -83,7 +65,7 @@ def merge_dist(args, config, localconfig, sth_in): if sendlogresult == None: if trynumber == 1: sys.exit(1) - select.select([], [], [], 10.0) + sleep(10) print >>sys.stderr, "tries left:", trynumber sys.stderr.flush() continue @@ -98,20 +80,22 @@ def merge_dist(args, config, localconfig, sth_in): timing_point(timing, "sendlog") print >>sys.stderr, "log sent" sys.stderr.flush() + missingentries = get_missingentries(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") + print >>sys.stderr, "missing entries:", len(missingentries) sys.stderr.flush() fetched_entries = 0 print >>sys.stderr, "fetching missing entries", sys.stderr.flush() for missingentry in missingentries: - ehash = base64.b64decode(missingentry) + ehash = b64decode(missingentry) sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, read_chain(chainsdir, ehash), ehash) if sendentryresult["result"] != "ok": - print >>sys.stderr, "send sth:", sendentryresult + print >>sys.stderr, "sendentry:", sendentryresult sys.exit(1) fetched_entries += 1 if fetched_entries % 1000 == 0: @@ -120,11 +104,33 @@ def merge_dist(args, config, localconfig, sth_in): print >>sys.stderr sys.stderr.flush() timing_point(timing, "send missing") + + print >>sys.stderr, "sending sth to node", nodename + sys.stderr.flush() sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) if sendsthresult["result"] != "ok": - print >>sys.stderr, "send sth:", sendsthresult + print >>sys.stderr, "sendsth:", sendsthresult sys.exit(1) timing_point(timing, "send sth") + if args.timing: print >>sys.stderr, timing["deltatimes"] sys.stderr.flush() + + return timestamp + +def main(): + """ + Distribute missing entries and the STH to all frontend nodes. + """ + args, config, localconfig = parse_args() + timestamp = 0 + while True: + timestamp = merge_dist(args, config, localconfig, timestamp) + if args.interval is None: + break + print >>sys.stderr, "sleeping", args.interval, "seconds" + sleep(args.interval) + +if __name__ == '__main__': + sys.exit(main()) |