summaryrefslogtreecommitdiff
path: root/tools/merge_dist.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/merge_dist.py')
-rwxr-xr-xtools/merge_dist.py134
1 files changed, 82 insertions, 52 deletions
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index 2af1d6c..ded25a1 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -6,14 +6,78 @@
import sys
import json
+import base64
+import requests
from time import sleep
from base64 import b64encode, b64decode
from certtools import timing_point, \
create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, sendlog, sendentry, parse_args, perm
+ publish_sth, sendlog, sendentries, parse_args, perm, get_frontend_verifiedsize, \
+ frontend_verify_entries
+
+def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths):
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = sendlog(nodename, nodeaddress,
+ own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ sleep(10)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ curpos += len(chunk)
+ print >>sys.stderr, curpos,
+ sys.stderr.flush()
+ print >>sys.stderr
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+ missingentries = get_missingentries(nodename, nodeaddress, own_key,
+ paths)
+ timing_point(timing, "get missing")
+
+ while missingentries:
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+
+ sent_entries = 0
+ print >>sys.stderr, "sending missing entries",
+ sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for missingentry_chunk in chunks(missingentries, 100):
+ missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+ hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+ sendentryresult = sendentries(nodename, nodeaddress,
+ own_key, paths,
+ hashes_and_entries, session)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentries:", sendentryresult
+ sys.exit(1)
+ sent_entries += len(missingentry_hashes)
+ print >>sys.stderr, sent_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+
+ missingentries = get_missingentries(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
def merge_dist(args, localconfig, frontendnodes, timestamp):
+ maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -53,62 +117,28 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
print >>sys.stderr, "current position", curpos
sys.stderr.flush()
- entries = [b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress,
- own_key, paths,
- {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- sleep(10)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
- missingentries = get_missingentries(nodename, nodeaddress, own_key,
- paths)
- timing_point(timing, "get missing")
+ assert verifiedsize >= curpos
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- sent_entries = 0
- print >>sys.stderr, "send missing entries",
- sys.stderr.flush()
- for missingentry in missingentries:
- ehash = b64decode(missingentry)
- sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
- chainsdb.get(ehash), ehash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry:", sendentryresult
- sys.exit(1)
- sent_entries += 1
- if sent_entries % 1000 == 0:
- print >>sys.stderr, sent_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
+ while verifiedsize < len(logorder):
+ uptopos = min(verifiedsize + maxwindow, len(logorder))
+
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "sendlog")
+
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
+ verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos)
+
print >>sys.stderr, "sending sth to node", nodename
sys.stderr.flush()
- sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "sendsth:", sendsthresult
+ publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth)
+ if publishsthresult["result"] != "ok":
+ print >>sys.stderr, "publishsth:", publishsthresult
sys.exit(1)
timing_point(timing, "send sth")