diff options
-rwxr-xr-x | tools/merge.py | 542 | ||||
-rw-r--r-- | tools/merge_backup.py | 108 | ||||
-rw-r--r-- | tools/merge_dist.py | 130 | ||||
-rw-r--r-- | tools/merge_fetch.py | 97 | ||||
-rw-r--r-- | tools/mergetools.py | 250 |
5 files changed, 613 insertions, 514 deletions
diff --git a/tools/merge.py b/tools/merge.py index 2065a2d..212c171 100755 --- a/tools/merge.py +++ b/tools/merge.py @@ -1,518 +1,38 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Copyright (c) 2014, NORDUnet A/S. +# Copyright (c) 2014-2015, NORDUnet A/S. # See LICENSE for licensing information. import argparse -import json -import base64 -import urllib -import urllib2 -import sys -import time -import ecdsa -import hashlib -import urlparse -import os import yaml -import select -import struct -from certtools import build_merkle_tree, create_sth_signature, \ - check_sth_signature, get_eckey_from_file, timing_point, http_request, \ - get_public_key_from_file, get_leaf_hash, decode_certificate_chain, \ - create_ssl_context -from mergetools import parselogrow, get_logorder, read_chain, \ - verify_entry - -parser = argparse.ArgumentParser(description="") -parser.add_argument('--config', help="System configuration", required=True) -parser.add_argument('--localconfig', help="Local configuration", required=True) -parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge") -parser.add_argument("--timing", action='store_true', help="Print timing information") -args = parser.parse_args() - -config = yaml.load(open(args.config)) -localconfig = yaml.load(open(args.localconfig)) - -ctbaseurl = config["baseurl"] -frontendnodes = config["frontendnodes"] -storagenodes = config["storagenodes"] -secondaries = config.get("mergenodes", []) -paths = localconfig["paths"] -mergedb = paths["mergedb"] - -signingnodes = config["signingnodes"] -create_ssl_context(cafile=paths["https_cacertfile"]) - -chainsdir = mergedb + "/chains" -logorderfile = mergedb + "/logorder" - -own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) - -logpublickey = get_public_key_from_file(paths["logpublickey"]) - -hashed_dir = True - -def hexencode(key): - return base64.b16encode(key).lower() - -def write_chain(key, value): - filename = hexencode(key) - if hashed_dir: - path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] - try: - os.makedirs(path) - except Exception, e: - pass - else: - path = chainsdir - f = open(path + "/" + filename, "w") - f.write(value) - f.close() - -def add_to_logorder(key): - f = open(logorderfile, "a") - f.write(hexencode(key) + "\n") - f.close() - -def fsync_logorder(): - f = open(logorderfile, "a") - os.fsync(f.fileno()) - f.close() - -def get_new_entries(node, baseurl): - try: - result = http_request(baseurl + "plop/v1/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - parsed_result = json.loads(result) - if parsed_result.get(u"result") == u"ok": - return [base64.b64decode(entry) for entry in parsed_result[u"entries"]] - print >>sys.stderr, "ERROR: fetchnewentries", parsed_result - sys.exit(1) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: fetchnewentries", e.read() - sys.exit(1) - -def get_entries(node, baseurl, hashes): - try: - params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True) - result = http_request(baseurl + "plop/v1/storage/getentry?" + params, key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - parsed_result = json.loads(result) - if parsed_result.get(u"result") == u"ok": - entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]]) - assert len(entries) == len(hashes) - assert set(entries.keys()) == set(hashes) - return entries - print >>sys.stderr, "ERROR: getentry", parsed_result - sys.exit(1) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: getentry", e.read() - sys.exit(1) - -def get_curpos(node, baseurl): - try: - result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - parsed_result = json.loads(result) - if parsed_result.get(u"result") == u"ok": - return parsed_result[u"position"] - print >>sys.stderr, "ERROR: currentposition", parsed_result - sys.exit(1) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: currentposition", e.read() - sys.exit(1) - -def get_verifiedsize(node, baseurl): - try: - result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - parsed_result = json.loads(result) - if parsed_result.get(u"result") == u"ok": - return parsed_result[u"size"] - print >>sys.stderr, "ERROR: verifiedsize", parsed_result - sys.exit(1) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: verifiedsize", e.read() - sys.exit(1) - - - -def sendlog(node, baseurl, submission): - try: - result = http_request(baseurl + "plop/v1/frontend/sendlog", - json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: sendlog", e.read() - sys.stderr.flush() - return None - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def backup_sendlog(node, baseurl, submission): - try: - result = http_request(baseurl + "plop/v1/merge/sendlog", - json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: sendlog", e.read() - sys.stderr.flush() - return None - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def sendentry(node, baseurl, entry, hash): - try: - result = http_request(baseurl + "plop/v1/frontend/sendentry", - json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, - verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: sendentry", e.read() - sys.exit(1) - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, hash - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def sendentry_merge(node, baseurl, entry, hash): - try: - result = http_request(baseurl + "plop/v1/merge/sendentry", - json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, - verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: sendentry", e.read() - sys.exit(1) - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, hash - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def sendsth(node, baseurl, submission): - try: - result = http_request(baseurl + "plop/v1/frontend/sendsth", - json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: sendsth", e.read() - sys.exit(1) - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def verifyroot(node, baseurl, treesize): - try: - result = http_request(baseurl + "plop/v1/merge/verifyroot", - json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: verifyroot", e.read() - sys.exit(1) - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def setverifiedsize(node, baseurl, treesize): - try: - result = http_request(baseurl + "plop/v1/merge/setverifiedsize", - json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: setverifiedsize", e.read() - sys.exit(1) - except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e - -def get_missingentries(node, baseurl): - try: - result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - parsed_result = json.loads(result) - if parsed_result.get(u"result") == u"ok": - return parsed_result[u"entries"] - print >>sys.stderr, "ERROR: missingentries", parsed_result - sys.exit(1) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: missingentries", e.read() - sys.exit(1) - -def get_missingentriesforbackup(node, baseurl): - try: - result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - parsed_result = json.loads(result) - if parsed_result.get(u"result") == u"ok": - return parsed_result[u"entries"] - print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result - sys.exit(1) - except urllib2.HTTPError, e: - print >>sys.stderr, "ERROR: missingentriesforbackup", e.read() - sys.exit(1) - -def chunks(l, n): - return [l[i:i+n] for i in range(0, len(l), n)] - -timing = timing_point() - -logorder = get_logorder(logorderfile) - -timing_point(timing, "get logorder") - -certsinlog = set(logorder) - -new_entries_per_node = {} -new_entries = set() -entries_to_fetch = {} - -for storagenode in storagenodes: - print >>sys.stderr, "getting new entries from", storagenode["name"] - sys.stderr.flush() - new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"])) - new_entries.update(new_entries_per_node[storagenode["name"]]) - entries_to_fetch[storagenode["name"]] = [] - -import subprocess - -timing_point(timing, "get new entries") - -new_entries -= certsinlog - -print >>sys.stderr, "adding", len(new_entries), "entries" -sys.stderr.flush() - -if args.nomerge: - sys.exit(0) - -for hash in new_entries: - for storagenode in storagenodes: - if hash in new_entries_per_node[storagenode["name"]]: - entries_to_fetch[storagenode["name"]].append(hash) - break - -verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]], - stdin=subprocess.PIPE, stdout=subprocess.PIPE) - -added_entries = 0 -for storagenode in storagenodes: - print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), - sys.stderr.flush() - for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): - entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk) - for hash in chunk: - entry = entries[hash] - verify_entry(verifycert, entry, hash) - write_chain(hash, entry) - add_to_logorder(hash) - logorder.append(hash) - certsinlog.add(hash) - added_entries += 1 - print >>sys.stderr, added_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() -fsync_logorder() -timing_point(timing, "add entries") -print >>sys.stderr, "added", added_entries, "entries" -sys.stderr.flush() - -verifycert.communicate(struct.pack("I", 0)) - -tree = build_merkle_tree(logorder) -tree_size = len(logorder) -root_hash = tree[-1][0] -timestamp = int(time.time() * 1000) - -for secondary in secondaries: - if secondary["name"] == config["primarymergenode"]: - continue - nodeaddress = "https://%s/" % secondary["address"] - nodename = secondary["name"] - timing = timing_point() - print >>sys.stderr, "backing up to node", nodename - sys.stderr.flush() - verifiedsize = get_verifiedsize(nodename, nodeaddress) - timing_point(timing, "get verified size") - print >>sys.stderr, "verified size", verifiedsize - sys.stderr.flush() - entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() - for chunk in chunks(entries, 1000): - for trynumber in range(5, 0, -1): - sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - select.select([], [], [], 10.0) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue - break - if sendlogresult["result"] != "ok": - print >>sys.stderr, "sendlog:", sendlogresult - sys.exit(1) - verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - print >>sys.stderr - timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() - missingentries = get_missingentriesforbackup(nodename, nodeaddress) - timing_point(timing, "get missing") - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() - fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() - for missingentry in missingentries: - hash = base64.b64decode(missingentry) - sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "send sth:", sendentryresult - sys.exit(1) - fetched_entries += 1 - if added_entries % 1000 == 0: - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") - verifyrootresult = verifyroot(nodename, nodeaddress, tree_size) - if verifyrootresult["result"] != "ok": - print >>sys.stderr, "verifyroot:", verifyrootresult - sys.exit(1) - secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) - if root_hash != secondary_root_hash: - print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash) - print >>sys.stderr, " expected", hexencode(root_hash) - sys.exit(1) - timing_point(timing, "verifyroot") - setverifiedsize(nodename, nodeaddress, tree_size) - if args.timing: - print >>sys.stderr, timing["deltatimes"] - sys.stderr.flush() - -tree_head_signature = None -for signingnode in signingnodes: - try: - tree_head_signature = create_sth_signature(tree_size, timestamp, - root_hash, "https://%s/" % signingnode["address"], key=own_key) - break - except urllib2.URLError, e: - print >>sys.stderr, e - sys.stderr.flush() -if tree_head_signature == None: - print >>sys.stderr, "Could not contact any signing nodes" - sys.exit(1) - -sth = {"tree_size": tree_size, "timestamp": timestamp, - "sha256_root_hash": base64.b64encode(root_hash), - "tree_head_signature": base64.b64encode(tree_head_signature)} - -check_sth_signature(ctbaseurl, sth, publickey=logpublickey) - -timing_point(timing, "build sth") - -if args.timing: - print >>sys.stderr, timing["deltatimes"] - sys.stderr.flush() - -print hexencode(root_hash) -sys.stdout.flush() - -for frontendnode in frontendnodes: - nodeaddress = "https://%s/" % frontendnode["address"] - nodename = frontendnode["name"] - timing = timing_point() - print >>sys.stderr, "distributing for node", nodename - sys.stderr.flush() - curpos = get_curpos(nodename, nodeaddress) - timing_point(timing, "get curpos") - print >>sys.stderr, "current position", curpos - sys.stderr.flush() - entries = [base64.b64encode(entry) for entry in logorder[curpos:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() - for chunk in chunks(entries, 1000): - for trynumber in range(5, 0, -1): - sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - select.select([], [], [], 10.0) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue - break - if sendlogresult["result"] != "ok": - print >>sys.stderr, "sendlog:", sendlogresult - sys.exit(1) - curpos += len(chunk) - print >>sys.stderr, curpos, - sys.stderr.flush() - print >>sys.stderr - timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() - missingentries = get_missingentries(nodename, nodeaddress) - timing_point(timing, "get missing") - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() - fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() - for missingentry in missingentries: - hash = base64.b64decode(missingentry) - sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "send sth:", sendentryresult - sys.exit(1) - fetched_entries += 1 - if added_entries % 1000 == 0: - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") - sendsthresult = sendsth(nodename, nodeaddress, sth) - if sendsthresult["result"] != "ok": - print >>sys.stderr, "send sth:", sendsthresult - sys.exit(1) - timing_point(timing, "send sth") - if args.timing: - print >>sys.stderr, timing["deltatimes"] - sys.stderr.flush() +import sys +from certtools import create_ssl_context +from merge_fetch import merge_fetch +from merge_backup import merge_backup +from merge_dist import merge_dist + +def main(): + parser = argparse.ArgumentParser(description="") + parser.add_argument('--config', help="System configuration", + required=True) + parser.add_argument('--localconfig', help="Local configuration", + required=True) + parser.add_argument("--nomerge", action='store_true', + help="Don't actually do merge") + parser.add_argument("--timing", action='store_true', + help="Print timing information") + args = parser.parse_args() + + config = yaml.load(open(args.config)) + localconfig = yaml.load(open(args.localconfig)) + paths = localconfig["paths"] + + create_ssl_context(cafile=paths["https_cacertfile"]) + + sth = merge_fetch(args, config, localconfig) + merge_backup(args, config, localconfig, sth) + merge_dist(args, config, localconfig, sth) + +if __name__ == '__main__': + sys.exit(main()) diff --git a/tools/merge_backup.py b/tools/merge_backup.py new file mode 100644 index 0000000..27c71a5 --- /dev/null +++ b/tools/merge_backup.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. + +import sys +import base64 +import select +from certtools import timing_point +from mergetools import chunks, backup_sendlog, get_logorder, \ + get_verifiedsize, get_missingentriesforbackup, read_chain, \ + hexencode, setverifiedsize, sendentry_merge, verifyroot + +def merge_backup(args, config, localconfig, sth_in): + paths = localconfig["paths"] + own_key = (localconfig["nodename"], + "%s/%s-private.pem" % (paths["privatekeys"], + localconfig["nodename"])) + secondaries = config.get("mergenodes", []) + mergedb = paths["mergedb"] + chainsdir = mergedb + "/chains" + logorderfile = mergedb + "/logorder" + timing = timing_point() + + logorder = get_logorder(logorderfile) + timing_point(timing, "get logorder") + + (tree_size, root_hash, _) = sth_in + + for secondary in secondaries: + if secondary["name"] == config["primarymergenode"]: + continue + nodeaddress = "https://%s/" % secondary["address"] + nodename = secondary["name"] + timing = timing_point() + print >>sys.stderr, "backing up to node", nodename + sys.stderr.flush() + verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get verified size") + print >>sys.stderr, "verified size", verifiedsize + sys.stderr.flush() + entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + for trynumber in range(5, 0, -1): + sendlogresult = \ + backup_sendlog(nodename, nodeaddress, own_key, paths, + {"start": verifiedsize, "hashes": chunk}) + if sendlogresult == None: + if trynumber == 1: + sys.exit(1) + select.select([], [], [], 10.0) + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() + continue + break + if sendlogresult["result"] != "ok": + print >>sys.stderr, "sendlog:", sendlogresult + sys.exit(1) + verifiedsize += len(chunk) + print >>sys.stderr, verifiedsize, + sys.stderr.flush() + print >>sys.stderr + timing_point(timing, "sendlog") + print >>sys.stderr, "log sent" + sys.stderr.flush() + missingentries = get_missingentriesforbackup(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + fetched_entries = 0 + print >>sys.stderr, "fetching missing entries", + sys.stderr.flush() + for missingentry in missingentries: + ehash = base64.b64decode(missingentry) + sendentryresult = sendentry_merge(nodename, nodeaddress, + own_key, paths, + read_chain(chainsdir, ehash), + ehash) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "send sth:", sendentryresult + sys.exit(1) + fetched_entries += 1 + if fetched_entries % 1000 == 0: + print >>sys.stderr, fetched_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, + tree_size) + if verifyrootresult["result"] != "ok": + print >>sys.stderr, "verifyroot:", verifyrootresult + sys.exit(1) + secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) + if root_hash != secondary_root_hash: + print >>sys.stderr, "secondary root hash was", \ + hexencode(secondary_root_hash) + print >>sys.stderr, " expected", hexencode(root_hash) + sys.exit(1) + timing_point(timing, "verifyroot") + setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size) + if args.timing: + print >>sys.stderr, timing["deltatimes"] + sys.stderr.flush() diff --git a/tools/merge_dist.py b/tools/merge_dist.py new file mode 100644 index 0000000..2b2f259 --- /dev/null +++ b/tools/merge_dist.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. + +import sys +import urllib2 +import base64 +import select +from certtools import timing_point, check_sth_signature, \ + get_public_key_from_file +from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ + sendsth, create_sth_signature, hexencode, sendlog, sendentry, read_chain + +def merge_dist(args, config, localconfig, sth_in): + paths = localconfig["paths"] + own_key = (localconfig["nodename"], + "%s/%s-private.pem" % (paths["privatekeys"], + localconfig["nodename"])) + frontendnodes = config["frontendnodes"] + signingnodes = config["signingnodes"] + ctbaseurl = config["baseurl"] + logpublickey = get_public_key_from_file(paths["logpublickey"]) + mergedb = paths["mergedb"] + chainsdir = mergedb + "/chains" + logorderfile = mergedb + "/logorder" + timing = timing_point() + + logorder = get_logorder(logorderfile) + timing_point(timing, "get logorder") + + (tree_size, root_hash, timestamp) = sth_in + tree_head_signature = None + for signingnode in signingnodes: + try: + tree_head_signature = \ + create_sth_signature(tree_size, timestamp, + root_hash, + "https://%s/" % signingnode["address"], + key=own_key) + break + except urllib2.URLError, err: + print >>sys.stderr, err + sys.stderr.flush() + if tree_head_signature == None: + print >>sys.stderr, "Could not contact any signing nodes" + sys.exit(1) + + sth = {"tree_size": tree_size, "timestamp": timestamp, + "sha256_root_hash": base64.b64encode(root_hash), + "tree_head_signature": base64.b64encode(tree_head_signature)} + + check_sth_signature(ctbaseurl, sth, publickey=logpublickey) + + timing_point(timing, "build sth") + + if args.timing: + print >>sys.stderr, timing["deltatimes"] + sys.stderr.flush() + + print hexencode(root_hash) + sys.stdout.flush() + + for frontendnode in frontendnodes: + nodeaddress = "https://%s/" % frontendnode["address"] + nodename = frontendnode["name"] + timing = timing_point() + print >>sys.stderr, "distributing for node", nodename + sys.stderr.flush() + curpos = get_curpos(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get curpos") + print >>sys.stderr, "current position", curpos + sys.stderr.flush() + entries = [base64.b64encode(entry) for entry in logorder[curpos:]] + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + for trynumber in range(5, 0, -1): + sendlogresult = sendlog(nodename, nodeaddress, + own_key, paths, + {"start": curpos, "hashes": chunk}) + if sendlogresult == None: + if trynumber == 1: + sys.exit(1) + select.select([], [], [], 10.0) + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() + continue + break + if sendlogresult["result"] != "ok": + print >>sys.stderr, "sendlog:", sendlogresult + sys.exit(1) + curpos += len(chunk) + print >>sys.stderr, curpos, + sys.stderr.flush() + print >>sys.stderr + timing_point(timing, "sendlog") + print >>sys.stderr, "log sent" + sys.stderr.flush() + missingentries = get_missingentries(nodename, nodeaddress, own_key, + paths) + timing_point(timing, "get missing") + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + fetched_entries = 0 + print >>sys.stderr, "fetching missing entries", + sys.stderr.flush() + for missingentry in missingentries: + ehash = base64.b64decode(missingentry) + sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, + read_chain(chainsdir, ehash), ehash) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "send sth:", sendentryresult + sys.exit(1) + fetched_entries += 1 + if fetched_entries % 1000 == 0: + print >>sys.stderr, fetched_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) + if sendsthresult["result"] != "ok": + print >>sys.stderr, "send sth:", sendsthresult + sys.exit(1) + timing_point(timing, "send sth") + if args.timing: + print >>sys.stderr, timing["deltatimes"] + sys.stderr.flush() diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py new file mode 100644 index 0000000..a0a0396 --- /dev/null +++ b/tools/merge_fetch.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. + +import sys +import struct +import time +import subprocess +from mergetools import get_logorder, verify_entry, get_new_entries, \ + chunks, fsync_logorder, get_entries, write_chain, add_to_logorder +from certtools import timing_point, build_merkle_tree + +def merge_fetch(args, config, localconfig): + paths = localconfig["paths"] + storagenodes = config["storagenodes"] + mergedb = paths["mergedb"] + logorderfile = mergedb + "/logorder" + chainsdir = mergedb + "/chains" + own_key = (localconfig["nodename"], + "%s/%s-private.pem" % (paths["privatekeys"], + localconfig["nodename"])) + + timing = timing_point() + + logorder = get_logorder(logorderfile) + timing_point(timing, "get logorder") + + certsinlog = set(logorder) + + new_entries_per_node = {} + new_entries = set() + entries_to_fetch = {} + + for storagenode in storagenodes: + print >>sys.stderr, "getting new entries from", storagenode["name"] + sys.stderr.flush() + new_entries_per_node[storagenode["name"]] = \ + set(get_new_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths)) + new_entries.update(new_entries_per_node[storagenode["name"]]) + entries_to_fetch[storagenode["name"]] = [] + timing_point(timing, "get new entries") + + new_entries -= certsinlog + print >>sys.stderr, "adding", len(new_entries), "entries" + sys.stderr.flush() + + if args.nomerge: + sys.exit(0) + + for ehash in new_entries: + for storagenode in storagenodes: + if ehash in new_entries_per_node[storagenode["name"]]: + entries_to_fetch[storagenode["name"]].append(ehash) + break + + verifycert = subprocess.Popen( + [paths["verifycert_bin"], paths["known_roots"]], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + added_entries = 0 + for storagenode in storagenodes: + print >>sys.stderr, "getting %d entries from %s:" % \ + (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), + sys.stderr.flush() + for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): + entries = get_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths, chunk) + for ehash in chunk: + entry = entries[ehash] + verify_entry(verifycert, entry, ehash) + write_chain(ehash, entry, chainsdir) + add_to_logorder(logorderfile, ehash) + logorder.append(ehash) + certsinlog.add(ehash) + added_entries += 1 + print >>sys.stderr, added_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + fsync_logorder(logorderfile) + timing_point(timing, "add entries") + print >>sys.stderr, "added", added_entries, "entries" + sys.stderr.flush() + + verifycert.communicate(struct.pack("I", 0)) + + tree = build_merkle_tree(logorder) + tree_size = len(logorder) + root_hash = tree[-1][0] + timestamp = int(time.time() * 1000) + + return (tree_size, root_hash, timestamp) diff --git a/tools/mergetools.py b/tools/mergetools.py index 947d7f4..820087c 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -1,10 +1,17 @@ # Copyright (c) 2015, NORDUnet A/S. # See LICENSE for licensing information. + +import os import base64 import hashlib import sys import struct -from certtools import get_leaf_hash +import urllib +import urllib2 +import json +from certtools import get_leaf_hash, create_sth_signature, \ + check_sth_signature, get_eckey_from_file, http_request, \ + get_leaf_hash, decode_certificate_chain def parselogrow(row): return base64.b16decode(row, casefold=True) @@ -22,7 +29,7 @@ def read_chain(chainsdir, key): filename = base64.b16encode(key).upper() try: f = read_chain_open(chainsdir, filename) - except IOError, e: + except IOError: f = read_chain_open(chainsdir, filename.lower()) value = f.read() f.close() @@ -67,7 +74,7 @@ def unwrap_entry(entry): def wrap_entry(entry): return tlv_encodelist([("PLOP", entry), - ("S256", hashlib.sha256(entry).digest())]) + ("S256", hashlib.sha256(entry).digest())]) def verify_entry(verifycert, entry, hash): packed = unwrap_entry(entry) @@ -94,3 +101,240 @@ def verify_entry(verifycert, entry, hash): if error_code != 0: print >>sys.stderr, result[1:] sys.exit(1) + +def hexencode(key): + return base64.b16encode(key).lower() + +def write_chain(key, value, chainsdir, hashed_dir=True): + filename = hexencode(key) + if hashed_dir: + path = chainsdir + "/" \ + + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] + try: + os.makedirs(path) + except Exception, e: + pass + else: + path = chainsdir + f = open(path + "/" + filename, "w") + f.write(value) + f.close() + +def add_to_logorder(logorderfile, key): + f = open(logorderfile, "a") + f.write(hexencode(key) + "\n") + f.close() + +def fsync_logorder(logorderfile): + f = open(logorderfile, "a") + os.fsync(f.fileno()) + f.close() + +def get_new_entries(node, baseurl, own_key, paths): + try: + result = http_request(baseurl + "plop/v1/storage/fetchnewentries", + key=own_key, verifynode=node, + publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return [base64.b64decode(entry) for \ + entry in parsed_result[u"entries"]] + print >>sys.stderr, "ERROR: fetchnewentries", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: fetchnewentries", e.read() + sys.exit(1) + +def get_entries(node, baseurl, own_key, paths, hashes): + try: + params = urllib.urlencode({"hash":[base64.b64encode(hash) for \ + hash in hashes]}, doseq=True) + result = http_request(baseurl + "plop/v1/storage/getentry?" + params, + key=own_key, verifynode=node, + publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]]) + assert len(entries) == len(hashes) + assert set(entries.keys()) == set(hashes) + return entries + print >>sys.stderr, "ERROR: getentry", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: getentry", e.read() + sys.exit(1) + +def get_curpos(node, baseurl, own_key, paths): + try: + result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"position"] + print >>sys.stderr, "ERROR: currentposition", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: currentposition", e.read() + sys.exit(1) + +def get_verifiedsize(node, baseurl, own_key, paths): + try: + result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"size"] + print >>sys.stderr, "ERROR: verifiedsize", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: verifiedsize", e.read() + sys.exit(1) + + +def sendlog(node, baseurl, own_key, paths, submission): + try: + result = http_request(baseurl + "plop/v1/frontend/sendlog", + json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendlog", e.read() + sys.stderr.flush() + return None + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def backup_sendlog(node, baseurl, own_key, paths, submission): + try: + result = http_request(baseurl + "plop/v1/merge/sendlog", + json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendlog", e.read() + sys.stderr.flush() + return None + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def sendentry(node, baseurl, own_key, paths, entry, hash): + try: + result = http_request(baseurl + "plop/v1/frontend/sendentry", + json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, + verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendentry", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, hash + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def sendentry_merge(node, baseurl, own_key, paths, entry, hash): + try: + result = http_request(baseurl + "plop/v1/merge/sendentry", + json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, + verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendentry", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, hash + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def sendsth(node, baseurl, own_key, paths, submission): + try: + result = http_request(baseurl + "plop/v1/frontend/sendsth", + json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: sendsth", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def verifyroot(node, baseurl, own_key, paths, treesize): + try: + result = http_request(baseurl + "plop/v1/merge/verifyroot", + json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: verifyroot", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def setverifiedsize(node, baseurl, own_key, paths, treesize): + try: + result = http_request(baseurl + "plop/v1/merge/setverifiedsize", + json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + return json.loads(result) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: setverifiedsize", e.read() + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, submission + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, result + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + +def get_missingentries(node, baseurl, own_key, paths): + try: + result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"entries"] + print >>sys.stderr, "ERROR: missingentries", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: missingentries", e.read() + sys.exit(1) + +def get_missingentriesforbackup(node, baseurl, own_key, paths): + try: + result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"entries"] + print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result + sys.exit(1) + except urllib2.HTTPError, e: + print >>sys.stderr, "ERROR: missingentriesforbackup", e.read() + sys.exit(1) + +def chunks(l, n): + return [l[i:i+n] for i in range(0, len(l), n)] |