Diffstat (limited to 'tools/merge.py')
-rwxr-xr-x  tools/merge.py  139
1 file changed, 89 insertions, 50 deletions
diff --git a/tools/merge.py b/tools/merge.py
index c137f4b..f9c93d9 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -14,27 +14,38 @@ import time
import ecdsa
import hashlib
import urlparse
-from certtools import build_merkle_tree, create_sth_signature, check_sth_signature, get_eckey_from_file, timing_point, http_request
+import errno
+import os
+import yaml
+from certtools import build_merkle_tree, create_sth_signature, \
+ check_sth_signature, get_eckey_from_file, timing_point, http_request, \
+ get_public_key_from_file
parser = argparse.ArgumentParser(description="")
-parser.add_argument("--baseurl", metavar="url", help="Base URL for CT server", required=True)
-parser.add_argument("--frontend", action="append", metavar="url", help="Base URL for frontend server", required=True)
-parser.add_argument("--storage", action="append", metavar="url", help="Base URL for storage server", required=True)
-parser.add_argument("--mergedb", metavar="dir", help="Merge database directory", required=True)
-parser.add_argument("--signing", metavar="url", help="Base URL for signing server", required=True)
-parser.add_argument("--own-keyname", metavar="keyname", help="The key name of the merge node", required=True)
-parser.add_argument("--own-keyfile", metavar="keyfile", help="The file containing the private key of the merge node", required=True)
+parser.add_argument('--config', help="System configuration", required=True)
+parser.add_argument('--localconfig', help="Local configuration", required=True)
parser.add_argument("--nomerge", action='store_true', help="Don't actually do merge")
+parser.add_argument("--timing", action='store_true', help="Print timing information")
args = parser.parse_args()
-ctbaseurl = args.baseurl
-frontendnodes = args.frontend
-storagenodes = args.storage
+config = yaml.load(open(args.config))
+localconfig = yaml.load(open(args.localconfig))
-chainsdir = args.mergedb + "/chains"
-logorderfile = args.mergedb + "/logorder"
+ctbaseurl = config["baseurl"]
+frontendnodes = config["frontendnodes"]
+storagenodes = config["storagenodes"]
+paths = localconfig["paths"]
+mergedb = paths["mergedb"]
-own_key = (args.own_keyname, args.own_keyfile)
+signingnodes = config["signingnodes"]
+
+chainsdir = mergedb + "/chains"
+logorderfile = mergedb + "/logorder"
+
+own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"]))
+
+logpublickey = get_public_key_from_file(paths["logpublickey"])
+
+hashed_dir = True
def parselogrow(row):
return base64.b16decode(row)
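The configuration keys read above imply YAML files shaped roughly as follows. This is a sketch only: the node names, addresses and paths are invented placeholders, and real deployments may define additional fields.

    # --config: system-wide configuration shared by all nodes
    baseurl: https://ct.example.org/
    frontendnodes:
      - name: frontend-1
        address: frontend-1.example.org
    storagenodes:
      - name: storage-1
        address: storage-1.example.org
    signingnodes:
      - address: signing-1.example.org

    # --localconfig: per-node configuration
    nodename: merge-1
    paths:
      mergedb: /var/db/mergedb
      privatekeys: /etc/ct/privatekeys
      publickeys: /etc/ct/publickeys
      logpublickey: /etc/ct/logpublickey.pem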
@@ -44,12 +55,26 @@ def get_logorder():
return [parselogrow(row.rstrip()) for row in f]
def write_chain(key, value):
- f = open(chainsdir + "/" + base64.b16encode(key), "w")
+ filename = base64.b16encode(key)
+ if hashed_dir:
+ path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
+        try:
+            os.makedirs(path)
+        except OSError, e:
+            # the fan-out directory may already exist; only real errors propagate
+            if e.errno != errno.EEXIST:
+                raise
+ else:
+ path = chainsdir
+ f = open(path + "/" + filename, "w")
f.write(value)
f.close()
def read_chain(key):
- f = open(chainsdir + "/" + base64.b16encode(key), "r")
+ filename = base64.b16encode(key)
+ path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
+ try:
+ f = open(path + "/" + filename, "r")
+ except IOError, e:
+ f = open(chainsdir + "/" + filename, "r")
value = f.read()
f.close()
return value
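The effect: with hashed_dir set, a chain whose key b16-encodes to, say, A1B2C3D4... is written to chains/A1/B2/C3/A1B2C3D4... instead of directly into chains/, fanning entries out over up to 16^6 subdirectories; read_chain falls back to the flat layout so chains written before this change stay readable.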
@@ -59,9 +84,9 @@ def add_to_logorder(key):
f.write(base64.b16encode(key) + "\n")
f.close()
-def get_new_entries(baseurl):
+def get_new_entries(node, baseurl):
try:
- result = http_request(baseurl + "ct/storage/fetchnewentries", key=own_key)
+ result = http_request(baseurl + "ct/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
return [base64.b64decode(entry) for entry in parsed_result[u"entries"]]
@@ -71,10 +96,10 @@ def get_new_entries(baseurl):
print "ERROR: fetchnewentries", e.read()
sys.exit(1)
-def get_entries(baseurl, hashes):
+def get_entries(node, baseurl, hashes):
try:
params = urllib.urlencode({"hash":[base64.b64encode(hash) for hash in hashes]}, doseq=True)
- result = http_request(baseurl + "ct/storage/getentry?" + params, key=own_key)
+ result = http_request(baseurl + "ct/storage/getentry?" + params, key=own_key, verifynode=node, publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for entry in parsed_result[u"entries"]])
@@ -87,9 +112,9 @@ def get_entries(baseurl, hashes):
print "ERROR: getentry", e.read()
sys.exit(1)
-def get_curpos(baseurl):
+def get_curpos(node, baseurl):
try:
- result = http_request(baseurl + "ct/frontend/currentposition", key=own_key)
+ result = http_request(baseurl + "ct/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
return parsed_result[u"position"]
@@ -99,10 +124,10 @@ def get_curpos(baseurl):
print "ERROR: currentposition", e.read()
sys.exit(1)
-def sendlog(baseurl, submission):
+def sendlog(node, baseurl, submission):
try:
result = http_request(baseurl + "ct/frontend/sendlog",
- json.dumps(submission), key=own_key)
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR: sendlog", e.read()
@@ -115,10 +140,11 @@ def sendlog(baseurl, submission):
print "========================"
raise e
-def sendentry(baseurl, entry, hash):
+def sendentry(node, baseurl, entry, hash):
try:
result = http_request(baseurl + "ct/frontend/sendentry",
- json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key)
+ json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+ verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR: sendentry", e.read()
@@ -131,10 +157,10 @@ def sendentry(baseurl, entry, hash):
print "========================"
raise e
-def sendsth(baseurl, submission):
+def sendsth(node, baseurl, submission):
try:
result = http_request(baseurl + "ct/frontend/sendsth",
- json.dumps(submission), key=own_key)
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except urllib2.HTTPError, e:
print "ERROR: sendsth", e.read()
@@ -147,9 +173,9 @@ def sendsth(baseurl, submission):
print "========================"
raise e
-def get_missingentries(baseurl):
+def get_missingentries(node, baseurl):
try:
- result = http_request(baseurl + "ct/frontend/missingentries", key=own_key)
+ result = http_request(baseurl + "ct/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
return parsed_result[u"entries"]
@@ -175,10 +201,10 @@ new_entries = set()
entries_to_fetch = {}
for storagenode in storagenodes:
- print "getting new entries from", storagenode
- new_entries_per_node[storagenode] = set(get_new_entries(storagenode))
- new_entries.update(new_entries_per_node[storagenode])
- entries_to_fetch[storagenode] = []
+ print "getting new entries from", storagenode["name"]
+ new_entries_per_node[storagenode["name"]] = set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"]))
+ new_entries.update(new_entries_per_node[storagenode["name"]])
+ entries_to_fetch[storagenode["name"]] = []
timing_point(timing, "get new entries")
@@ -191,16 +217,16 @@ if args.nomerge:
for hash in new_entries:
for storagenode in storagenodes:
- if hash in new_entries_per_node[storagenode]:
- entries_to_fetch[storagenode].append(hash)
+ if hash in new_entries_per_node[storagenode["name"]]:
+ entries_to_fetch[storagenode["name"]].append(hash)
break
added_entries = 0
for storagenode in storagenodes:
- print "getting", len(entries_to_fetch[storagenode]), "entries from", storagenode
- for chunk in chunks(entries_to_fetch[storagenode], 100):
- entries = get_entries(storagenode, chunk)
+ print "getting", len(entries_to_fetch[storagenode["name"]]), "entries from", storagenode["name"]
+ for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
+ entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], chunk)
for hash in chunk:
entry = entries[hash]
write_chain(hash, entry)
@@ -216,30 +242,42 @@ tree_size = len(logorder)
root_hash = tree[-1][0]
timestamp = int(time.time() * 1000)
-tree_head_signature = create_sth_signature(tree_size, timestamp,
- root_hash, args.signing, key=own_key)
+tree_head_signature = None
+for signingnode in signingnodes:
+ try:
+ tree_head_signature = create_sth_signature(tree_size, timestamp,
+ root_hash, "https://%s/" % signingnode["address"], key=own_key)
+ break
+ except urllib2.URLError, e:
+ print e
+if tree_head_signature is None:
+ print >>sys.stderr, "Could not contact any signing nodes"
+ sys.exit(1)
sth = {"tree_size": tree_size, "timestamp": timestamp,
"sha256_root_hash": base64.b64encode(root_hash),
"tree_head_signature": base64.b64encode(tree_head_signature)}
-check_sth_signature(ctbaseurl, sth)
+check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
timing_point(timing, "build sth")
-print timing["deltatimes"]
+if args.timing:
+ print timing["deltatimes"]
print "root hash", base64.b16encode(root_hash)
for frontendnode in frontendnodes:
+ nodeaddress = "https://%s/" % frontendnode["address"]
+ nodename = frontendnode["name"]
timing = timing_point()
- print "distributing for node", frontendnode
- curpos = get_curpos(frontendnode)
+ print "distributing for node", nodename
+ curpos = get_curpos(nodename, nodeaddress)
timing_point(timing, "get curpos")
print "current position", curpos
entries = [base64.b64encode(entry) for entry in logorder[curpos:]]
for chunk in chunks(entries, 1000):
- sendlogresult = sendlog(frontendnode, {"start": curpos, "hashes": chunk})
+ sendlogresult = sendlog(nodename, nodeaddress, {"start": curpos, "hashes": chunk})
if sendlogresult["result"] != "ok":
print "sendlog:", sendlogresult
sys.exit(1)
@@ -248,19 +286,20 @@ for frontendnode in frontendnodes:
sys.stdout.flush()
timing_point(timing, "sendlog")
print "log sent"
- missingentries = get_missingentries(frontendnode)
+ missingentries = get_missingentries(nodename, nodeaddress)
timing_point(timing, "get missing")
print "missing entries:", len(missingentries)
for missingentry in missingentries:
hash = base64.b64decode(missingentry)
- sendentryresult = sendentry(frontendnode, read_chain(hash), hash)
+ sendentryresult = sendentry(nodename, nodeaddress, read_chain(hash), hash)
if sendentryresult["result"] != "ok":
print "sendentry:", sendentryresult
sys.exit(1)
timing_point(timing, "send missing")
- sendsthresult = sendsth(frontendnode, sth)
+ sendsthresult = sendsth(nodename, nodeaddress, sth)
if sendsthresult["result"] != "ok":
print "send sth:", sendsthresult
sys.exit(1)
timing_point(timing, "send sth")
- print timing["deltatimes"]
+ if args.timing:
+ print timing["deltatimes"]
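With these changes the merge node is driven entirely by the two YAML files; a typical invocation (the file names here are hypothetical) would be:

    ./tools/merge.py --config merge.cfg --localconfig local.cfg --timing

with --nomerge added to skip the actual merge.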