summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Ahltorp <map@kth.se>2015-06-18 09:20:14 +0100
committerMagnus Ahltorp <map@kth.se>2015-06-18 09:20:14 +0100
commit88c0aba850f0a79ecf92070f79c6dd3e95b8cc87 (patch)
treefe1228e0e518b379321cd1056aefd696e1f56c4d
parentb8c86cf28520b7125aeda20adeee27f3a036055e (diff)
Preliminary merge secondary support. Change merge db to lowercase.
-rw-r--r--Makefile13
-rw-r--r--test/catlfish-test-local-merge-2.cfg18
-rw-r--r--test/catlfish-test.cfg5
-rwxr-xr-xtools/compileconfig.py58
-rwxr-xr-xtools/merge.py177
-rw-r--r--tools/mergetools.py14
6 files changed, 240 insertions, 45 deletions
diff --git a/Makefile b/Makefile
index 0657b43..b051988 100644
--- a/Makefile
+++ b/Makefile
@@ -28,14 +28,19 @@ tests-prepare:
mkdir $(INSTDIR)/tests/mergedb
mkdir $(INSTDIR)/tests/mergedb/chains
touch $(INSTDIR)/tests/mergedb/logorder
+ mkdir $(INSTDIR)/tests/mergedb-secondary
+ mkdir $(INSTDIR)/tests/mergedb-secondary/chains
+ touch $(INSTDIR)/tests/mergedb-secondary/logorder
+ printf 0 > $(INSTDIR)/tests/mergedb-secondary/verifiedsize
mkdir $(INSTDIR)/tests/known_roots
cp tools/testcerts/roots/* $(INSTDIR)/tests/known_roots
@for machine in $(MACHINES); do \
- (cd $(INSTDIR); ../tools/compileconfig.py --config ../test/catlfish-test.cfg --localconfig ../test/catlfish-test-local-$$machine.cfg) ; \
- mkdir -p $(INSTDIR)/tests/machine/machine-$$machine/db ; \
- touch $(INSTDIR)/tests/machine/machine-$$machine/db/index ; \
+ (cd $(INSTDIR); ../tools/compileconfig.py --config ../test/catlfish-test.cfg --localconfig ../test/catlfish-test-local-$$machine.cfg) && \
+ mkdir -p $(INSTDIR)/tests/machine/machine-$$machine/db && \
+ touch $(INSTDIR)/tests/machine/machine-$$machine/db/index && \
touch $(INSTDIR)/tests/machine/machine-$$machine/db/newentries ; \
done
+ (cd $(INSTDIR); ../tools/compileconfig.py --config ../test/catlfish-test.cfg --localconfig ../test/catlfish-test-local-merge-2.cfg)
(cd $(INSTDIR); ../tools/compileconfig.py --config ../test/catlfish-test.cfg --localconfig ../test/catlfish-test-local-signing.cfg)
mkdir $(INSTDIR)/tests/privatekeys
mkdir $(INSTDIR)/tests/publickeys
@@ -46,6 +51,8 @@ tests-prepare:
done
(cd $(INSTDIR)/tests/privatekeys ; ../../../tools/create-key.sh merge-1)
mv $(INSTDIR)/tests/privatekeys/merge-1.pem $(INSTDIR)/tests/publickeys/
+ (cd $(INSTDIR)/tests/privatekeys ; ../../../tools/create-key.sh merge-2)
+ mv $(INSTDIR)/tests/privatekeys/merge-2.pem $(INSTDIR)/tests/publickeys/
-test -x $(SOFTHSM) && $(SOFTHSM) --init-token --slot=0 --label=mylabel --so-pin=ffff --pin=ffff
-test -x $(SOFTHSM) && $(SOFTHSM) --import $(INSTDIR)/tests/keys/logkey-private.pkcs8 --slot 0 --label mylabel --pin ffff --id 00
rm -f $(INSTDIR)/cur-sth.json
diff --git a/test/catlfish-test-local-merge-2.cfg b/test/catlfish-test-local-merge-2.cfg
new file mode 100644
index 0000000..28c4eda
--- /dev/null
+++ b/test/catlfish-test-local-merge-2.cfg
@@ -0,0 +1,18 @@
+localnodes:
+ - merge-2
+
+addresses:
+ merge-2: 127.0.0.1:8181
+
+nodename: merge-2
+
+paths:
+ configdir: .
+ knownroots: tests/known_roots
+ mergedb: tests/mergedb-secondary
+ https_certfile: tests/httpscert/httpscert-1.pem
+ https_keyfile: tests/httpscert/httpskey-1.pem
+ https_cacertfile: tests/httpsca/demoCA/cacert.pem
+ publickeys: tests/publickeys
+ logpublickey: tests/keys/logkey.pem
+ privatekeys: tests/privatekeys
diff --git a/test/catlfish-test.cfg b/test/catlfish-test.cfg
index 6cb3bed..77539b6 100644
--- a/test/catlfish-test.cfg
+++ b/test/catlfish-test.cfg
@@ -15,6 +15,11 @@ signingnodes:
mergenodes:
- name: merge-1
+ address: localhost:8180
+ - name: merge-2
+ address: localhost:8181
+
+primarymergenode: merge-1
storage-quorum-size: 1
diff --git a/tools/compileconfig.py b/tools/compileconfig.py
index c48ba66..0e4a839 100755
--- a/tools/compileconfig.py
+++ b/tools/compileconfig.py
@@ -67,7 +67,7 @@ def parse_address(address):
def get_node_config(nodename, config):
nodetype = None
nodeconfig = None
- for t in ["frontendnodes", "storagenodes", "signingnodes"]:
+ for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]:
for node in config[t]:
if node["name"] == nodename:
nodetype = t
@@ -106,16 +106,31 @@ def gen_http_servers(nodetype, nodeconfig, bind_address, bind_publicaddress, bin
elif nodetype == "signingnodes":
return ([],
[(Symbol("signing_https_api"), host, port, Symbol("signing"))])
+ elif nodetype == "mergenodes":
+ return ([],
+ [(Symbol("frontend_https_api"), host, port, Symbol("frontend"))])
+ else:
+ print >>sys.stderr, "unknown nodetype", nodetype
+ sys.exit(1)
-def allowed_clients_frontend(mergenodenames):
+def allowed_clients_frontend(mergenodenames, primarymergenode):
return [
("/ct/frontend/sendentry", mergenodenames),
("/ct/frontend/sendlog", mergenodenames),
- ("/ct/frontend/sendsth", mergenodenames),
+ ("/ct/frontend/sendsth", [primarymergenode]),
("/ct/frontend/currentposition", mergenodenames),
("/ct/frontend/missingentries", mergenodenames),
]
+def allowed_clients_mergesecondary(primarymergenode):
+ return [
+ ("/catlfish/merge/sendentry", [primarymergenode]),
+ ("/catlfish/merge/sendlog", [primarymergenode]),
+ ("/catlfish/merge/verifyroot", [primarymergenode]),
+ ("/catlfish/merge/verifiedsize", [primarymergenode]),
+ ("/catlfish/merge/missingentries", [primarymergenode]),
+ ]
+
def allowed_clients_public():
noauth = Symbol("noauth")
return [
@@ -129,10 +144,10 @@ def allowed_clients_public():
("/ct/v1/get-roots", noauth),
]
-def allowed_clients_signing(frontendnodenames, mergenodenames):
+def allowed_clients_signing(frontendnodenames, primarymergenode):
return [
("/ct/signing/sct", frontendnodenames),
- ("/ct/signing/sth", mergenodenames),
+ ("/ct/signing/sth", [primarymergenode]),
]
def allowed_clients_storage(frontendnodenames, mergenodenames):
@@ -182,11 +197,12 @@ def gen_config(nodename, config, localconfig):
catlfishconfig = []
plopconfig = []
- if nodetype == "frontendnodes":
+ if nodetype in ("frontendnodes", "mergenodes"):
catlfishconfig.append((Symbol("known_roots_path"), localconfig["paths"]["knownroots"]))
+ if nodetype == "frontendnodes":
if "sctcaching" in options:
catlfishconfig.append((Symbol("sctcache_root_path"), paths["db"] + "sctcache/"))
- if localconfig["ratelimits"]:
+ if "ratelimits" in localconfig:
ratelimits = map(parse_ratelimit, localconfig["ratelimits"].items())
catlfishconfig.append((Symbol("ratelimits"), ratelimits))
@@ -231,13 +247,23 @@ def gen_config(nodename, config, localconfig):
(Symbol("sth_path"), paths["db"] + "sth"),
(Symbol("entryhash_from_entry"),
(Symbol("catlfish"), Symbol("entryhash_from_entry"))),
+ ]
+ if nodetype in ("frontendnodes", "mergenodes"):
+ plopconfig += [
(Symbol("verify_entry"),
(Symbol("catlfish"), Symbol("verify_entry"))),
]
+ if nodetype == "mergenodes":
+ plopconfig += [
+ (Symbol("verifiedsize_path"), paths["mergedb"] + "/verifiedsize"),
+ (Symbol("index_path"), paths["mergedb"] + "/logorder"),
+ (Symbol("entry_root_path"), paths["mergedb"] + "/chains/"),
+ ]
signingnodes = config["signingnodes"]
signingnodeaddresses = ["https://%s/ct/signing/" % node["address"] for node in config["signingnodes"]]
mergenodenames = [node["name"] for node in config["mergenodes"]]
+ primarymergenode = config["primarymergenode"]
storagenodeaddresses = ["https://%s/ct/storage/" % node["address"] for node in config["storagenodes"]]
frontendnodenames = [node["name"] for node in config["frontendnodes"]]
@@ -249,15 +275,21 @@ def gen_config(nodename, config, localconfig):
plopconfig.append((Symbol("storage_nodes"), storagenodeaddresses))
plopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"]))
services = [Symbol("ht")]
- allowed_clients += allowed_clients_frontend(mergenodenames)
+ allowed_clients += allowed_clients_frontend(mergenodenames, primarymergenode)
allowed_clients += allowed_clients_public()
allowed_servers += allowed_servers_frontend([node["name"] for node in signingnodes], storagenodenames)
elif nodetype == "storagenodes":
allowed_clients += allowed_clients_storage(frontendnodenames, mergenodenames)
services = []
elif nodetype == "signingnodes":
- allowed_clients += allowed_clients_signing(frontendnodenames, mergenodenames)
+ allowed_clients += allowed_clients_signing(frontendnodenames, primarymergenode)
services = [Symbol("sign")]
+ elif nodetype == "mergenodes":
+ storagenodenames = [node["name"] for node in config["storagenodes"]]
+ plopconfig.append((Symbol("storage_nodes"), storagenodeaddresses))
+ plopconfig.append((Symbol("storage_nodes_quorum"), config["storage-quorum-size"]))
+ services = [Symbol("ht")]
+ allowed_clients += allowed_clients_mergesecondary(primarymergenode)
plopconfig += [
(Symbol("publickey_path"), paths["publickeys"]),
@@ -299,15 +331,17 @@ def gen_testmakefile(config, testmakefile, machines):
configfile = open(testmakefile, "w")
frontendnodenames = [node["name"] for node in config["frontendnodes"]]
storagenodenames = [node["name"] for node in config["storagenodes"]]
- signingnodename = [node["name"] for node in config["signingnodes"]]
+ signingnodenames = [node["name"] for node in config["signingnodes"]]
+ mergenodenames = [node["name"] for node in config["mergenodes"]]
frontendnodeaddresses = [node["publicaddress"] for node in config["frontendnodes"]]
storagenodeaddresses = [node["address"] for node in config["storagenodes"]]
signingnodeaddresses = [node["address"] for node in config["signingnodes"]]
+ mergenodeaddresses = [node["address"] for node in config["mergenodes"] if node["name"] != config["primarymergenode"]]
- print >>configfile, "NODES=" + " ".join(frontendnodenames+storagenodenames+signingnodename)
+ print >>configfile, "NODES=" + " ".join(frontendnodenames+storagenodenames+signingnodenames+mergenodenames)
print >>configfile, "MACHINES=" + " ".join([str(e) for e in range(1, machines+1)])
- print >>configfile, "TESTURLS=" + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses)
+ print >>configfile, "TESTURLS=" + " ".join(frontendnodeaddresses+storagenodeaddresses+signingnodeaddresses+mergenodeaddresses)
print >>configfile, "BASEURL=" + config["baseurl"]
configfile.close()
diff --git a/tools/merge.py b/tools/merge.py
index 9904b84..34d1697 100755
--- a/tools/merge.py
+++ b/tools/merge.py
@@ -38,7 +38,7 @@ localconfig = yaml.load(open(args.localconfig))
ctbaseurl = config["baseurl"]
frontendnodes = config["frontendnodes"]
storagenodes = config["storagenodes"]
-secondaries = localconfig.get("secondary", [])
+secondaries = config.get("mergenodes", [])
paths = localconfig["paths"]
mergedb = paths["mergedb"]
@@ -54,8 +54,11 @@ logpublickey = get_public_key_from_file(paths["logpublickey"])
hashed_dir = True
+def hexencode(key):
+ return base64.b16encode(key).lower()
+
def write_chain(key, value):
- filename = base64.b16encode(key)
+ filename = hexencode(key)
if hashed_dir:
path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
try:
@@ -70,7 +73,7 @@ def write_chain(key, value):
def add_to_logorder(key):
f = open(logorderfile, "a")
- f.write(base64.b16encode(key) + "\n")
+ f.write(hexencode(key) + "\n")
f.close()
def fsync_logorder():
@@ -118,6 +121,20 @@ def get_curpos(node, baseurl):
print >>sys.stderr, "ERROR: currentposition", e.read()
sys.exit(1)
+def get_verifiedsize(node, baseurl):
+ try:
+ result = http_request(baseurl + "catlfish/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"size"]
+ print >>sys.stderr, "ERROR: verifiedsize", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: verifiedsize", e.read()
+ sys.exit(1)
+
+
+
def sendlog(node, baseurl, submission):
try:
result = http_request(baseurl + "ct/frontend/sendlog",
@@ -136,6 +153,24 @@ def sendlog(node, baseurl, submission):
sys.stderr.flush()
raise e
+def backup_sendlog(node, baseurl, submission):
+ try:
+ result = http_request(baseurl + "catlfish/merge/sendlog",
+ json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendlog", e.read()
+ sys.stderr.flush()
+ return None
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def sendentry(node, baseurl, entry, hash):
try:
result = http_request(baseurl + "ct/frontend/sendentry",
@@ -154,6 +189,24 @@ def sendentry(node, baseurl, entry, hash):
sys.stderr.flush()
raise e
+def sendentry_merge(node, baseurl, entry, hash):
+ try:
+ result = http_request(baseurl + "catlfish/merge/sendentry",
+ json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)}), key=own_key,
+ verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: sendentry", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, hash
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def sendsth(node, baseurl, submission):
try:
result = http_request(baseurl + "ct/frontend/sendsth",
@@ -171,6 +224,23 @@ def sendsth(node, baseurl, submission):
sys.stderr.flush()
raise e
+def verifyroot(node, baseurl, treesize):
+ try:
+ result = http_request(baseurl + "catlfish/merge/verifyroot",
+ json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ return json.loads(result)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: verifyroot", e.read()
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, submission
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def get_missingentries(node, baseurl):
try:
result = http_request(baseurl + "ct/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
@@ -183,6 +253,18 @@ def get_missingentries(node, baseurl):
print >>sys.stderr, "ERROR: missingentries", e.read()
sys.exit(1)
+def get_missingentriesforbackup(node, baseurl):
+ try:
+ result = http_request(baseurl + "catlfish/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"entries"]
+ print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result
+ sys.exit(1)
+ except urllib2.HTTPError, e:
+ print >>sys.stderr, "ERROR: missingentriesforbackup", e.read()
+ sys.exit(1)
+
def chunks(l, n):
return [l[i:i+n] for i in range(0, len(l), n)]
@@ -257,30 +339,75 @@ root_hash = tree[-1][0]
timestamp = int(time.time() * 1000)
for secondary in secondaries:
- remotehost = secondary["host"]
- remotedir = remotehost + ":" + secondary["mergedir"]
- localdir = mergedb
- if localdir[:-1] != '/':
- localdir = localdir + "/"
-
- print >>sys.stderr, "copying database to secondary:", remotehost
+ if secondary["name"] == config["primarymergenode"]:
+ continue
+ nodeaddress = "https://%s/" % secondary["address"]
+ nodename = secondary["name"]
+ timing = timing_point()
+ print >>sys.stderr, "backing up to node", nodename
sys.stderr.flush()
- rsyncstatus = subprocess.call(["rsync", "-r", "--append", "--rsh=ssh", localdir, remotedir])
- if rsyncstatus:
- print >>sys.stderr, "rsync failed:", rsyncstatus
- sys.exit(1)
-
- print >>sys.stderr, "verifying database at secondary:", remotehost
+ verifiedsize = get_verifiedsize(nodename, nodeaddress)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
sys.stderr.flush()
- verifysecondary = subprocess.Popen(["ssh", remotehost, secondary["verifycommand"], secondary["mergedir"]],
- stdout=subprocess.PIPE)
-
- (verifysecondaryresult, _) = verifysecondary.communicate()
-
- if root_hash != base64.b16decode(verifysecondaryresult.strip()):
- print >>sys.stderr, "secondary root hash was", verifysecondaryresult.strip()
- print >>sys.stderr, " expected", base64.b16encode(root_hash)
+ entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = backup_sendlog(nodename, nodeaddress, {"start": verifiedsize, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ select.select([], [], [], 10.0)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ verifiedsize += len(chunk)
+ print >>sys.stderr, verifiedsize,
+ sys.stderr.flush()
+ print >>sys.stderr
+ timing_point(timing, "sendlog")
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress)
+ timing_point(timing, "get missing")
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+ fetched_entries = 0
+ print >>sys.stderr, "fetching missing entries",
+ sys.stderr.flush()
+ for missingentry in missingentries:
+ hash = base64.b64decode(missingentry)
+ sendentryresult = sendentry_merge(nodename, nodeaddress, read_chain(chainsdir, hash), hash)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "send sth:", sendentryresult
+ sys.exit(1)
+ fetched_entries += 1
+ if added_entries % 1000 == 0:
+ print >>sys.stderr, fetched_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+ verifyrootresult = verifyroot(nodename, nodeaddress, tree_size)
+ if verifyrootresult["result"] != "ok":
+ print >>sys.stderr, "verifyroot:", verifyrootresult
+ sys.exit(1)
+ secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+ if root_hash != secondary_root_hash:
+ print >>sys.stderr, "secondary root hash was", hexencode(secondary_root_hash)
+ print >>sys.stderr, " expected", hexencode(root_hash)
sys.exit(1)
+ timing_point(timing, "verifyroot")
+ # XXX: set verifiedsize
+ if args.timing:
+ print >>sys.stderr, timing["deltatimes"]
+ sys.stderr.flush()
tree_head_signature = None
for signingnode in signingnodes:
@@ -307,7 +434,7 @@ if args.timing:
print >>sys.stderr, timing["deltatimes"]
sys.stderr.flush()
-print base64.b16encode(root_hash)
+print hexencode(root_hash)
sys.stdout.flush()
for frontendnode in frontendnodes:
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 5cb36c4..9e84038 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -6,19 +6,23 @@ import struct
from certtools import get_leaf_hash
def parselogrow(row):
- return base64.b16decode(row)
+ return base64.b16decode(row, casefold=True)
def get_logorder(filename):
f = open(filename, "r")
return [parselogrow(row.rstrip()) for row in f]
-def read_chain(chainsdir, key):
- filename = base64.b16encode(key)
+def read_chain_open(chainsdir, filename):
path = chainsdir + "/" + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6]
+ f = open(path + "/" + filename, "r")
+ return f
+
+def read_chain(chainsdir, key):
+ filename = base64.b16encode(key).upper()
try:
- f = open(path + "/" + filename, "r")
+ f = read_chain_open(chainsdir, filename)
except IOError, e:
- f = open(chainsdir + "/" + filename, "r")
+ f = read_chain_open(chainsdir, filename.lower())
value = f.read()
f.close()
return value