Diffstat (limited to 'tools/merge.py')
-rwxr-xr-x | tools/merge.py | 542
1 file changed, 31 insertions(+), 511 deletions(-)
diff --git a/tools/merge.py b/tools/merge.py
index 2065a2d..212c171 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -1,518 +1,38 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 #
-# Copyright (c) 2014, NORDUnet A/S.
+# Copyright (c) 2014-2015, NORDUnet A/S.
 # See LICENSE for licensing information.
 
 import argparse
-import json
-import base64
-import urllib
-import urllib2
-import sys
-import time
-import ecdsa
-import hashlib
-import urlparse
-import os
 import yaml
-import select
-import struct
-from certtools import build_merkle_tree, create_sth_signature, \
-    check_sth_signature, get_eckey_from_file, timing_point, http_request, \
-    get_public_key_from_file, get_leaf_hash, decode_certificate_chain, \
-    create_ssl_context
-from mergetools import parselogrow, get_logorder, read_chain, \
-    verify_entry
-
-parser = argparse.ArgumentParser(description="")
-parser.add_argument('--config', help="System configuration", required=True)
-parser.add_argument('--localconfig', help="Local configuration", required=True)
-parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge")
-parser.add_argument("--timing", action='store_true', help="Print timing information")
-args = parser.parse_args()
-
-config = yaml.load(open(args.config))
-localconfig = yaml.load(open(args.localconfig))
-
-ctbaseurl = config["baseurl"]
-frontendnodes = config["frontendnodes"]
-storagenodes = config["storagenodes"]
-secondaries = config.get("mergenodes", [])
-paths = localconfig["paths"]
-mergedb = paths["mergedb"]
-
-signingnodes = config["signingnodes"]
-create_ssl_context(cafile=paths["https_cacertfile"])
-
-chainsdir = mergedb + "/chains"
-logorderfile = mergedb + "/logorder"
-
-own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"]))
-
-logpublickey = get_public_key_from_file(paths["logpublickey"])
-
-hashed_dir = True
-
-def hexencode(key):
-    return base64.b16encode(key).lower()
-
-def write_chain(key, value):
-    filename = hexencode(key)
-    if hashed_dir:
-        path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
-        try:
-            os.makedirs(path)
-        except Exception, e:
-            pass
-    else:
-        path = chainsdir
-    f = open(path + "/" + filename, "w")
-    f.write(value)
-    f.close()
-
-def add_to_logorder(key):
-    f = open(logorderfile, "a")
-    f.write(hexencode(key) + "\n")
-    f.close()
-
-def fsync_logorder():
-    f = open(logorderfile, "a")
-    os.fsync(f.fileno())
-    f.close()
-
-def get_new_entries(node, baseurl):
-    try:
-        result = http_request(baseurl + "plop/v1/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        parsed_result = json.loads(result)
-        if parsed_result.get(u"result") == u"ok":
-            return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
-        print >>sys.stderr, "ERROR: fetchnewentries", parsed_result
-        sys.exit(1)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: fetchnewentries", e.read()
-        sys.exit(1)
-
-def get_entries(node, baseurl, hashes):
-    try:
-        params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True)
-        result = http_request(baseurl + "plop/v1/storage/getentry?" + params, key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        parsed_result = json.loads(result)
-        if parsed_result.get(u"result") == u"ok":
-            entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
-            assert len(entries) == len(hashes)
-            assert set(entries.keys()) == set(hashes)
-            return entries
-        print >>sys.stderr, "ERROR: getentry", parsed_result
-        sys.exit(1)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: getentry", e.read()
-        sys.exit(1)
-
-def get_curpos(node, baseurl):
-    try:
-        result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        parsed_result = json.loads(result)
-        if parsed_result.get(u"result") == u"ok":
-            return parsed_result[u"position"]
-        print >>sys.stderr, "ERROR: currentposition", parsed_result
-        sys.exit(1)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: currentposition", e.read()
-        sys.exit(1)
-
-def get_verifiedsize(node, baseurl):
-    try:
-        result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        parsed_result = json.loads(result)
-        if parsed_result.get(u"result") == u"ok":
-            return parsed_result[u"size"]
-        print >>sys.stderr, "ERROR: verifiedsize", parsed_result
-        sys.exit(1)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: verifiedsize", e.read()
-        sys.exit(1)
-
-
-
-def sendlog(node, baseurl, submission):
-    try:
-        result = http_request(baseurl + "plop/v1/frontend/sendlog",
-                              json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: sendlog", e.read()
-        sys.stderr.flush()
-        return None
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, submission
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def backup_sendlog(node, baseurl, submission):
-    try:
-        result = http_request(baseurl + "plop/v1/merge/sendlog",
-                              json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: sendlog", e.read()
-        sys.stderr.flush()
-        return None
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, submission
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def sendentry(node, baseurl, entry, hash):
-    try:
-        result = http_request(baseurl + "plop/v1/frontend/sendentry",
-                              json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
-                              verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: sendentry", e.read()
-        sys.exit(1)
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, hash
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def sendentry_merge(node, baseurl, entry, hash):
-    try:
-        result = http_request(baseurl + "plop/v1/merge/sendentry",
-                              json.dumps({"entry":base64.b64encode(entry),
-                                          "treeleafhash":base64.b64encode(hash)}), key=own_key,
-                              verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: sendentry", e.read()
-        sys.exit(1)
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, hash
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def sendsth(node, baseurl, submission):
-    try:
-        result = http_request(baseurl + "plop/v1/frontend/sendsth",
-                              json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: sendsth", e.read()
-        sys.exit(1)
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, submission
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def verifyroot(node, baseurl, treesize):
-    try:
-        result = http_request(baseurl + "plop/v1/merge/verifyroot",
-                              json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: verifyroot", e.read()
-        sys.exit(1)
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, submission
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def setverifiedsize(node, baseurl, treesize):
-    try:
-        result = http_request(baseurl + "plop/v1/merge/setverifiedsize",
-                              json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        return json.loads(result)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: setverifiedsize", e.read()
-        sys.exit(1)
-    except ValueError, e:
-        print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, submission
-        print >>sys.stderr, "======= RESPONSE ======="
-        print >>sys.stderr, result
-        print >>sys.stderr, "========================"
-        sys.stderr.flush()
-        raise e
-
-def get_missingentries(node, baseurl):
-    try:
-        result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        parsed_result = json.loads(result)
-        if parsed_result.get(u"result") == u"ok":
-            return parsed_result[u"entries"]
-        print >>sys.stderr, "ERROR: missingentries", parsed_result
-        sys.exit(1)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: missingentries", e.read()
-        sys.exit(1)
-
-def get_missingentriesforbackup(node, baseurl):
-    try:
-        result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
-        parsed_result = json.loads(result)
-        if parsed_result.get(u"result") == u"ok":
-            return parsed_result[u"entries"]
-        print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
-        sys.exit(1)
-    except urllib2.HTTPError, e:
-        print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
-        sys.exit(1)
-
-def chunks(l, n):
-    return [l[i:i+n] for i in range(0, len(l), n)]
-
-timing = timing_point()
-
-logorder = get_logorder(logorderfile)
-
-timing_point(timing, "get logorder")
-
-certsinlog = set(logorder)
-
-new_entries_per_node = {}
-new_entries = set()
-entries_to_fetch = {}
-
-for storagenode in storagenodes:
-    print >>sys.stderr, "getting new entries from", storagenode["name"]
-    sys.stderr.flush()
-    new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"]))
-    new_entries.update(new_entries_per_node[storagenode["name"]])
-    entries_to_fetch[storagenode["name"]] = []
-
-import subprocess
-
-timing_point(timing, "get new entries")
-
-new_entries -= certsinlog
-
-print >>sys.stderr, "adding", len(new_entries), "entries"
-sys.stderr.flush()
-
-if args.nomerge:
-    sys.exit(0)
-
-for hash in new_entries:
-    for storagenode in storagenodes:
-        if hash in new_entries_per_node[storagenode["name"]]:
-            entries_to_fetch[storagenode["name"]].append(hash)
-            break
-
-verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
-                              stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
-added_entries = 0
-for storagenode in storagenodes:
-    print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
-    sys.stderr.flush()
-    for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
-        entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk)
-        for hash in chunk:
-            entry = entries[hash]
-            verify_entry(verifycert, entry, hash)
-            write_chain(hash, entry)
-            add_to_logorder(hash)
-            logorder.append(hash)
-            certsinlog.add(hash)
-            added_entries += 1
-        print >>sys.stderr, added_entries,
-        sys.stderr.flush()
-    print >>sys.stderr
-    sys.stderr.flush()
-fsync_logorder()
-timing_point(timing, "add entries")
-print >>sys.stderr, "added", added_entries, "entries"
-sys.stderr.flush()
-
-verifycert.communicate(struct.pack("I", 0))
-
-tree = build_merkle_tree(logorder)
-tree_size = len(logorder)
-root_hash = tree[-1][0]
-timestamp = int(time.time() * 1000)
-
-for secondary in secondaries:
-    if secondary["name"] == config["primarymergenode"]:
-        continue
-    nodeaddress = "https://%s/" % secondary["address"]
-    nodename = secondary["name"]
-    timing = timing_point()
-    print >>sys.stderr, "backing up to node", nodename
-    sys.stderr.flush()
-    verifiedsize = get_verifiedsize(nodename, nodeaddress)
-    timing_point(timing, "get verified size")
-    print >>sys.stderr, "verified size", verifiedsize
-    sys.stderr.flush()
-    entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-    print >>sys.stderr, "sending log:",
-    sys.stderr.flush()
-    for chunk in chunks(entries, 1000):
-        for trynumber in range(5, 0, -1):
-            sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
-            if sendlogresult == None:
-                if trynumber == 1:
-                    sys.exit(1)
-                select.select([], [], [], 10.0)
-                print >>sys.stderr, "tries left:", trynumber
-                sys.stderr.flush()
-                continue
-            break
-        if sendlogresult["result"] != "ok":
-            print >>sys.stderr, "sendlog:", sendlogresult
-            sys.exit(1)
-        verifiedsize += len(chunk)
-        print >>sys.stderr, verifiedsize,
-        sys.stderr.flush()
-    print >>sys.stderr
-    timing_point(timing, "sendlog")
-    print >>sys.stderr, "log sent"
-    sys.stderr.flush()
-    missingentries = get_missingentriesforbackup(nodename, nodeaddress)
-    timing_point(timing, "get missing")
-    print >>sys.stderr, "missing entries:", len(missingentries)
-    sys.stderr.flush()
-    fetched_entries = 0
-    print >>sys.stderr, "fetching missing entries",
-    sys.stderr.flush()
-    for missingentry in missingentries:
-        hash = base64.b64decode(missingentry)
-        sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
-        if sendentryresult["result"] != "ok":
-            print >>sys.stderr, "send sth:", sendentryresult
-            sys.exit(1)
-        fetched_entries += 1
-        if added_entries % 1000 == 0:
-            print >>sys.stderr, fetched_entries,
-            sys.stderr.flush()
-    print >>sys.stderr
-    sys.stderr.flush()
-    timing_point(timing, "send missing")
-    verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
-    if verifyrootresult["result"] != "ok":
-        print >>sys.stderr, "verifyroot:", verifyrootresult
-        sys.exit(1)
-    secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
-    if root_hash != secondary_root_hash:
-        print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
-        print >>sys.stderr, " expected", hexencode(root_hash)
-        sys.exit(1)
-    timing_point(timing, "verifyroot")
-    setverifiedsize(nodename, nodeaddress, tree_size)
-    if args.timing:
-        print >>sys.stderr, timing["deltatimes"]
-        sys.stderr.flush()
-
-tree_head_signature = None
-for signingnode in signingnodes:
-    try:
-        tree_head_signature = create_sth_signature(tree_size, timestamp,
-                                                   root_hash, "https://%s/" % signingnode["address"], key=own_key)
-        break
-    except urllib2.URLError, e:
-        print >>sys.stderr, e
-        sys.stderr.flush()
-if tree_head_signature == None:
-    print >>sys.stderr, "Could not contact any signing nodes"
-    sys.exit(1)
-
-sth = {"tree_size": tree_size, "timestamp": timestamp,
-       "sha256_root_hash": base64.b64encode(root_hash),
-       "tree_head_signature": base64.b64encode(tree_head_signature)}
-
-check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
-
-timing_point(timing, "build sth")
-
-if args.timing:
-    print >>sys.stderr, timing["deltatimes"]
-    sys.stderr.flush()
-
-print hexencode(root_hash)
-sys.stdout.flush()
-
-for frontendnode in frontendnodes:
-    nodeaddress = "https://%s/" % frontendnode["address"]
-    nodename = frontendnode["name"]
-    timing = timing_point()
-    print >>sys.stderr, "distributing for node", nodename
-    sys.stderr.flush()
-    curpos = get_curpos(nodename, nodeaddress)
-    timing_point(timing, "get curpos")
-    print >>sys.stderr, "current position", curpos
-    sys.stderr.flush()
-    entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
-    print >>sys.stderr, "sending log:",
-    sys.stderr.flush()
-    for chunk in chunks(entries, 1000):
-        for trynumber in range(5, 0, -1):
-            sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk})
-            if sendlogresult == None:
-                if trynumber == 1:
-                    sys.exit(1)
-                select.select([], [], [], 10.0)
-                print >>sys.stderr, "tries left:", trynumber
-                sys.stderr.flush()
-                continue
-            break
-        if sendlogresult["result"] != "ok":
-            print >>sys.stderr, "sendlog:", sendlogresult
-            sys.exit(1)
-        curpos += len(chunk)
-        print >>sys.stderr, curpos,
-        sys.stderr.flush()
-    print >>sys.stderr
-    timing_point(timing, "sendlog")
-    print >>sys.stderr, "log sent"
-    sys.stderr.flush()
-    missingentries = get_missingentries(nodename, nodeaddress)
-    timing_point(timing, "get missing")
-    print >>sys.stderr, "missing entries:", len(missingentries)
-    sys.stderr.flush()
-    fetched_entries = 0
-    print >>sys.stderr, "fetching missing entries",
-    sys.stderr.flush()
-    for missingentry in missingentries:
-        hash = base64.b64decode(missingentry)
-        sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
-        if sendentryresult["result"] != "ok":
-            print >>sys.stderr, "send sth:", sendentryresult
-            sys.exit(1)
-        fetched_entries += 1
-        if added_entries % 1000 == 0:
-            print >>sys.stderr, fetched_entries,
-            sys.stderr.flush()
-    print >>sys.stderr
-    sys.stderr.flush()
-    timing_point(timing, "send missing")
-    sendsthresult = sendsth(nodename, nodeaddress, sth)
-    if sendsthresult["result"] != "ok":
-        print >>sys.stderr, "send sth:", sendsthresult
-        sys.exit(1)
-    timing_point(timing, "send sth")
-    if args.timing:
-        print >>sys.stderr, timing["deltatimes"]
-        sys.stderr.flush()
+import sys
+from certtools import create_ssl_context
+from merge_fetch import merge_fetch
+from merge_backup import merge_backup
+from merge_dist import merge_dist
+
+def main():
+    parser = argparse.ArgumentParser(description="")
+    parser.add_argument('--config', help="System configuration",
+                        required=True)
+    parser.add_argument('--localconfig', help="Local configuration",
+                        required=True)
+    parser.add_argument("--nomerge", action='store_true',
+                        help="Don't actually do merge")
+    parser.add_argument("--timing", action='store_true',
+                        help="Print timing information")
+    args = parser.parse_args()
+
+    config = yaml.load(open(args.config))
+    localconfig = yaml.load(open(args.localconfig))
+    paths = localconfig["paths"]
+
+    create_ssl_context(cafile=paths["https_cacertfile"])
+
+    sth = merge_fetch(args, config, localconfig)
+    merge_backup(args, config, localconfig, sth)
+    merge_dist(args, config, localconfig, sth)
+
+if __name__ == '__main__':
+    sys.exit(main())
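
After this commit, merge.py is only a driver: the fetch, backup and distribution phases live in the new merge_fetch, merge_backup and merge_dist modules, none of which are shown in this diff. A minimal sketch of the interface the new main() implies (the module names, argument order and the sth hand-off come from the calls above; the docstrings and placeholder bodies are assumptions):

    # Interface implied by main() above. The real implementations live in
    # separate modules not included in this diff; the bodies below are
    # hypothetical stubs, not the project's code.

    def merge_fetch(args, config, localconfig):
        """Fetch new entries from the storage nodes and return the new
        tree head that the later stages consume."""
        raise NotImplementedError

    def merge_backup(args, config, localconfig, sth):
        """Replicate the log up to sth to the secondary merge nodes."""
        raise NotImplementedError

    def merge_dist(args, config, localconfig, sth):
        """Distribute the log and sth to the frontend nodes."""
        raise NotImplementedError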