author    Magnus Ahltorp <map@kth.se>    2016-11-02 13:10:37 +0100
committer Magnus Ahltorp <map@kth.se>    2016-11-02 13:10:37 +0100
commit    b48c1689e36bdcc65a34b4ab12763478b072a716 (patch)
tree      f812f21492567b97aecba2c80dc0ce97a8e6eb6a /tools
parent    6659a1c08dda0d6ec20f945e135b23b544db55a4 (diff)
Change algorithm for merge backup and merge dist
Diffstat (limited to 'tools')
-rwxr-xr-x  tools/compileconfig.py |   3
-rwxr-xr-x  tools/merge_backup.py  | 166
-rwxr-xr-x  tools/merge_dist.py    | 140
-rw-r--r--  tools/mergetools.py    |  25
4 files changed, 185 insertions(+), 149 deletions(-)
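
The commit replaces the previous transfer-then-verify flow with a windowed one: instead of building the Merkle tree over the whole log up front and only setting the verified size once everything has been sent, both merge_backup.py and merge_dist.py now send entries in slices of at most maxwindow positions (a new localconfig option, defaulting to 1000) and verify after each slice. A condensed sketch of the shared pattern; the injected callables stand in for the sendlog_helper(), fill_in_missing_entries(), and check_root()/frontend_verify_entries() calls in the diffs below:

def windowed_send(logorder, verifiedsize, tree_size, maxwindow,
                  send_window, fill_in_missing, verify_up_to):
    # Send and verify the log in windows so the peer's verified size
    # advances stepwise instead of only at the very end.
    root_hash = None
    if verifiedsize == tree_size:
        root_hash = verify_up_to(tree_size)  # already in sync: verify only
    while verifiedsize < tree_size:
        uptopos = min(verifiedsize + maxwindow, tree_size)
        send_window(logorder[verifiedsize:uptopos], verifiedsize)
        fill_in_missing()                    # push entry data until none reported missing
        root_hash = verify_up_to(uptopos)    # per-window verification
        verifiedsize = uptopos
    return verifiedsize, root_hash
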
diff --git a/tools/compileconfig.py b/tools/compileconfig.py
index 7ba2fac..1fa352e 100755
--- a/tools/compileconfig.py
+++ b/tools/compileconfig.py
@@ -129,7 +129,8 @@ def allowed_clients_frontend(mergenodenames, primarymergenode):
return [
("/plop/v1/frontend/sendentry", mergenodenames),
("/plop/v1/frontend/sendlog", mergenodenames),
- ("/plop/v1/frontend/sendsth", [primarymergenode]),
+ ("/plop/v1/frontend/publish-sth", [primarymergenode]),
+ ("/plop/v1/frontend/verify-entries", [primarymergenode]),
("/plop/v1/frontend/currentposition", mergenodenames),
("/plop/v1/frontend/missingentries", mergenodenames),
]
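
The ACL change mirrors the protocol change in the rest of the commit: /plop/v1/frontend/sendsth is renamed to /plop/v1/frontend/publish-sth, and a new /plop/v1/frontend/verify-entries endpoint is added; both are restricted to the primary merge node. Going by frontend_verify_entries() in the mergetools.py hunk below, a verify-entries round trip looks roughly like this (the position 1000 is only an illustrative value):

import json

# Request body sent by the primary merge node: ask the frontend to
# verify its stored entries up to position 1000.
request_body = json.dumps({"verify_to": 1000})

# Successful response shape; "verified" reports how far the frontend
# actually got, and becomes the merge node's new verified size.
response = json.loads('{"result": "ok", "verified": 1000}')
new_verifiedsize = response["verified"]
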
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 05679a1..2c17d90 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -9,6 +9,7 @@ import base64
import select
import requests
from time import sleep
+from base64 import b64encode, b64decode
from certtools import timing_point, build_merkle_tree, write_file, \
create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
@@ -30,8 +31,78 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
continue
return sendlogresult
+sendlog_discover_chunksize = 100000
+def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths):
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
+ if sendlogresult == None:
+ sys.exit(1)
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "backup_sendlog:", sendlogresult
+ sys.exit(1)
+ verifiedsize += len(chunk)
+ print >>sys.stderr, verifiedsize,
+ sys.stderr.flush()
+ print >>sys.stderr
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
+ while missingentries:
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+
+ fetched_entries = 0
+ print >>sys.stderr, "sending missing entries",
+ sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for missingentry_chunk in chunks(missingentries, 100):
+ missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+ hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+ sendentryresult = sendentries_merge(nodename, nodeaddress,
+ own_key, paths,
+ hashes_and_entries, session)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentries_merge:", sendentryresult
+ sys.exit(1)
+ fetched_entries += len(missingentry_hashes)
+ #print >>sys.stderr, fetched_entries,
+ #sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
+def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing):
+ tree = build_merkle_tree(logorder[:tree_size])
+ root_hash = tree[-1][0]
+ timing_point(timing, "build tree")
+ verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+ tree_size)
+ if verifyrootresult["result"] != "ok":
+ print >>sys.stderr, "verifyroot:", verifyrootresult
+ sys.exit(1)
+ secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+ if root_hash != secondary_root_hash:
+ print >>sys.stderr, "secondary root hash was", \
+ hexencode(secondary_root_hash)
+ print >>sys.stderr, " expected", hexencode(root_hash)
+ sys.exit(1)
+ timing_point(timing, "verifyroot")
+ return root_hash
+
def merge_backup(args, config, localconfig, secondaries):
+ maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -48,10 +119,6 @@ def merge_backup(args, config, localconfig, secondaries):
tree_size = len(logorder)
timing_point(timing, "get logorder")
- tree = build_merkle_tree(logorder)
- root_hash = tree[-1][0]
- timing_point(timing, "build tree")
-
for secondary in secondaries:
if secondary["name"] == config["primarymergenode"]:
continue
@@ -65,92 +132,23 @@ def merge_backup(args, config, localconfig, secondaries):
print >>sys.stderr, "verified size", verifiedsize
sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-
- print >>sys.stderr, "determining end of log:",
- for chunk in chunks(entries, 100000):
- sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
- if sendlogresult == None:
- print >>sys.stderr, "sendlog result was None"
- sys.exit(1)
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
- verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
-
- if verifiedsize > 100000:
- verifiedsize -= 100000
+ if verifiedsize == tree_size:
+ root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing)
else:
- verifiedsize = 0
+ while verifiedsize < tree_size:
+ uptopos = min(verifiedsize + maxwindow, tree_size)
- timing_point(timing, "checklog")
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "sendlog")
- entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
- if sendlogresult == None:
- sys.exit(1)
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
- verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
- missingentries = get_missingentriesforbackup(nodename, nodeaddress,
- own_key, paths)
- timing_point(timing, "get missing")
-
- while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing)
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- with requests.sessions.Session() as session:
- for missingentry_chunk in chunks(missingentries, 100):
- missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
- hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
- sendentryresult = sendentries_merge(nodename, nodeaddress,
- own_key, paths,
- hashes_and_entries, session)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentries_merge:", sendentryresult
- sys.exit(1)
- fetched_entries += len(missingentry_hashes)
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
-
- missingentries = get_missingentriesforbackup(nodename, nodeaddress,
- own_key, paths)
- timing_point(timing, "get missing")
-
- verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
- tree_size)
- if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
- secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
- if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", \
- hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
- timing_point(timing, "verifyroot")
+ verifiedsize = uptopos
+ setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize)
- setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
backuppath = mergedb + "/verified." + nodename
backupdata = {"tree_size": tree_size,
"sha256_root_hash": hexencode(root_hash)}
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index 9d66cfd..ded25a1 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -13,9 +13,71 @@ from base64 import b64encode, b64decode
from certtools import timing_point, \
create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, sendlog, sendentries, parse_args, perm
+ publish_sth, sendlog, sendentries, parse_args, perm, get_frontend_verifiedsize, \
+ frontend_verify_entries
+
+def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths):
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = sendlog(nodename, nodeaddress,
+ own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ sleep(10)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ curpos += len(chunk)
+ print >>sys.stderr, curpos,
+ sys.stderr.flush()
+ print >>sys.stderr
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+ missingentries = get_missingentries(nodename, nodeaddress, own_key,
+ paths)
+ timing_point(timing, "get missing")
+
+ while missingentries:
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+
+ sent_entries = 0
+ print >>sys.stderr, "sending missing entries",
+ sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for missingentry_chunk in chunks(missingentries, 100):
+ missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+ hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+ sendentryresult = sendentries(nodename, nodeaddress,
+ own_key, paths,
+ hashes_and_entries, session)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentries:", sendentryresult
+ sys.exit(1)
+ sent_entries += len(missingentry_hashes)
+ print >>sys.stderr, sent_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+
+ missingentries = get_missingentries(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
def merge_dist(args, localconfig, frontendnodes, timestamp):
+ maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -55,72 +117,28 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
print >>sys.stderr, "current position", curpos
sys.stderr.flush()
- entries = [b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress,
- own_key, paths,
- {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- sleep(10)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
-
- missingentries = get_missingentries(nodename, nodeaddress, own_key,
- paths)
- timing_point(timing, "get missing")
+ verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
+ assert verifiedsize >= curpos
+ while verifiedsize < len(logorder):
+ uptopos = min(verifiedsize + maxwindow, len(logorder))
+
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "sendlog")
- while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
-
- sent_entries = 0
- print >>sys.stderr, "sending missing entries",
- sys.stderr.flush()
- with requests.sessions.Session() as session:
- for missingentry_chunk in chunks(missingentries, 100):
- missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
- hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
- sendentryresult = sendentries(nodename, nodeaddress,
- own_key, paths,
- hashes_and_entries, session)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentries:", sendentryresult
- sys.exit(1)
- sent_entries += len(missingentry_hashes)
- print >>sys.stderr, sent_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
-
- missingentries = get_missingentries(nodename, nodeaddress,
- own_key, paths)
- timing_point(timing, "get missing")
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
+ verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos)
+
print >>sys.stderr, "sending sth to node", nodename
sys.stderr.flush()
- sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "sendsth:", sendsthresult
+ publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth)
+ if publishsthresult["result"] != "ok":
+ print >>sys.stderr, "publishsth:", publishsthresult
sys.exit(1)
timing_point(timing, "send sth")
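
sendlog_helper() in merge_dist.py keeps the old per-chunk retry behaviour: each 1000-entry chunk is attempted up to five times with a ten-second pause, and the script only gives up when the final attempt still returns None. The same pattern in isolation (op is any callable that returns None on a transport failure and a result dict otherwise):

import sys
from time import sleep

def retry_op(op, tries=5, delay=10):
    # None means the request itself failed; any dict result, even an
    # error one, ends the retries and is returned to the caller.
    for trynumber in range(tries, 0, -1):
        result = op()
        if result is not None:
            return result
        if trynumber == 1:
            sys.exit(1)
        sleep(delay)
        print >>sys.stderr, "tries left:", trynumber - 1
        sys.stderr.flush()
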
diff --git a/tools/mergetools.py b/tools/mergetools.py
index bea09e9..ff3d08c 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -214,6 +214,25 @@ def get_curpos(node, baseurl, own_key, paths):
print >>sys.stderr, "ERROR: currentposition", e.response
sys.exit(1)
+def get_frontend_verifiedsize(node, baseurl, own_key, paths):
+ return frontend_verify_entries(node, baseurl, own_key, paths, 0)
+
+def frontend_verify_entries(node, baseurl, own_key, paths, size):
+ try:
+ arguments = {"verify_to": size}
+ result = http_request(baseurl + "plop/v1/frontend/verify-entries",
+ json.dumps(arguments),
+ key=own_key, verifynode=node,
+ publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"verified"]
+ print >>sys.stderr, "ERROR: verify-entries", parsed_result
+ sys.exit(1)
+ except requests.exceptions.HTTPError, e:
+ print >>sys.stderr, "ERROR: verify-entries", e.response
+ sys.exit(1)
+
def get_verifiedsize(node, baseurl, own_key, paths):
try:
result = http_request(baseurl + "plop/v1/merge/verifiedsize",
@@ -319,14 +338,14 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e.request, e.response
sys.exit(1)
-def sendsth(node, baseurl, own_key, paths, submission):
+def publish_sth(node, baseurl, own_key, paths, submission):
try:
- result = http_request(baseurl + "plop/v1/frontend/sendsth",
+ result = http_request(baseurl + "plop/v1/frontend/publish-sth",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.response
+ print >>sys.stderr, "ERROR: publish-sth", e.response
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="