summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2015-09-22 21:59:48 +0200
committerLinus Nordberg <linus@nordu.net>2015-09-27 13:38:29 +0200
commit5f80e1a1b67221ad81a3c717de569647bf121967 (patch)
tree5d8a61fb94ee5c4e73e01610aa386da5bd083fe2
parenta11ef0bb9db8425c4a7e1c879b4a0116d168a1fb (diff)
Split merge.py into three pieces.
-rwxr-xr-xtools/merge.py542
-rw-r--r--tools/merge_backup.py108
-rw-r--r--tools/merge_dist.py130
-rw-r--r--tools/merge_fetch.py97
-rw-r--r--tools/mergetools.py250
5 files changed, 613 insertions, 514 deletions
diff --git a/tools/merge.py b/tools/merge.py
index 2065a2d..212c171 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -1,518 +1,38 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-# Copyright (c) 2014, NORDUnet A/S.
+# Copyright (c) 2014-2015, NORDUnet A/S.
# See LICENSE for licensing information.
import argparse
-import json
-import base64
-import urllib
-import urllib2
-import sys
-import time
-import ecdsa
-import hashlib
-import urlparse
-import os
import yaml
-import select
-import struct
-from certtools import build_merkle_tree, create_sth_signature, \
- check_sth_signature, get_eckey_from_file, timing_point, http_request, \
- get_public_key_from_file, get_leaf_hash, decode_certificate_chain, \
- create_ssl_context
-from mergetools import parselogrow, get_logorder, read_chain, \
- verify_entry
-
-parser = argparse.ArgumentParser(description="")
-parser.add_argument('--config', help="System configuration", required=True)
-parser.add_argument('--localconfig', help="Local configuration", required=True)
-parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge")
-parser.add_argument("--timing", action='store_true', help="Print timing information")
-args = parser.parse_args()
-
-config = yaml.load(open(args.config))
-localconfig = yaml.load(open(args.localconfig))
-
-ctbaseurl = config["baseurl"]
-frontendnodes = config["frontendnodes"]
-storagenodes = config["storagenodes"]
-secondaries = config.get("mergenodes", [])
-paths = localconfig["paths"]
-mergedb = paths["mergedb"]
-
-signingnodes = config["signingnodes"]
-create_ssl_context(cafile=paths["https_cacertfile"])
-
-chainsdir = mergedb + "/chains"
-logorderfile = mergedb + "/logorder"
-
-own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"]))
-
-logpublickey = get_public_key_from_file(paths["logpublickey"])
-
-hashed_dir = True
-
-def hexencode(key):
- return base64.b16encode(key).lower()
-
-def write_chain(key, value):
- filename = hexencode(key)
- if hashed_dir:
- path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
- try:
- os.makedirs(path)
- except Exception, e:
- pass
- else:
- path = chainsdir
- f = open(path + "/" + filename, "w")
- f.write(value)
- f.close()
-
-def add_to_logorder(key):
- f = open(logorderfile, "a")
- f.write(hexencode(key) + "\n")
- f.close()
-
-def fsync_logorder():
- f = open(logorderfile, "a")
- os.fsync(f.fileno())
- f.close()
-
-def get_new_entries(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
- print >>sys.stderr, "ERROR: fetchnewentries", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: fetchnewentries", e.read()
- sys.exit(1)
-
-def get_entries(node, baseurl, hashes):
- try:
- params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True)
- result = http_request(baseurl + "plop/v1/storage/getentry?" + params, key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
- assert len(entries) == len(hashes)
- assert set(entries.keys()) == set(hashes)
- return entries
- print >>sys.stderr, "ERROR: getentry", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: getentry", e.read()
- sys.exit(1)
-
-def get_curpos(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"position"]
- print >>sys.stderr, "ERROR: currentposition", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: currentposition", e.read()
- sys.exit(1)
-
-def get_verifiedsize(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"size"]
- print >>sys.stderr, "ERROR: verifiedsize", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: verifiedsize", e.read()
- sys.exit(1)
-
-
-
-def sendlog(node, baseurl, submission):
- try:
- result = http_request(baseurl + "plop/v1/frontend/sendlog",
- json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.read()
- sys.stderr.flush()
- return None
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def backup_sendlog(node, baseurl, submission):
- try:
- result = http_request(baseurl + "plop/v1/merge/sendlog",
- json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.read()
- sys.stderr.flush()
- return None
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def sendentry(node, baseurl, entry, hash):
- try:
- result = http_request(baseurl + "plop/v1/frontend/sendentry",
- json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
- verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, hash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def sendentry_merge(node, baseurl, entry, hash):
- try:
- result = http_request(baseurl + "plop/v1/merge/sendentry",
- json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
- verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, hash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def sendsth(node, baseurl, submission):
- try:
- result = http_request(baseurl + "plop/v1/frontend/sendsth",
- json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def verifyroot(node, baseurl, treesize):
- try:
- result = http_request(baseurl + "plop/v1/merge/verifyroot",
- json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: verifyroot", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def setverifiedsize(node, baseurl, treesize):
- try:
- result = http_request(baseurl + "plop/v1/merge/setverifiedsize",
- json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: setverifiedsize", e.read()
- sys.exit(1)
- except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
-
-def get_missingentries(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"entries"]
- print >>sys.stderr, "ERROR: missingentries", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: missingentries", e.read()
- sys.exit(1)
-
-def get_missingentriesforbackup(node, baseurl):
- try:
- result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- parsed_result = json.loads(result)
- if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"entries"]
- print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
- sys.exit(1)
- except urllib2.HTTPError, e:
- print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
- sys.exit(1)
-
-def chunks(l, n):
- return [l[i:i+n] for i in range(0, len(l), n)]
-
-timing = timing_point()
-
-logorder = get_logorder(logorderfile)
-
-timing_point(timing, "get logorder")
-
-certsinlog = set(logorder)
-
-new_entries_per_node = {}
-new_entries = set()
-entries_to_fetch = {}
-
-for storagenode in storagenodes:
- print >>sys.stderr, "getting new entries from", storagenode["name"]
- sys.stderr.flush()
- new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"]))
- new_entries.update(new_entries_per_node[storagenode["name"]])
- entries_to_fetch[storagenode["name"]] = []
-
-import subprocess
-
-timing_point(timing, "get new entries")
-
-new_entries -= certsinlog
-
-print >>sys.stderr, "adding", len(new_entries), "entries"
-sys.stderr.flush()
-
-if args.nomerge:
- sys.exit(0)
-
-for hash in new_entries:
- for storagenode in storagenodes:
- if hash in new_entries_per_node[storagenode["name"]]:
- entries_to_fetch[storagenode["name"]].append(hash)
- break
-
-verifycert = subprocess.Popen([paths["verifycert_bin"], paths["known_roots"]],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
-
-added_entries = 0
-for storagenode in storagenodes:
- print >>sys.stderr, "getting %d entries from %s:" % (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
- sys.stderr.flush()
- for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
- entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk)
- for hash in chunk:
- entry = entries[hash]
- verify_entry(verifycert, entry, hash)
- write_chain(hash, entry)
- add_to_logorder(hash)
- logorder.append(hash)
- certsinlog.add(hash)
- added_entries += 1
- print >>sys.stderr, added_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
-fsync_logorder()
-timing_point(timing, "add entries")
-print >>sys.stderr, "added", added_entries, "entries"
-sys.stderr.flush()
-
-verifycert.communicate(struct.pack("I", 0))
-
-tree = build_merkle_tree(logorder)
-tree_size = len(logorder)
-root_hash = tree[-1][0]
-timestamp = int(time.time() * 1000)
-
-for secondary in secondaries:
- if secondary["name"] == config["primarymergenode"]:
- continue
- nodeaddress = "https://%s/" % secondary["address"]
- nodename = secondary["name"]
- timing = timing_point()
- print >>sys.stderr, "backing up to node", nodename
- sys.stderr.flush()
- verifiedsize = get_verifiedsize(nodename, nodeaddress)
- timing_point(timing, "get verified size")
- print >>sys.stderr, "verified size", verifiedsize
- sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
- missingentries = get_missingentriesforbackup(nodename, nodeaddress)
- timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- for missingentry in missingentries:
- hash = base64.b64decode(missingentry)
- sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendentryresult
- sys.exit(1)
- fetched_entries += 1
- if added_entries % 1000 == 0:
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
- verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
- if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
- secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
- if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
- timing_point(timing, "verifyroot")
- setverifiedsize(nodename, nodeaddress, tree_size)
- if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
-
-tree_head_signature = None
-for signingnode in signingnodes:
- try:
- tree_head_signature = create_sth_signature(tree_size, timestamp,
- root_hash, "https://%s/" % signingnode["address"], key=own_key)
- break
- except urllib2.URLError, e:
- print >>sys.stderr, e
- sys.stderr.flush()
-if tree_head_signature == None:
- print >>sys.stderr, "Could not contact any signing nodes"
- sys.exit(1)
-
-sth = {"tree_size": tree_size, "timestamp": timestamp,
- "sha256_root_hash": base64.b64encode(root_hash),
- "tree_head_signature": base64.b64encode(tree_head_signature)}
-
-check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
-
-timing_point(timing, "build sth")
-
-if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
-
-print hexencode(root_hash)
-sys.stdout.flush()
-
-for frontendnode in frontendnodes:
- nodeaddress = "https://%s/" % frontendnode["address"]
- nodename = frontendnode["name"]
- timing = timing_point()
- print >>sys.stderr, "distributing for node", nodename
- sys.stderr.flush()
- curpos = get_curpos(nodename, nodeaddress)
- timing_point(timing, "get curpos")
- print >>sys.stderr, "current position", curpos
- sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
- missingentries = get_missingentries(nodename, nodeaddress)
- timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- for missingentry in missingentries:
- hash = base64.b64decode(missingentry)
- sendentryresult = sendentry(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendentryresult
- sys.exit(1)
- fetched_entries += 1
- if added_entries % 1000 == 0:
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
- sendsthresult = sendsth(nodename, nodeaddress, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "send sth:", sendsthresult
- sys.exit(1)
- timing_point(timing, "send sth")
- if args.timing:
- print >>sys.stderr, timing["deltatimes"]
- sys.stderr.flush()
+import sys
+from certtools import create_ssl_context
+from merge_fetch import merge_fetch
+from merge_backup import merge_backup
+from merge_dist import merge_dist
+
+def main():
+ parser = argparse.ArgumentParser(description="")
+ parser.add_argument('--config', help="System configuration",
+ required=True)
+ parser.add_argument('--localconfig', help="Local configuration",
+ required=True)
+ parser.add_argument("--nomerge", action='store_true',
+ help="Don't actually do merge")
+ parser.add_argument("--timing", action='store_true',
+ help="Print timing information")
+ args = parser.parse_args()
+
+ config = yaml.load(open(args.config))
+ localconfig = yaml.load(open(args.localconfig))
+ paths = localconfig["paths"]
+
+ create_ssl_context(cafile=paths["https_cacertfile"])
+
+ sth = merge_fetch(args, config, localconfig)
+ merge_backup(args, config, localconfig, sth)
+ merge_dist(args, config, localconfig, sth)
+
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
new file mode 100644
index 0000000..27c71a5
--- /dev/null
+++ b/tools/merge_backup.py
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2014-2015, NORDUnet A/S.
+# See LICENSE for licensing information.
+
+import sys
+import base64
+import select
+from certtools import timing_point
+from mergetools import chunks, backup_sendlog, get_logorder, \
+ get_verifiedsize, get_missingentriesforbackup, read_chain, \
+ hexencode, setverifiedsize, sendentry_merge, verifyroot
+
+def merge_backup(args, config, localconfig, sth_in):
+ paths = localconfig["paths"]
+ own_key = (localconfig["nodename"],
+ "%s/%s-private.pem" % (paths["privatekeys"],
+ localconfig["nodename"]))
+ secondaries = config.get("mergenodes", [])
+ mergedb = paths["mergedb"]
+ chainsdir = mergedb + "/chains"
+ logorderfile = mergedb + "/logorder"
+ timing = timing_point()
+
+ logorder = get_logorder(logorderfile)
+ timing_point(timing, "get logorder")
+
+ (tree_size, root_hash, _) = sth_in
+
+ for secondary in secondaries:
+ if secondary["name"] == config["primarymergenode"]:
+ continue
+ nodeaddress = "https://%s/" % secondary["address"]
+ nodename = secondary["name"]
+ timing = timing_point()
+ print >>sys.stderr, "backing up to node", nodename
+ sys.stderr.flush()
+ verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
+ sys.stderr.flush()
+ entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = \
+ backup_sendlog(nodename, nodeaddress, own_key, paths,
+ {"start": verifiedsize, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ select.select([], [], [], 10.0)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ verifiedsize += len(chunk)
+ print >>sys.stderr, verifiedsize,
+ sys.stderr.flush()
+ print >>sys.stderr
+ timing_point(timing, "sendlog")
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+ fetched_entries = 0
+ print >>sys.stderr, "fetching missing entries",
+ sys.stderr.flush()
+ for missingentry in missingentries:
+ ehash = base64.b64decode(missingentry)
+ sendentryresult = sendentry_merge(nodename, nodeaddress,
+ own_key, paths,
+ read_chain(chainsdir, ehash),
+ ehash)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "send sth:", sendentryresult
+ sys.exit(1)
+ fetched_entries += 1
+ if fetched_entries % 1000 == 0:
+ print >>sys.stderr, fetched_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+ verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+ tree_size)
+ if verifyrootresult["result"] != "ok":
+ print >>sys.stderr, "verifyroot:", verifyrootresult
+ sys.exit(1)
+ secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+ if root_hash != secondary_root_hash:
+ print >>sys.stderr, "secondary root hash was", \
+ hexencode(secondary_root_hash)
+ print >>sys.stderr, " expected", hexencode(root_hash)
+ sys.exit(1)
+ timing_point(timing, "verifyroot")
+ setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
+ if args.timing:
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
new file mode 100644
index 0000000..2b2f259
--- /dev/null
+++ b/tools/merge_dist.py
@@ -0,0 +1,130 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2014-2015, NORDUnet A/S.
+# See LICENSE for licensing information.
+
+import sys
+import urllib2
+import base64
+import select
+from certtools import timing_point, check_sth_signature, \
+ get_public_key_from_file
+from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
+ sendsth, create_sth_signature, hexencode, sendlog, sendentry, read_chain
+
+def merge_dist(args, config, localconfig, sth_in):
+ paths = localconfig["paths"]
+ own_key = (localconfig["nodename"],
+ "%s/%s-private.pem" % (paths["privatekeys"],
+ localconfig["nodename"]))
+ frontendnodes = config["frontendnodes"]
+ signingnodes = config["signingnodes"]
+ ctbaseurl = config["baseurl"]
+ logpublickey = get_public_key_from_file(paths["logpublickey"])
+ mergedb = paths["mergedb"]
+ chainsdir = mergedb + "/chains"
+ logorderfile = mergedb + "/logorder"
+ timing = timing_point()
+
+ logorder = get_logorder(logorderfile)
+ timing_point(timing, "get logorder")
+
+ (tree_size, root_hash, timestamp) = sth_in
+ tree_head_signature = None
+ for signingnode in signingnodes:
+ try:
+ tree_head_signature = \
+ create_sth_signature(tree_size, timestamp,
+ root_hash,
+ "https://%s/" % signingnode["address"],
+ key=own_key)
+ break
+ except urllib2.URLError, err:
+ print >>sys.stderr, err
+ sys.stderr.flush()
+ if tree_head_signature == None:
+ print >>sys.stderr, "Could not contact any signing nodes"
+ sys.exit(1)
+
+ sth = {"tree_size": tree_size, "timestamp": timestamp,
+ "sha256_root_hash": base64.b64encode(root_hash),
+ "tree_head_signature": base64.b64encode(tree_head_signature)}
+
+ check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
+
+ timing_point(timing, "build sth")
+
+ if args.timing:
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()
+
+ print hexencode(root_hash)
+ sys.stdout.flush()
+
+ for frontendnode in frontendnodes:
+ nodeaddress = "https://%s/" % frontendnode["address"]
+ nodename = frontendnode["name"]
+ timing = timing_point()
+ print >>sys.stderr, "distributing for node", nodename
+ sys.stderr.flush()
+ curpos = get_curpos(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get curpos")
+ print >>sys.stderr, "current position", curpos
+ sys.stderr.flush()
+ entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = sendlog(nodename, nodeaddress,
+ own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ select.select([], [], [], 10.0)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ curpos += len(chunk)
+ print >>sys.stderr, curpos,
+ sys.stderr.flush()
+ print >>sys.stderr
+ timing_point(timing, "sendlog")
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+ missingentries = get_missingentries(nodename, nodeaddress, own_key,
+ paths)
+ timing_point(timing, "get missing")
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+ fetched_entries = 0
+ print >>sys.stderr, "fetching missing entries",
+ sys.stderr.flush()
+ for missingentry in missingentries:
+ ehash = base64.b64decode(missingentry)
+ sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
+ read_chain(chainsdir, ehash), ehash)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "send sth:", sendentryresult
+ sys.exit(1)
+ fetched_entries += 1
+ if fetched_entries % 1000 == 0:
+ print >>sys.stderr, fetched_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+ sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
+ if sendsthresult["result"] != "ok":
+ print >>sys.stderr, "send sth:", sendsthresult
+ sys.exit(1)
+ timing_point(timing, "send sth")
+ if args.timing:
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
new file mode 100644
index 0000000..a0a0396
--- /dev/null
+++ b/tools/merge_fetch.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2014-2015, NORDUnet A/S.
+# See LICENSE for licensing information.
+
+import sys
+import struct
+import time
+import subprocess
+from mergetools import get_logorder, verify_entry, get_new_entries, \
+ chunks, fsync_logorder, get_entries, write_chain, add_to_logorder
+from certtools import timing_point, build_merkle_tree
+
+def merge_fetch(args, config, localconfig):
+ paths = localconfig["paths"]
+ storagenodes = config["storagenodes"]
+ mergedb = paths["mergedb"]
+ logorderfile = mergedb + "/logorder"
+ chainsdir = mergedb + "/chains"
+ own_key = (localconfig["nodename"],
+ "%s/%s-private.pem" % (paths["privatekeys"],
+ localconfig["nodename"]))
+
+ timing = timing_point()
+
+ logorder = get_logorder(logorderfile)
+ timing_point(timing, "get logorder")
+
+ certsinlog = set(logorder)
+
+ new_entries_per_node = {}
+ new_entries = set()
+ entries_to_fetch = {}
+
+ for storagenode in storagenodes:
+ print >>sys.stderr, "getting new entries from", storagenode["name"]
+ sys.stderr.flush()
+ new_entries_per_node[storagenode["name"]] = \
+ set(get_new_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths))
+ new_entries.update(new_entries_per_node[storagenode["name"]])
+ entries_to_fetch[storagenode["name"]] = []
+ timing_point(timing, "get new entries")
+
+ new_entries -= certsinlog
+ print >>sys.stderr, "adding", len(new_entries), "entries"
+ sys.stderr.flush()
+
+ if args.nomerge:
+ sys.exit(0)
+
+ for ehash in new_entries:
+ for storagenode in storagenodes:
+ if ehash in new_entries_per_node[storagenode["name"]]:
+ entries_to_fetch[storagenode["name"]].append(ehash)
+ break
+
+ verifycert = subprocess.Popen(
+ [paths["verifycert_bin"], paths["known_roots"]],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+
+ added_entries = 0
+ for storagenode in storagenodes:
+ print >>sys.stderr, "getting %d entries from %s:" % \
+ (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
+ sys.stderr.flush()
+ for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
+ entries = get_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths, chunk)
+ for ehash in chunk:
+ entry = entries[ehash]
+ verify_entry(verifycert, entry, ehash)
+ write_chain(ehash, entry, chainsdir)
+ add_to_logorder(logorderfile, ehash)
+ logorder.append(ehash)
+ certsinlog.add(ehash)
+ added_entries += 1
+ print >>sys.stderr, added_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ fsync_logorder(logorderfile)
+ timing_point(timing, "add entries")
+ print >>sys.stderr, "added", added_entries, "entries"
+ sys.stderr.flush()
+
+ verifycert.communicate(struct.pack("I", 0))
+
+ tree = build_merkle_tree(logorder)
+ tree_size = len(logorder)
+ root_hash = tree[-1][0]
+ timestamp = int(time.time() * 1000)
+
+ return (tree_size, root_hash, timestamp)
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 947d7f4..820087c 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -1,10 +1,17 @@
# Copyright (c) 2015, NORDUnet A/S.
# See LICENSE for licensing information.
+
+import os
import base64
import hashlib
import sys
import struct
-from certtools import get_leaf_hash
+import urllib
+import urllib2
+import json
+from certtools import get_leaf_hash, create_sth_signature, \
+ check_sth_signature, get_eckey_from_file, http_request, \
+ get_leaf_hash, decode_certificate_chain
def parselogrow(row):
return base64.b16decode(row, casefold=True)
@@ -22,7 +29,7 @@ def read_chain(chainsdir, key):
filename = base64.b16encode(key).upper()
try:
f = read_chain_open(chainsdir, filename)
- except IOError, e:
+ except IOError:
f = read_chain_open(chainsdir, filename.lower())
value = f.read()
f.close()
@@ -67,7 +74,7 @@ def unwrap_entry(entry):
def wrap_entry(entry):
return tlv_encodelist([("PLOP", entry),
- ("S256", hashlib.sha256(entry).digest())])
+ ("S256", hashlib.sha256(entry).digest())])
def verify_entry(verifycert, entry, hash):
packed = unwrap_entry(entry)
@@ -94,3 +101,240 @@ def verify_entry(verifycert, entry, hash):
if error_code != 0:
print >>sys.stderr, result[1:]
sys.exit(1)
+
+def hexencode(key):
+ return base64.b16encode(key).lower()
+
+def write_chain(key, value, chainsdir, hashed_dir=True):
+ filename = hexencode(key)
+ if hashed_dir:
+ path = chainsdir + "/" \
+ + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
+ try:
+ os.makedirs(path)
+ except Exception, e:
+ pass
+ else:
+ path = chainsdir
+ f = open(path + "/" + filename, "w")
+ f.write(value)
+ f.close()
+
+def add_to_logorder(logorderfile, key):
+ f = open(logorderfile, "a")
+ f.write(hexencode(key) + "\n")
+ f.close()
+
+def fsync_logorder(logorderfile):
+ f = open(logorderfile, "a")
+ os.fsync(f.fileno())
+ f.close()
+
+def get_new_entries(node, baseurl, own_key, paths):
+ try:
+ result = http_request(baseurl + "plop/v1/storage/fetchnewentries",
+ key=own_key, verifynode=node,
+ publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return [base64.b64decode(entry) for \
+ entry in parsed_result[u"entries"]]
+ print >>sys.stderr, "ERROR: fetchnewentries", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: fetchnewentries", e.read()
+ sys.exit(1)
+
+def get_entries(node, baseurl, own_key, paths, hashes):
+ try:
+ params = urllib.urlencode({"hash":[base64.b64encode(hash) for \
+ hash in hashes]}, doseq=True)
+ result = http_request(baseurl + "plop/v1/storage/getentry?" + params,
+ key=own_key, verifynode=node,
+ publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
+ assert len(entries) == len(hashes)
+ assert set(entries.keys()) == set(hashes)
+ return entries
+ print >>sys.stderr, "ERROR: getentry", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: getentry", e.read()
+ sys.exit(1)
+
+def get_curpos(node, baseurl, own_key, paths):
+ try:
+ result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"position"]
+ print >>sys.stderr, "ERROR: currentposition", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: currentposition", e.read()
+ sys.exit(1)
+
+def get_verifiedsize(node, baseurl, own_key, paths):
+ try:
+ result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"size"]
+ print >>sys.stderr, "ERROR: verifiedsize", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: verifiedsize", e.read()
+ sys.exit(1)
+
+
+def sendlog(node, baseurl, own_key, paths, submission):
+ try:
+ result = http_request(baseurl + "plop/v1/frontend/sendlog",
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendlog", e.read()
+ sys.stderr.flush()
+ return None
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def backup_sendlog(node, baseurl, own_key, paths, submission):
+ try:
+ result = http_request(baseurl + "plop/v1/merge/sendlog",
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendlog", e.read()
+ sys.stderr.flush()
+ return None
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def sendentry(node, baseurl, own_key, paths, entry, hash):
+ try:
+ result = http_request(baseurl + "plop/v1/frontend/sendentry",
+ json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+ verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendentry", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, hash
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def sendentry_merge(node, baseurl, own_key, paths, entry, hash):
+ try:
+ result = http_request(baseurl + "plop/v1/merge/sendentry",
+ json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+ verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendentry", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, hash
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def sendsth(node, baseurl, own_key, paths, submission):
+ try:
+ result = http_request(baseurl + "plop/v1/frontend/sendsth",
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendsth", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def verifyroot(node, baseurl, own_key, paths, treesize):
+ try:
+ result = http_request(baseurl + "plop/v1/merge/verifyroot",
+ json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: verifyroot", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def setverifiedsize(node, baseurl, own_key, paths, treesize):
+ try:
+ result = http_request(baseurl + "plop/v1/merge/setverifiedsize",
+ json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: setverifiedsize", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
+def get_missingentries(node, baseurl, own_key, paths):
+ try:
+ result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"entries"]
+ print >>sys.stderr, "ERROR: missingentries", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: missingentries", e.read()
+ sys.exit(1)
+
+def get_missingentriesforbackup(node, baseurl, own_key, paths):
+ try:
+ result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"entries"]
+ print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
+ sys.exit(1)
+
+def chunks(l, n):
+ return [l[i:i+n] for i in range(0, len(l), n)]