-rwxr-xr-x  tools/merge_backup.py  111
-rw-r--r--  tools/mergetools.py      7
2 files changed, 83 insertions, 35 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 123347a..0c283e5 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -7,6 +7,7 @@
import sys
import base64
import select
+import requests
from time import sleep
from certtools import timing_point, build_merkle_tree, write_file, \
    create_ssl_context
@@ -15,6 +16,24 @@ from mergetools import chunks, backup_sendlog, get_logorder, \
    hexencode, setverifiedsize, sendentries_merge, verifyroot, \
    get_nfetched, parse_args
+def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
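+    """Call backup_sendlog() for one chunk, retrying up to five times
+    with a ten second pause after each failure.  Returns the result,
+    or None if every attempt failed."""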
+    for trynumber in range(5, 0, -1):
+        sendlogresult = \
+            backup_sendlog(nodename, nodeaddress, own_key, paths,
+                           {"start": verifiedsize, "hashes": chunk})
+        if sendlogresult is None:
+            if trynumber == 1:
+                return None
+            select.select([], [], [], 10.0)
+            print >>sys.stderr, "tries left:", trynumber
+            sys.stderr.flush()
+            continue
+        return sendlogresult
+
+
def merge_backup(args, config, localconfig, secondaries):
    paths = localconfig["paths"]
    own_key = (localconfig["nodename"],
@@ -27,6 +46,7 @@ def merge_backup(args, config, localconfig, secondaries):
    timing = timing_point()
    nfetched = get_nfetched(currentsizefile, logorderfile)
+    timing_point(timing, "get nfetched")
    logorder = get_logorder(logorderfile, nfetched)
    tree_size = len(logorder)
    timing_point(timing, "get logorder")
@@ -49,21 +69,38 @@ def merge_backup(args, config, localconfig, secondaries):
        sys.stderr.flush()
        entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
+
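+        # Walk the log in 100k chunks, probing each chunk with only its
+        # first 10 hashes to find how far the secondary already is.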
+ print >>sys.stderr, "determining end of log:",
+ for chunk in chunks(entries, 100000):
+ sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
+ if sendlogresult == None:
+ print >>sys.stderr, "sendlog result was None"
+ sys.exit(1)
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "backup_sendlog:", sendlogresult
+ sys.exit(1)
+ verifiedsize += len(chunk)
+ print >>sys.stderr, verifiedsize,
+ sys.stderr.flush()
+
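+        # Step back one full chunk (or to the start) so the tail of the
+        # log gets sent again in full below.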
+        if verifiedsize > 100000:
+            verifiedsize -= 100000
+        else:
+            verifiedsize = 0
+
+        timing_point(timing, "checklog")
+
+        entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
print >>sys.stderr, "sending log:",
sys.stderr.flush()
for chunk in chunks(entries, 1000):
-            for trynumber in range(5, 0, -1):
-                sendlogresult = \
-                    backup_sendlog(nodename, nodeaddress, own_key, paths,
-                                   {"start": verifiedsize, "hashes": chunk})
-                if sendlogresult == None:
-                    if trynumber == 1:
-                        sys.exit(1)
-                    select.select([], [], [], 10.0)
-                    print >>sys.stderr, "tries left:", trynumber
-                    sys.stderr.flush()
-                    continue
-                break
+            sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
+            if sendlogresult is None:
+                sys.exit(1)
            if sendlogresult["result"] != "ok":
                print >>sys.stderr, "backup_sendlog:", sendlogresult
                sys.exit(1)
@@ -78,28 +115,36 @@ def merge_backup(args, config, localconfig, secondaries):
        missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                     own_key, paths)
        timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- for missingentry_chunk in chunks(missingentries, 100):
- missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
- hashes_and_entries = [(hash, read_chain(chainsdir, hash)) for hash in missingentry_hashes]
- sendentryresult = sendentries_merge(nodename, nodeaddress,
- own_key, paths,
- hashes_and_entries)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry_merge:", sendentryresult
- sys.exit(1)
- fetched_entries += 1
- if fetched_entries % 1000 == 0:
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
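+        # Loop until the secondary reports no missing entries, reusing a
+        # single HTTP session for all sendentries_merge() calls.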
+        while missingentries:
+            print >>sys.stderr, "missing entries:", len(missingentries)
+            sys.stderr.flush()
+
+            fetched_entries = 0
+            print >>sys.stderr, "fetching missing entries",
+            sys.stderr.flush()
+            with requests.sessions.Session() as session:
+                for missingentry_chunk in chunks(missingentries, 100):
+                    missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+                    hashes_and_entries = [(hash, read_chain(chainsdir, hash)) for hash in missingentry_hashes]
+                    sendentryresult = sendentries_merge(nodename, nodeaddress,
+                                                        own_key, paths,
+                                                        hashes_and_entries, session)
+                    if sendentryresult["result"] != "ok":
+                        print >>sys.stderr, "sendentry_merge:", sendentryresult
+                        sys.exit(1)
+                    fetched_entries += len(missingentry_chunk)
+                    print >>sys.stderr, fetched_entries,
+                    sys.stderr.flush()
+            print >>sys.stderr
+            sys.stderr.flush()
+            timing_point(timing, "send missing")
+
+            missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+                                                         own_key, paths)
+            timing_point(timing, "get missing")
        verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
                                      tree_size)
diff --git a/tools/mergetools.py b/tools/mergetools.py
index f6e8bd5..ec4fd2a 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -286,13 +286,16 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash):
def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
    return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)])
-def sendentries_merge(node, baseurl, own_key, paths, entries):
+def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
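+    # 'session' is an optional requests Session, letting callers reuse
+    # one HTTP connection across many sendentries_merge() calls.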
    try:
        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
        result = http_request(
            baseurl + "plop/v1/merge/sendentry",
            json.dumps(json_entries),
-            key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+            key=own_key, verifynode=node, publickeydir=paths["publickeys"],
+            session=session)
        return json.loads(result)
    except requests.exceptions.HTTPError, e:
        print >>sys.stderr, "ERROR: sendentry_merge", e.response