Diffstat (limited to 'tools/merge_backup.py')
-rwxr-xr-x  tools/merge_backup.py  141
1 file changed, 84 insertions(+), 57 deletions(-)
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
 import base64
 import select
 import requests
+import errno
+import logging
 from time import sleep
+from os import stat
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
 from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
-    get_nfetched, parse_args, perm
+    get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status

 def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
     for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
             if trynumber == 1:
                 return None
             select.select([], [], [], 10.0)
-            print >>sys.stderr, "tries left:", trynumber
-            sys.stderr.flush()
+            logging.info("tries left: %d", trynumber)
             continue
         return sendlogresult
-

 def merge_backup(args, config, localconfig, secondaries):
     paths = localconfig["paths"]
     own_key = (localconfig["nodename"],
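[Aside, not part of the patch] backup_loop, shown above, retries a flaky
network call up to five times, using select.select() on empty fd sets as a
plain 10-second interruptible sleep between attempts. A minimal sketch of the
same pattern, with 'operation' standing in for the real backup_sendlog call:

import select
import logging

def retry_call(operation, tries=5, delay=10.0):
    # Count down so the log line can report how many tries remain.
    for trynumber in range(tries, 0, -1):
        result = operation()
        if result is None:
            if trynumber == 1:
                return None  # out of tries; the caller handles None
            # select() with empty fd sets is simply a portable sleep here.
            select.select([], [], [], delay)
            logging.info("tries left: %d", trynumber)
            continue
        return result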
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     logorderfile = mergedb + "/logorder"
     currentsizefile = mergedb + "/fetched"
+    statusfile = mergedb + "/merge_backup.status"
+    s = Status(statusfile)

     timing = timing_point()
     nfetched = get_nfetched(currentsizefile, logorderfile)
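[Aside, not part of the patch] The new Status object is imported from
mergetools and is only fed one-line progress messages through s.status(...)
below. Its implementation is not in this diff; a minimal stand-in consistent
with those call sites (an assumption, the real class may differ) would be:

class Status(object):
    def __init__(self, path):
        self.path = path

    def status(self, message):
        # Overwrite the status file with the latest progress line so an
        # operator can follow a long-running merge from the outside.
        with open(self.path, "w") as statusfile:
            statusfile.write(message + "\n")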
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
         nodeaddress = "https://%s/" % secondary["address"]
         nodename = secondary["name"]
         timing = timing_point()
-        print >>sys.stderr, "backing up to node", nodename
-        sys.stderr.flush()
-        verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        logging.info("backing up to node %s", nodename)
+        try:
+            verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when getting verified size from %s", nodename)
+            return 1
         timing_point(timing, "get verified size")
-        print >>sys.stderr, "verified size", verifiedsize
-        sys.stderr.flush()
+        logging.info("verified size %d", verifiedsize)
         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-        print >>sys.stderr, "determining end of log:",
+        logging.info("determining end of log")
         for chunk in chunks(entries, 100000):
             sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
             if sendlogresult == None:
-                print >>sys.stderr, "sendlog result was None"
-                sys.exit(1)
+                logging.error("sendlog result was None")
+                return 1
             if sendlogresult["result"] != "ok":
-                print >>sys.stderr, "backup_sendlog:", sendlogresult
-                sys.exit(1)
+                logging.error("backup_sendlog: %s", sendlogresult)
+                return 1
             verifiedsize += len(chunk)
-            print >>sys.stderr, verifiedsize,
-            sys.stderr.flush()
+            s.status("INFO: determining end of log: %d" % verifiedsize)
         if verifiedsize > 100000:
             verifiedsize -= 100000
         else:
             verifiedsize = 0
+        logging.info("end of log determined")
         timing_point(timing, "checklog")
         entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-        print >>sys.stderr, "sending log:",
-        sys.stderr.flush()
+        logging.info("sending log")
         for chunk in chunks(entries, 1000):
             sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
             if sendlogresult == None:
-                sys.exit(1)
+                return 1
             if sendlogresult["result"] != "ok":
-                print >>sys.stderr, "backup_sendlog:", sendlogresult
-                sys.exit(1)
+                logging.error("backup_sendlog: %s", sendlogresult)
+                return 1
             verifiedsize += len(chunk)
-            print >>sys.stderr, verifiedsize,
-            sys.stderr.flush()
-        print >>sys.stderr
+            s.status("INFO: sending log: %d" % verifiedsize)
         timing_point(timing, "sendlog")
-        print >>sys.stderr, "log sent"
-        sys.stderr.flush()
+        logging.info("log sent")
         missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                      own_key, paths)
         timing_point(timing, "get missing")
         while missingentries:
-            print >>sys.stderr, "missing entries:", len(missingentries)
-            sys.stderr.flush()
+            logging.info("missing entries: %d", len(missingentries))
             fetched_entries = 0
-            print >>sys.stderr, "fetching missing entries",
-            sys.stderr.flush()
+            logging.info("fetching missing entries")
             with requests.sessions.Session() as session:
                 for missingentry_chunk in chunks(missingentries, 100):
                     missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
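[Aside, not part of the patch] Both send loops above, and the missing-entry
loop that follows, walk their lists in fixed-size slices via the chunks
helper from mergetools (stepping 100000 entries at a time while probing for
the end of the log, 1000 per send, 100 per missing-entry batch). Its
definition is not in this diff; an equivalent generator (illustrative) is:

def chunks(items, chunk_size):
    # Yield consecutive fixed-size slices; the final slice may be shorter.
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]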
@@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries):
                                                         own_key, paths,
                                                         hashes_and_entries, session)
                     if sendentryresult["result"] != "ok":
-                        print >>sys.stderr, "sendentry_merge:", sendentryresult
-                        sys.exit(1)
+                        logging.error("sendentry_merge: %s", sendentryresult)
+                        return 1
                     fetched_entries += len(missingentry_hashes)
-                    print >>sys.stderr, fetched_entries,
-                    sys.stderr.flush()
-            print >>sys.stderr
-            sys.stderr.flush()
+                    s.status("INFO: fetching missing entries: %d" % fetched_entries)
             timing_point(timing, "send missing")
             missingentries = get_missingentriesforbackup(nodename, nodeaddress,
                                                          own_key, paths)
             timing_point(timing, "get missing")
-        verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
-                                      tree_size)
+        try:
+            verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+                                          tree_size)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when verifying root at %s", nodename)
+            return 1
         if verifyrootresult["result"] != "ok":
-            print >>sys.stderr, "verifyroot:", verifyrootresult
-            sys.exit(1)
+            logging.error("verifyroot: %s", verifyrootresult)
+            return 1
         secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
         if root_hash != secondary_root_hash:
-            print >>sys.stderr, "secondary root hash was", \
-                hexencode(secondary_root_hash)
-            print >>sys.stderr, " expected", hexencode(root_hash)
-            sys.exit(1)
+            logging.error("secondary root hash was %s, expected %s",
+                          hexencode(secondary_root_hash),
+                          hexencode(root_hash))
+            return 1
         timing_point(timing, "verifyroot")
         setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
         backuppath = mergedb + "/verified." + nodename
         backupdata = {"tree_size": tree_size,
                       "sha256_root_hash": hexencode(root_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+        logging.debug("writing to %s: %s", backuppath, backupdata)
         write_file(backuppath, backupdata)
     if args.timing:
-        print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+    return 0

 def main():
     """
-    Read logorder file up until what's indicated by fetched file and
-    build the tree.
+    Wait until 'fetched' exists and read it.
+
+    Read 'logorder' up until what's indicated by 'fetched' and build the
+    tree.

     Distribute entries to all secondaries, write tree size and tree head
-    to backup.<secondary> files as each secondary is verified to have
+    to 'backup.<secondary>' files as each secondary is verified to have
     the entries.

-    Sleep some and start over.
+    If `--mergeinterval', wait until 'fetched' is updated and read it
+    and start over from the point where 'logorder' is read.
     """
     args, config, localconfig = parse_args()
+    paths = localconfig["paths"]
+    mergedb = paths["mergedb"]
+    lockfile = mergedb + "/.merge_backup.lock"
+    fetched_path = mergedb + "/fetched"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     all_secondaries = \
         [n for n in config.get('mergenodes', []) if \
          n['name'] != config['primarymergenode']]
-    paths = localconfig["paths"]
     create_ssl_context(cafile=paths["https_cacertfile"])

     if len(args.node) == 0:
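[Aside, not part of the patch] main() now refuses to run concurrently with
itself: it must take an exclusive lock on .merge_backup.lock before doing any
work. flock_ex_or_fail comes from mergetools and is not shown in this diff; a
plausible fcntl-based shape (an assumption, not the project's actual code):

import errno
import fcntl
import os

def flock_ex_or_fail(path):
    # Non-blocking exclusive lock; report failure instead of waiting.
    # The descriptor is deliberately kept open so the lock is held for
    # the life of the process.
    try:
        fcntl.flock(os.open(path, os.O_CREAT | os.O_WRONLY),
                    fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError as e:
        if e.errno == errno.EWOULDBLOCK:
            return False
        raise
    return True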
@@ -187,12 +206,20 @@ def main():
     else:
         nodes = [n for n in all_secondaries if n["name"] in args.node]

+    if args.mergeinterval is None:
+        return merge_backup(args, config, localconfig, nodes)
+
+    fetched_statinfo = waitforfile(fetched_path)
+
     while True:
-        merge_backup(args, config, localconfig, nodes)
-        if args.interval is None:
-            break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        err = merge_backup(args, config, localconfig, nodes)
+        if err != 0:
+            return err
+        fetched_statinfo_old = fetched_statinfo
+        while fetched_statinfo == fetched_statinfo_old:
+            sleep(args.mergeinterval / 30)
+            fetched_statinfo = stat(fetched_path)
+
+    return 0

 if __name__ == '__main__':
     sys.exit(main())
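[Aside, not part of the patch] With --mergeinterval set, the new main loop
waits for the 'fetched' file to change by comparing os.stat() results between
passes, polling thirty times per interval. waitforfile, also from mergetools,
presumably blocks until the file first exists and returns its initial stat
record; a sketch under that assumption:

import errno
import os
from time import sleep

def waitforfile(path, interval=1):
    # Poll until the path exists, then return its stat result so the
    # caller can detect later updates by comparing stat records.
    while True:
        try:
            return os.stat(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            sleep(interval)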