diff options
| -rwxr-xr-x | tools/merge.py | 542 | ||||
| -rw-r--r-- | tools/merge_backup.py | 108 | ||||
| -rw-r--r-- | tools/merge_dist.py | 130 | ||||
| -rw-r--r-- | tools/merge_fetch.py | 97 | ||||
| -rw-r--r-- | tools/mergetools.py | 250 | 
5 files changed, 613 insertions, 514 deletions
| diff --git a/tools/merge.py b/tools/merge.py index 2065a2d..212c171 100755 --- a/tools/merge.py +++ b/tools/merge.py @@ -1,518 +1,38 @@  #!/usr/bin/env python  # -*- coding: utf-8 -*-  # -# Copyright (c) 2014, NORDUnet A/S. +# Copyright (c) 2014-2015, NORDUnet A/S.  # See LICENSE for licensing information.  import argparse -import json -import base64 -import urllib -import urllib2 -import sys -import time -import ecdsa -import hashlib -import urlparse -import os  import yaml -import select -import struct -from certtools import build_merkle_tree, create_sth_signature, \ -    check_sth_signature, get_eckey_from_file, timing_point, http_request, \ -    get_public_key_from_file, get_leaf_hash, decode_certificate_chain, \ -    create_ssl_context -from mergetools import parselogrow, get_logorder, read_chain, \ -    verify_entry - -parser = argparse.ArgumentParser(description="") -parser.add_argument('--config', help="System configuration", required=True) -parser.add_argument('--localconfig', help="Local configuration", required=True) -parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge") -parser.add_argument("--timing", action='store_true', help="Print timing information") -args = parser.parse_args() - -config = yaml.load(open(args.config)) -localconfig = yaml.load(open(args.localconfig)) - -ctbaseurl = config["baseurl"] -frontendnodes = config["frontendnodes"] -storagenodes = config["storagenodes"] -secondaries = config.get("mergenodes", []) -paths = localconfig["paths"] -mergedb = paths["mergedb"] - -signingnodes = config["signingnodes"] -create_ssl_context(cafile=paths["https_cacertfile"]) - -chainsdir = mergedb + "/chains" -logorderfile = mergedb + "/logorder" - -own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) - -logpublickey = get_public_key_from_file(paths["logpublickey"]) - -hashed_dir = True - -def hexencode(key): -    return base64.b16encode(key).lower() - -def 
write_chain(key, value): -    filename = hexencode(key) -    if hashed_dir: -        path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] -        try: -            os.makedirs(path) -        except Exception, e: -            pass -    else: -        path = chainsdir -    f = open(path + "/" + filename, "w") -    f.write(value) -    f.close() - -def add_to_logorder(key): -    f = open(logorderfile, "a") -    f.write(hexencode(key) + "\n") -    f.close() - -def fsync_logorder(): -    f = open(logorderfile, "a") -    os.fsync(f.fileno()) -    f.close() - -def get_new_entries(node, baseurl): -    try: -        result = http_request(baseurl + "plop/v1/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        parsed_result = json.loads(result) -        if parsed_result.get(u"result") == u"ok": -            return [base64.b64decode(entry) for entry in parsed_result[u"entries"]] -        print >>sys.stderr, "ERROR: fetchnewentries", parsed_result -        sys.exit(1) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: fetchnewentries", e.read() -        sys.exit(1) - -def get_entries(node, baseurl, hashes): -    try: -        params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True) -        result = http_request(baseurl + "plop/v1/storage/getentry?" 
+ params, key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        parsed_result = json.loads(result) -        if parsed_result.get(u"result") == u"ok": -            entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]]) -            assert len(entries) == len(hashes) -            assert set(entries.keys()) == set(hashes) -            return entries -        print >>sys.stderr, "ERROR: getentry", parsed_result -        sys.exit(1) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: getentry", e.read() -        sys.exit(1) - -def get_curpos(node, baseurl): -    try: -        result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        parsed_result = json.loads(result) -        if parsed_result.get(u"result") == u"ok": -            return parsed_result[u"position"] -        print >>sys.stderr, "ERROR: currentposition", parsed_result -        sys.exit(1) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: currentposition", e.read() -        sys.exit(1) - -def get_verifiedsize(node, baseurl): -    try: -        result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        parsed_result = json.loads(result) -        if parsed_result.get(u"result") == u"ok": -            return parsed_result[u"size"] -        print >>sys.stderr, "ERROR: verifiedsize", parsed_result -        sys.exit(1) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: verifiedsize", e.read() -        sys.exit(1) - - -         -def sendlog(node, baseurl, submission): -    try: -        result = http_request(baseurl + "plop/v1/frontend/sendlog", -            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: 
-        print >>sys.stderr, "ERROR: sendlog", e.read() -        sys.stderr.flush() -        return None -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, submission -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, "========================" -        sys.stderr.flush() -        raise e - -def backup_sendlog(node, baseurl, submission): -    try: -        result = http_request(baseurl + "plop/v1/merge/sendlog", -            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: sendlog", e.read() -        sys.stderr.flush() -        return None -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, submission -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, "========================" -        sys.stderr.flush() -        raise e - -def sendentry(node, baseurl, entry, hash): -    try: -        result = http_request(baseurl + "plop/v1/frontend/sendentry", -            json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, -            verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: sendentry", e.read() -        sys.exit(1) -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, hash -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, "========================" -        sys.stderr.flush() -        raise e - -def sendentry_merge(node, baseurl, entry, hash): -    try: -        result = http_request(baseurl 
+ "plop/v1/merge/sendentry", -            json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, -            verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: sendentry", e.read() -        sys.exit(1) -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, hash -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, "========================" -        sys.stderr.flush() -        raise e - -def sendsth(node, baseurl, submission): -    try: -        result = http_request(baseurl + "plop/v1/frontend/sendsth", -            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: sendsth", e.read() -        sys.exit(1) -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, submission -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, "========================" -        sys.stderr.flush() -        raise e - -def verifyroot(node, baseurl, treesize): -    try: -        result = http_request(baseurl + "plop/v1/merge/verifyroot", -            json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: verifyroot", e.read() -        sys.exit(1) -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, submission -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, 
"========================" -        sys.stderr.flush() -        raise e - -def setverifiedsize(node, baseurl, treesize): -    try: -        result = http_request(baseurl + "plop/v1/merge/setverifiedsize", -            json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        return json.loads(result) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: setverifiedsize", e.read() -        sys.exit(1) -    except ValueError, e: -        print >>sys.stderr, "==== FAILED REQUEST ====" -        print >>sys.stderr, submission -        print >>sys.stderr, "======= RESPONSE =======" -        print >>sys.stderr, result -        print >>sys.stderr, "========================" -        sys.stderr.flush() -        raise e - -def get_missingentries(node, baseurl): -    try: -        result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        parsed_result = json.loads(result) -        if parsed_result.get(u"result") == u"ok": -            return parsed_result[u"entries"] -        print >>sys.stderr, "ERROR: missingentries", parsed_result -        sys.exit(1) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: missingentries", e.read() -        sys.exit(1) - -def get_missingentriesforbackup(node, baseurl): -    try: -        result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) -        parsed_result = json.loads(result) -        if parsed_result.get(u"result") == u"ok": -            return parsed_result[u"entries"] -        print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result -        sys.exit(1) -    except urllib2.HTTPError, e: -        print >>sys.stderr, "ERROR: missingentriesforbackup", e.read() -        sys.exit(1) - -def chunks(l, n): -    return [l[i:i+n] for i in range(0, len(l), n)] - -timing = timing_point() - -logorder 
= get_logorder(logorderfile) - -timing_point(timing, "get logorder") - -certsinlog = set(logorder) - -new_entries_per_node = {} -new_entries = set() -entries_to_fetch = {} - -for storagenode in storagenodes: -    print >>sys.stderr, "getting new entries from", storagenode["name"] -    sys.stderr.flush() -    new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"])) -    new_entries.update(new_entries_per_node[storagenode["name"]]) -    entries_to_fetch[storagenode["name"]] = [] - -import subprocess - -timing_point(timing, "get new entries") - -new_entries -= certsinlog - -print >>sys.stderr, "adding", len(new_entries), "entries" -sys.stderr.flush() - -if args.nomerge: -    sys.exit(0) - -for hash in new_entries: -    for storagenode in storagenodes: -        if hash in new_entries_per_node[storagenode["name"]]: -            entries_to_fetch[storagenode["name"]].append(hash) -            break - -verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]], -                              stdin=subprocess.PIPE, stdout=subprocess.PIPE) - -added_entries = 0 -for storagenode in storagenodes: -    print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), -    sys.stderr.flush() -    for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): -        entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk) -        for hash in chunk: -            entry = entries[hash] -            verify_entry(verifycert, entry, hash) -            write_chain(hash, entry) -            add_to_logorder(hash) -            logorder.append(hash) -            certsinlog.add(hash) -            added_entries += 1 -        print >>sys.stderr, added_entries, -        sys.stderr.flush() -    print >>sys.stderr -    sys.stderr.flush() -fsync_logorder() -timing_point(timing, "add entries") -print >>sys.stderr, "added", 
added_entries, "entries" -sys.stderr.flush() - -verifycert.communicate(struct.pack("I", 0)) - -tree = build_merkle_tree(logorder) -tree_size = len(logorder) -root_hash = tree[-1][0] -timestamp = int(time.time() * 1000) - -for secondary in secondaries: -    if secondary["name"] == config["primarymergenode"]: -        continue -    nodeaddress = "https://%s/" % secondary["address"] -    nodename = secondary["name"] -    timing = timing_point() -    print >>sys.stderr, "backing up to node", nodename -    sys.stderr.flush() -    verifiedsize = get_verifiedsize(nodename, nodeaddress) -    timing_point(timing, "get verified size") -    print >>sys.stderr, "verified size", verifiedsize -    sys.stderr.flush() -    entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] -    print >>sys.stderr, "sending log:", -    sys.stderr.flush() -    for chunk in chunks(entries, 1000): -        for trynumber in range(5, 0, -1): -            sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk}) -            if sendlogresult == None: -                if trynumber == 1: -                    sys.exit(1) -                select.select([], [], [], 10.0) -                print >>sys.stderr, "tries left:", trynumber -                sys.stderr.flush() -                continue -            break -        if sendlogresult["result"] != "ok": -            print >>sys.stderr, "sendlog:", sendlogresult -            sys.exit(1) -        verifiedsize += len(chunk) -        print >>sys.stderr, verifiedsize, -        sys.stderr.flush() -    print >>sys.stderr -    timing_point(timing, "sendlog") -    print >>sys.stderr, "log sent" -    sys.stderr.flush() -    missingentries = get_missingentriesforbackup(nodename, nodeaddress) -    timing_point(timing, "get missing") -    print >>sys.stderr, "missing entries:", len(missingentries) -    sys.stderr.flush() -    fetched_entries = 0 -    print >>sys.stderr, "fetching missing entries", -    
sys.stderr.flush() -    for missingentry in missingentries: -        hash = base64.b64decode(missingentry) -        sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash) -        if sendentryresult["result"] != "ok": -            print >>sys.stderr, "send sth:", sendentryresult -            sys.exit(1) -        fetched_entries += 1 -        if added_entries % 1000 == 0: -            print >>sys.stderr, fetched_entries, -            sys.stderr.flush() -    print >>sys.stderr -    sys.stderr.flush() -    timing_point(timing, "send missing") -    verifyrootresult = verifyroot(nodename, nodeaddress, tree_size) -    if verifyrootresult["result"] != "ok": -        print >>sys.stderr, "verifyroot:", verifyrootresult -        sys.exit(1) -    secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) -    if root_hash != secondary_root_hash: -        print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash) -        print >>sys.stderr, "  expected", hexencode(root_hash) -        sys.exit(1) -    timing_point(timing, "verifyroot") -    setverifiedsize(nodename, nodeaddress, tree_size) -    if args.timing: -        print >>sys.stderr, timing["deltatimes"] -        sys.stderr.flush() - -tree_head_signature = None -for signingnode in signingnodes: -    try: -        tree_head_signature = create_sth_signature(tree_size, timestamp, -                                                   root_hash, "https://%s/" % signingnode["address"], key=own_key) -        break -    except urllib2.URLError, e: -        print >>sys.stderr, e -        sys.stderr.flush() -if tree_head_signature == None: -    print >>sys.stderr, "Could not contact any signing nodes" -    sys.exit(1) - -sth = {"tree_size": tree_size, "timestamp": timestamp, -       "sha256_root_hash": base64.b64encode(root_hash), -       "tree_head_signature": base64.b64encode(tree_head_signature)} - -check_sth_signature(ctbaseurl, sth, publickey=logpublickey) - 
-timing_point(timing, "build sth") - -if args.timing: -    print >>sys.stderr, timing["deltatimes"] -    sys.stderr.flush() - -print hexencode(root_hash) -sys.stdout.flush() - -for frontendnode in frontendnodes: -    nodeaddress = "https://%s/" % frontendnode["address"] -    nodename = frontendnode["name"] -    timing = timing_point() -    print >>sys.stderr, "distributing for node", nodename -    sys.stderr.flush() -    curpos = get_curpos(nodename, nodeaddress) -    timing_point(timing, "get curpos") -    print >>sys.stderr, "current position", curpos -    sys.stderr.flush() -    entries = [base64.b64encode(entry) for entry in logorder[curpos:]] -    print >>sys.stderr, "sending log:", -    sys.stderr.flush() -    for chunk in chunks(entries, 1000): -        for trynumber in range(5, 0, -1): -            sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk}) -            if sendlogresult == None: -                if trynumber == 1: -                    sys.exit(1) -                select.select([], [], [], 10.0) -                print >>sys.stderr, "tries left:", trynumber -                sys.stderr.flush() -                continue -            break -        if sendlogresult["result"] != "ok": -            print >>sys.stderr, "sendlog:", sendlogresult -            sys.exit(1) -        curpos += len(chunk) -        print >>sys.stderr, curpos, -        sys.stderr.flush() -    print >>sys.stderr -    timing_point(timing, "sendlog") -    print >>sys.stderr, "log sent" -    sys.stderr.flush() -    missingentries = get_missingentries(nodename, nodeaddress) -    timing_point(timing, "get missing") -    print >>sys.stderr, "missing entries:", len(missingentries) -    sys.stderr.flush() -    fetched_entries = 0 -    print >>sys.stderr, "fetching missing entries", -    sys.stderr.flush() -    for missingentry in missingentries: -        hash = base64.b64decode(missingentry) -        sendentryresult = sendentry(nodename, nodeaddress, 
read_chain(chainsdir, hash), hash) -        if sendentryresult["result"] != "ok": -            print >>sys.stderr, "send sth:", sendentryresult -            sys.exit(1) -        fetched_entries += 1 -        if added_entries % 1000 == 0: -            print >>sys.stderr, fetched_entries, -            sys.stderr.flush() -    print >>sys.stderr -    sys.stderr.flush() -    timing_point(timing, "send missing") -    sendsthresult = sendsth(nodename, nodeaddress, sth) -    if sendsthresult["result"] != "ok": -        print >>sys.stderr, "send sth:", sendsthresult -        sys.exit(1) -    timing_point(timing, "send sth") -    if args.timing: -        print >>sys.stderr, timing["deltatimes"] -        sys.stderr.flush() +import sys +from certtools import create_ssl_context +from merge_fetch import merge_fetch +from merge_backup import merge_backup +from merge_dist import merge_dist + +def main(): +    parser = argparse.ArgumentParser(description="") +    parser.add_argument('--config', help="System configuration", +                        required=True) +    parser.add_argument('--localconfig', help="Local configuration", +                        required=True) +    parser.add_argument("--nomerge", action='store_true', +                        help="Don't actually do merge") +    parser.add_argument("--timing", action='store_true', +                        help="Print timing information") +    args = parser.parse_args() + +    config = yaml.load(open(args.config)) +    localconfig = yaml.load(open(args.localconfig)) +    paths = localconfig["paths"] + +    create_ssl_context(cafile=paths["https_cacertfile"]) + +    sth = merge_fetch(args, config, localconfig) +    merge_backup(args, config, localconfig, sth) +    merge_dist(args, config, localconfig, sth) + +if __name__ == '__main__': +    sys.exit(main()) diff --git a/tools/merge_backup.py b/tools/merge_backup.py new file mode 100644 index 0000000..27c71a5 --- /dev/null +++ b/tools/merge_backup.py @@ -0,0 +1,108 @@ 
+#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. + +import sys +import base64 +import select +from certtools import timing_point +from mergetools import chunks, backup_sendlog, get_logorder, \ +     get_verifiedsize, get_missingentriesforbackup, read_chain, \ +     hexencode, setverifiedsize, sendentry_merge, verifyroot + +def merge_backup(args, config, localconfig, sth_in): +    paths = localconfig["paths"] +    own_key = (localconfig["nodename"], +               "%s/%s-private.pem" % (paths["privatekeys"], +                                      localconfig["nodename"])) +    secondaries = config.get("mergenodes", []) +    mergedb = paths["mergedb"] +    chainsdir = mergedb + "/chains" +    logorderfile = mergedb + "/logorder" +    timing = timing_point() + +    logorder = get_logorder(logorderfile) +    timing_point(timing, "get logorder") + +    (tree_size, root_hash, _) = sth_in + +    for secondary in secondaries: +        if secondary["name"] == config["primarymergenode"]: +            continue +        nodeaddress = "https://%s/" % secondary["address"] +        nodename = secondary["name"] +        timing = timing_point() +        print >>sys.stderr, "backing up to node", nodename +        sys.stderr.flush() +        verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths) +        timing_point(timing, "get verified size") +        print >>sys.stderr, "verified size", verifiedsize +        sys.stderr.flush() +        entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] +        print >>sys.stderr, "sending log:", +        sys.stderr.flush() +        for chunk in chunks(entries, 1000): +            for trynumber in range(5, 0, -1): +                sendlogresult = \ +                  backup_sendlog(nodename, nodeaddress, own_key, paths, +                                 {"start": verifiedsize, "hashes": chunk}) +                if 
sendlogresult == None: +                    if trynumber == 1: +                        sys.exit(1) +                    select.select([], [], [], 10.0) +                    print >>sys.stderr, "tries left:", trynumber +                    sys.stderr.flush() +                    continue +                break +            if sendlogresult["result"] != "ok": +                print >>sys.stderr, "sendlog:", sendlogresult +                sys.exit(1) +            verifiedsize += len(chunk) +            print >>sys.stderr, verifiedsize, +            sys.stderr.flush() +        print >>sys.stderr +        timing_point(timing, "sendlog") +        print >>sys.stderr, "log sent" +        sys.stderr.flush() +        missingentries = get_missingentriesforbackup(nodename, nodeaddress, +                                                     own_key, paths) +        timing_point(timing, "get missing") +        print >>sys.stderr, "missing entries:", len(missingentries) +        sys.stderr.flush() +        fetched_entries = 0 +        print >>sys.stderr, "fetching missing entries", +        sys.stderr.flush() +        for missingentry in missingentries: +            ehash = base64.b64decode(missingentry) +            sendentryresult = sendentry_merge(nodename, nodeaddress, +                                              own_key, paths, +                                              read_chain(chainsdir, ehash), +                                              ehash) +            if sendentryresult["result"] != "ok": +                print >>sys.stderr, "send sth:", sendentryresult +                sys.exit(1) +            fetched_entries += 1 +            if fetched_entries % 1000 == 0: +                print >>sys.stderr, fetched_entries, +                sys.stderr.flush() +        print >>sys.stderr +        sys.stderr.flush() +        timing_point(timing, "send missing") +        verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, +                               
       tree_size) +        if verifyrootresult["result"] != "ok": +            print >>sys.stderr, "verifyroot:", verifyrootresult +            sys.exit(1) +        secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) +        if root_hash != secondary_root_hash: +            print >>sys.stderr, "secondary root hash was", \ +              hexencode(secondary_root_hash) +            print >>sys.stderr, "  expected", hexencode(root_hash) +            sys.exit(1) +        timing_point(timing, "verifyroot") +        setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size) +        if args.timing: +            print >>sys.stderr, timing["deltatimes"] +            sys.stderr.flush() diff --git a/tools/merge_dist.py b/tools/merge_dist.py new file mode 100644 index 0000000..2b2f259 --- /dev/null +++ b/tools/merge_dist.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. 
