author    Linus Nordberg <linus@nordu.net>  2016-11-23 17:09:48 +0100
committer Linus Nordberg <linus@nordu.net>  2016-11-23 17:09:48 +0100
commit    19a2a611a839c0318f58347e2d93943c8e2401a5 (patch)
tree      18cd302161a88d4546b39792a4bff6b1ade95c77 /tools/merge_backup.py
parent    27e368196ce65e109c027987c706a697356f7bc5 (diff)
WIP
Merge can run as four separate processes, plus a fifth controlling process, 'merge'. Tests are limited to testcase1.py and are currently failing because of the test with the dead merge secondary. Tests are also time consuming because they wait 60 seconds each time a merge needs to be verified. This could be improved by peeking at the control files, for example.
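
The "peeking at the control files" idea might look roughly like the sketch below. This is purely illustrative and not part of the commit: it assumes the test knows the mergedb path and that merge_backup updates mergedb/verified.<nodename> once a backup round has been verified (as the diff below does); the helper name wait_for_merge_backup and the polling interval are invented for the example.

    # Hypothetical test helper: poll the control files written by
    # merge_backup instead of sleeping a fixed 60 s per verification.
    import os
    import time

    def wait_for_merge_backup(mergedb, nodename, timeout=60, poll=0.5):
        """Return True once verified.<nodename> changes, False on timeout."""
        path = os.path.join(mergedb, "verified." + nodename)
        try:
            before = os.stat(path).st_mtime
        except OSError:
            before = None
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                now = os.stat(path).st_mtime
            except OSError:
                now = None
            if now is not None and now != before:
                return True
            time.sleep(poll)
        return False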
Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x  tools/merge_backup.py | 141
1 file changed, 84 insertions, 57 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
import base64
import select
import requests
+import errno
+import logging
from time import sleep
+from os import stat
from certtools import timing_point, build_merkle_tree, write_file, \
create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
get_verifiedsize, get_missingentriesforbackup, \
hexencode, setverifiedsize, sendentries_merge, verifyroot, \
- get_nfetched, parse_args, perm
+ get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
if trynumber == 1:
return None
select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
+ logging.info("tries left: %d", trynumber)
continue
return sendlogresult
-
def merge_backup(args, config, localconfig, secondaries):
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
currentsizefile = mergedb + "/fetched"
+ statusfile = mergedb + "/merge_backup.status"
+ s = Status(statusfile)
timing = timing_point()
nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
nodeaddress = "https://%s/" % secondary["address"]
nodename = secondary["name"]
timing = timing_point()
- print >>sys.stderr, "backing up to node", nodename
- sys.stderr.flush()
- verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ logging.info("backing up to node %s", nodename)
+ try:
+ verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ except requests.exceptions.ConnectionError, e:
+ logging.error("connection error when getting verified size from %s", nodename)
+ return 1
timing_point(timing, "get verified size")
- print >>sys.stderr, "verified size", verifiedsize
- sys.stderr.flush()
+ logging.info("verified size %d", verifiedsize)
entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "determining end of log:",
+ logging.info("determining end of log")
for chunk in chunks(entries, 100000):
sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
if sendlogresult == None:
- print >>sys.stderr, "sendlog result was None"
- sys.exit(1)
+ logging.error("sendlog result was None")
+ return 1
if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
+ logging.error("backup_sendlog: %s", sendlogresult)
+ return 1
verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
+ s.status("INFO: determining end of log: %d" % verifiedsize)
if verifiedsize > 100000:
verifiedsize -= 100000
else:
verifiedsize = 0
+ logging.info("end of log determined")
timing_point(timing, "checklog")
entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
+ logging.info("sending log")
for chunk in chunks(entries, 1000):
sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
if sendlogresult == None:
- sys.exit(1)
+ return 1
if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
+ logging.error("backup_sendlog: %s", sendlogresult)
+ return 1
verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
+ s.status("INFO: sending log: %d" % verifiedsize)
timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ logging.info("log sent")
missingentries = get_missingentriesforbackup(nodename, nodeaddress,
own_key, paths)
timing_point(timing, "get missing")
while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ logging.info("missing entries: %d", len(missingentries))
fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
+ logging.info("fetching missing entries")
with requests.sessions.Session() as session:
for missingentry_chunk in chunks(missingentries, 100):
missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
@@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries):
own_key, paths,
hashes_and_entries, session)
if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry_merge:", sendentryresult
- sys.exit(1)
+ logging.error("sendentry_merge: %s", sendentryresult)
+ return 1
fetched_entries += len(missingentry_hashes)
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: fetching missing entries: %d" % fetched_entries)
timing_point(timing, "send missing")
missingentries = get_missingentriesforbackup(nodename, nodeaddress,
own_key, paths)
timing_point(timing, "get missing")
- verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
- tree_size)
+ try:
+ verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+ tree_size)
+ except requests.exceptions.ConnectionError, e:
+ logging.error("connection error when verifying root at %s", nodename)
+ return 1
if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
+ logging.error("verifyroot: %s", verifyrootresult)
+ return 1
secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", \
- hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
+ logging.error("secondary root hash was %s, expected %s",
+ hexencode(secondary_root_hash),
+ hexencode(root_hash))
+ return 1
timing_point(timing, "verifyroot")
setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
backuppath = mergedb + "/verified." + nodename
backupdata = {"tree_size": tree_size,
"sha256_root_hash": hexencode(root_hash)}
- #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+ logging.debug("writing to %s: %s", backuppath, backupdata)
write_file(backuppath, backupdata)
if args.timing:
- print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+ return 0
def main():
"""
- Read logorder file up until what's indicated by fetched file and
- build the tree.
+ Wait until 'fetched' exists and read it.
+
+ Read 'logorder' up until what's indicated by 'fetched' and build the
+ tree.
Distribute entries to all secondaries, write tree size and tree head
- to backup.<secondary> files as each secondary is verified to have
+ to 'backup.<secondary>' files as each secondary is verified to have
the entries.
- Sleep some and start over.
+ If `--mergeinterval', wait until 'fetched' is updated and read it
+ and start over from the point where 'logorder' is read.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_backup.lock"
+ fetched_path = mergedb + "/fetched"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
all_secondaries = \
[n for n in config.get('mergenodes', []) if \
n['name'] != config['primarymergenode']]
- paths = localconfig["paths"]
create_ssl_context(cafile=paths["https_cacertfile"])
if len(args.node) == 0:
@@ -187,12 +206,20 @@ def main():
else:
nodes = [n for n in all_secondaries if n["name"] in args.node]
+ if args.mergeinterval is None:
+ return merge_backup(args, config, localconfig, nodes)
+
+ fetched_statinfo = waitforfile(fetched_path)
+
while True:
- merge_backup(args, config, localconfig, nodes)
- if args.interval is None:
- break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ err = merge_backup(args, config, localconfig, nodes)
+ if err != 0:
+ return err
+ fetched_statinfo_old = fetched_statinfo
+ while fetched_statinfo == fetched_statinfo_old:
+ sleep(args.mergeinterval / 30)
+ fetched_statinfo = stat(fetched_path)
+ return 0
if __name__ == '__main__':
sys.exit(main())
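
The commit also starts importing waitforfile, flock_ex_or_fail and Status from mergetools; their real implementations live in tools/mergetools.py and are not part of this diff. Inferred from the call sites above only, a rough sketch of what they might look like (illustrative, and likely differing from the actual code):

    import os
    import errno
    import fcntl
    from time import sleep

    def waitforfile(path, poll=1.0):
        # Block until 'path' exists, then return its stat result;
        # merge_backup compares successive results to notice updates.
        while True:
            try:
                return os.stat(path)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise
            sleep(poll)

    def flock_ex_or_fail(path):
        # Take an exclusive, non-blocking flock on 'path'; return False if
        # another process already holds it. The fd is deliberately kept
        # open so the lock lives for the lifetime of the process.
        fd = os.open(path, os.O_CREAT | os.O_RDWR)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            os.close(fd)
            return False
        return True

    class Status(object):
        # Write the most recent progress message to a status file that
        # other processes (or tests) can peek at.
        def __init__(self, path):
            self.path = path
        def status(self, msg):
            with open(self.path, "w") as f:
                f.write(msg + "\n")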