+ +import sys +import urllib2 +import base64 +import select +from certtools import timing_point, check_sth_signature, \ +     get_public_key_from_file +from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ +     sendsth, create_sth_signature, hexencode, sendlog, sendentry, read_chain + +def merge_dist(args, config, localconfig, sth_in): +    paths = localconfig["paths"] +    own_key = (localconfig["nodename"], +               "%s/%s-private.pem" % (paths["privatekeys"], +                                      localconfig["nodename"])) +    frontendnodes = config["frontendnodes"] +    signingnodes = config["signingnodes"] +    ctbaseurl = config["baseurl"] +    logpublickey = get_public_key_from_file(paths["logpublickey"]) +    mergedb = paths["mergedb"] +    chainsdir = mergedb + "/chains" +    logorderfile = mergedb + "/logorder" +    timing = timing_point() + +    logorder = get_logorder(logorderfile) +    timing_point(timing, "get logorder") + +    (tree_size, root_hash, timestamp) = sth_in +    tree_head_signature = None +    for signingnode in signingnodes: +        try: +            tree_head_signature = \ +              create_sth_signature(tree_size, timestamp, +                                   root_hash, +                                   "https://%s/" % signingnode["address"], +                                   key=own_key) +            break +        except urllib2.URLError, err: +            print >>sys.stderr, err +            sys.stderr.flush() +    if tree_head_signature == None: +        print >>sys.stderr, "Could not contact any signing nodes" +        sys.exit(1) + +    sth = {"tree_size": tree_size, "timestamp": timestamp, +           "sha256_root_hash": base64.b64encode(root_hash), +           "tree_head_signature": base64.b64encode(tree_head_signature)} + +    check_sth_signature(ctbaseurl, sth, publickey=logpublickey) + +    timing_point(timing, "build sth") + +    if args.timing: +        print >>sys.stderr, 
timing["deltatimes"] +        sys.stderr.flush() + +    print hexencode(root_hash) +    sys.stdout.flush() + +    for frontendnode in frontendnodes: +        nodeaddress = "https://%s/" % frontendnode["address"] +        nodename = frontendnode["name"] +        timing = timing_point() +        print >>sys.stderr, "distributing for node", nodename +        sys.stderr.flush() +        curpos = get_curpos(nodename, nodeaddress, own_key, paths) +        timing_point(timing, "get curpos") +        print >>sys.stderr, "current position", curpos +        sys.stderr.flush() +        entries = [base64.b64encode(entry) for entry in logorder[curpos:]] +        print >>sys.stderr, "sending log:", +        sys.stderr.flush() +        for chunk in chunks(entries, 1000): +            for trynumber in range(5, 0, -1): +                sendlogresult = sendlog(nodename, nodeaddress, +                                        own_key, paths, +                                        {"start": curpos, "hashes": chunk}) +                if sendlogresult == None: +                    if trynumber == 1: +                        sys.exit(1) +                    select.select([], [], [], 10.0) +                    print >>sys.stderr, "tries left:", trynumber +                    sys.stderr.flush() +                    continue +                break +            if sendlogresult["result"] != "ok": +                print >>sys.stderr, "sendlog:", sendlogresult +                sys.exit(1) +            curpos += len(chunk) +            print >>sys.stderr, curpos, +            sys.stderr.flush() +        print >>sys.stderr +        timing_point(timing, "sendlog") +        print >>sys.stderr, "log sent" +        sys.stderr.flush() +        missingentries = get_missingentries(nodename, nodeaddress, own_key, +                                            paths) +        timing_point(timing, "get missing") +        print >>sys.stderr, "missing entries:", len(missingentries) +        sys.stderr.flush() 
+        fetched_entries = 0 +        print >>sys.stderr, "fetching missing entries", +        sys.stderr.flush() +        for missingentry in missingentries: +            ehash = base64.b64decode(missingentry) +            sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, +                                        read_chain(chainsdir, ehash), ehash) +            if sendentryresult["result"] != "ok": +                print >>sys.stderr, "send sth:", sendentryresult +                sys.exit(1) +            fetched_entries += 1 +            if fetched_entries % 1000 == 0: +                print >>sys.stderr, fetched_entries, +                sys.stderr.flush() +        print >>sys.stderr +        sys.stderr.flush() +        timing_point(timing, "send missing") +        sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) +        if sendsthresult["result"] != "ok": +            print >>sys.stderr, "send sth:", sendsthresult +            sys.exit(1) +        timing_point(timing, "send sth") +        if args.timing: +            print >>sys.stderr, timing["deltatimes"] +            sys.stderr.flush() diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py new file mode 100644 index 0000000..a0a0396 --- /dev/null +++ b/tools/merge_fetch.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (c) 2014-2015, NORDUnet A/S. +# See LICENSE for licensing information. 
import sys
import struct
import time
import subprocess
from mergetools import get_logorder, verify_entry, get_new_entries, \
     chunks, fsync_logorder, get_entries, write_chain, add_to_logorder
from certtools import timing_point, build_merkle_tree

def merge_fetch(args, config, localconfig):
    """Fetch all new entries from the storage nodes into the local merge DB.

    For every configured storage node, ask for its new entries, deduplicate
    against what is already in the logorder, then download, verify and store
    each missing entry (chain file + logorder line).  Finally build a Merkle
    tree over the updated logorder.

    Returns a (tree_size, root_hash, timestamp) tuple describing the new
    tree head.  Exits the process early (status 0) when args.nomerge is set,
    and exits non-zero indirectly via the helpers on fetch/verify errors.
    """
    paths = localconfig["paths"]
    storagenodes = config["storagenodes"]
    mergedb = paths["mergedb"]
    logorderfile = mergedb + "/logorder"
    chainsdir = mergedb + "/chains"
    # (nodename, path-to-private-key) pair used to authenticate our requests.
    own_key = (localconfig["nodename"],
               "%s/%s-private.pem" % (paths["privatekeys"],
                                      localconfig["nodename"]))

    timing = timing_point()

    logorder = get_logorder(logorderfile)
    timing_point(timing, "get logorder")

    # Set view of the log for O(1) membership tests below.
    certsinlog = set(logorder)

    new_entries_per_node = {}   # node name -> set of entry hashes it holds
    new_entries = set()         # union of all nodes' new entry hashes
    entries_to_fetch = {}       # node name -> list of hashes to pull from it

    for storagenode in storagenodes:
        print >>sys.stderr, "getting new entries from", storagenode["name"]
        sys.stderr.flush()
        new_entries_per_node[storagenode["name"]] = \
          set(get_new_entries(storagenode["name"],
                              "https://%s/" % storagenode["address"],
                              own_key, paths))
        new_entries.update(new_entries_per_node[storagenode["name"]])
        entries_to_fetch[storagenode["name"]] = []
    timing_point(timing, "get new entries")

    # Drop anything we already have in the log.
    new_entries -= certsinlog
    print >>sys.stderr, "adding", len(new_entries), "entries"
    sys.stderr.flush()

    if args.nomerge:
        sys.exit(0)

    # Assign each entry to the first storage node that reported having it,
    # so each hash is downloaded exactly once.
    for ehash in new_entries:
        for storagenode in storagenodes:
            if ehash in new_entries_per_node[storagenode["name"]]:
                entries_to_fetch[storagenode["name"]].append(ehash)
                break

    # External certificate-chain verifier; entries are fed to it one at a
    # time by verify_entry() over stdin/stdout.
    verifycert = subprocess.Popen(
        [paths["verifycert_bin"], paths["known_roots"]],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    added_entries = 0
    for storagenode in storagenodes:
        print >>sys.stderr, "getting %d entries from %s:" % \
          (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
        sys.stderr.flush()
        # Download in batches of 100 to bound request size.
        for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
            entries = get_entries(storagenode["name"],
                                  "https://%s/" % storagenode["address"],
                                  own_key, paths, chunk)
            for ehash in chunk:
                entry = entries[ehash]
                # Verify first; only verified entries reach disk and the log.
                verify_entry(verifycert, entry, ehash)
                write_chain(ehash, entry, chainsdir)
                add_to_logorder(logorderfile, ehash)
                logorder.append(ehash)
                certsinlog.add(ehash)
                added_entries += 1
            print >>sys.stderr, added_entries,
            sys.stderr.flush()
        print >>sys.stderr
        sys.stderr.flush()
    # Make sure all logorder appends have hit stable storage before we
    # advertise a new tree head built from them.
    fsync_logorder(logorderfile)
    timing_point(timing, "add entries")
    print >>sys.stderr, "added", added_entries, "entries"
    sys.stderr.flush()

    # Tell the verifier subprocess we are done; a zero-length record
    # presumably signals EOF to it -- TODO confirm against verifycert_bin.
    verifycert.communicate(struct.pack("I", 0))

    tree = build_merkle_tree(logorder)
    tree_size = len(logorder)
    root_hash = tree[-1][0]
    # Tree head timestamp in milliseconds since the epoch.
    timestamp = int(time.time() * 1000)

    return (tree_size, root_hash, timestamp)
+ +import os  import base64  import hashlib  import sys  import struct -from certtools import get_leaf_hash +import urllib +import urllib2 +import json +from certtools import get_leaf_hash, create_sth_signature, \ +    check_sth_signature, get_eckey_from_file, http_request, \ +    get_leaf_hash, decode_certificate_chain  def parselogrow(row):      return base64.b16decode(row, casefold=True) @@ -22,7 +29,7 @@ def read_chain(chainsdir, key):      filename = base64.b16encode(key).upper()      try:          f = read_chain_open(chainsdir, filename) -    except IOError, e: +    except IOError:          f = read_chain_open(chainsdir, filename.lower())      value = f.read()      f.close() @@ -67,7 +74,7 @@ def unwrap_entry(entry):  def wrap_entry(entry):      return tlv_encodelist([("PLOP", entry), -                        ("S256", hashlib.sha256(entry).digest())]) +                           ("S256", hashlib.sha256(entry).digest())])  def verify_entry(verifycert, entry, hash):      packed = unwrap_entry(entry) @@ -94,3 +101,240 @@ def verify_entry(verifycert, entry, hash):      if error_code != 0:          print >>sys.stderr, result[1:]          sys.exit(1) + +def hexencode(key): +    return base64.b16encode(key).lower() + +def write_chain(key, value, chainsdir, hashed_dir=True): +    filename = hexencode(key) +    if hashed_dir: +        path = chainsdir + "/" \ +          + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] +        try: +            os.makedirs(path) +        except Exception, e: +            pass +    else: +        path = chainsdir +    f = open(path + "/" + filename, "w") +    f.write(value) +    f.close() + +def add_to_logorder(logorderfile, key): +    f = open(logorderfile, "a") +    f.write(hexencode(key) + "\n") +    f.close() + +def fsync_logorder(logorderfile): +    f = open(logorderfile, "a") +    os.fsync(f.fileno()) +    f.close() + +def get_new_entries(node, baseurl, own_key, paths): +    try: +        result = http_request(baseurl 
+ "plop/v1/storage/fetchnewentries", +                              key=own_key, verifynode=node, +                              publickeydir=paths["publickeys"]) +        parsed_result = json.loads(result) +        if parsed_result.get(u"result") == u"ok": +            return [base64.b64decode(entry) for \ +                    entry in parsed_result[u"entries"]] +        print >>sys.stderr, "ERROR: fetchnewentries", parsed_result +        sys.exit(1) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: fetchnewentries", e.read() +        sys.exit(1) + +def get_entries(node, baseurl, own_key, paths, hashes): +    try: +        params = urllib.urlencode({"hash":[base64.b64encode(hash) for \ +                                           hash in hashes]}, doseq=True) +        result = http_request(baseurl + "plop/v1/storage/getentry?" + params, +                              key=own_key, verifynode=node, +                              publickeydir=paths["publickeys"]) +        parsed_result = json.loads(result) +        if parsed_result.get(u"result") == u"ok": +            entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]]) +            assert len(entries) == len(hashes) +            assert set(entries.keys()) == set(hashes) +            return entries +        print >>sys.stderr, "ERROR: getentry", parsed_result +        sys.exit(1) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: getentry", e.read() +        sys.exit(1) + +def get_curpos(node, baseurl, own_key, paths): +    try: +        result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        parsed_result = json.loads(result) +        if parsed_result.get(u"result") == u"ok": +            return parsed_result[u"position"] +        print >>sys.stderr, "ERROR: currentposition", parsed_result +        sys.exit(1) +    except 
urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: currentposition", e.read() +        sys.exit(1) + +def get_verifiedsize(node, baseurl, own_key, paths): +    try: +        result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        parsed_result = json.loads(result) +        if parsed_result.get(u"result") == u"ok": +            return parsed_result[u"size"] +        print >>sys.stderr, "ERROR: verifiedsize", parsed_result +        sys.exit(1) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: verifiedsize", e.read() +        sys.exit(1) + + +def sendlog(node, baseurl, own_key, paths, submission): +    try: +        result = http_request(baseurl + "plop/v1/frontend/sendlog", +            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: sendlog", e.read() +        sys.stderr.flush() +        return None +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, submission +        print >>sys.stderr, "======= RESPONSE =======" +        print >>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def backup_sendlog(node, baseurl, own_key, paths, submission): +    try: +        result = http_request(baseurl + "plop/v1/merge/sendlog", +            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: sendlog", e.read() +        sys.stderr.flush() +        return None +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, submission +        print >>sys.stderr, "======= RESPONSE =======" +        print 
>>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def sendentry(node, baseurl, own_key, paths, entry, hash): +    try: +        result = http_request(baseurl + "plop/v1/frontend/sendentry", +            json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, +            verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: sendentry", e.read() +        sys.exit(1) +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, hash +        print >>sys.stderr, "======= RESPONSE =======" +        print >>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def sendentry_merge(node, baseurl, own_key, paths, entry, hash): +    try: +        result = http_request(baseurl + "plop/v1/merge/sendentry", +            json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key, +            verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: sendentry", e.read() +        sys.exit(1) +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, hash +        print >>sys.stderr, "======= RESPONSE =======" +        print >>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def sendsth(node, baseurl, own_key, paths, submission): +    try: +        result = http_request(baseurl + "plop/v1/frontend/sendsth", +            json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        
print >>sys.stderr, "ERROR: sendsth", e.read() +        sys.exit(1) +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, submission +        print >>sys.stderr, "======= RESPONSE =======" +        print >>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def verifyroot(node, baseurl, own_key, paths, treesize): +    try: +        result = http_request(baseurl + "plop/v1/merge/verifyroot", +            json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: verifyroot", e.read() +        sys.exit(1) +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, submission +        print >>sys.stderr, "======= RESPONSE =======" +        print >>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def setverifiedsize(node, baseurl, own_key, paths, treesize): +    try: +        result = http_request(baseurl + "plop/v1/merge/setverifiedsize", +            json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        return json.loads(result) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: setverifiedsize", e.read() +        sys.exit(1) +    except ValueError, e: +        print >>sys.stderr, "==== FAILED REQUEST ====" +        print >>sys.stderr, submission +        print >>sys.stderr, "======= RESPONSE =======" +        print >>sys.stderr, result +        print >>sys.stderr, "========================" +        sys.stderr.flush() +        raise e + +def get_missingentries(node, baseurl, own_key, paths): +    try: +        result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, 
verifynode=node, publickeydir=paths["publickeys"]) +        parsed_result = json.loads(result) +        if parsed_result.get(u"result") == u"ok": +            return parsed_result[u"entries"] +        print >>sys.stderr, "ERROR: missingentries", parsed_result +        sys.exit(1) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: missingentries", e.read() +        sys.exit(1) + +def get_missingentriesforbackup(node, baseurl, own_key, paths): +    try: +        result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) +        parsed_result = json.loads(result) +        if parsed_result.get(u"result") == u"ok": +            return parsed_result[u"entries"] +        print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result +        sys.exit(1) +    except urllib2.HTTPError, e: +        print >>sys.stderr, "ERROR: missingentriesforbackup", e.read() +        sys.exit(1) + +def chunks(l, n): +    return [l[i:i+n] for i in range(0, len(l), n)] | 
