author    Linus Nordberg <linus@nordu.net>    2016-11-27 23:59:25 +0100
committer Linus Nordberg <linus@nordu.net>    2016-11-27 23:59:25 +0100
commit    4e1f11749167c7c79a3fc6a0e146487e7cc1022c (patch)
tree      be0d3d75718bb4b1dc0bfe6dd0d2da4517779ea5
parent    ec61afe24f5d1189654b7b751dad47ab640f21dc (diff)
Parallelise merge_backup.
We still sometimes fail the tests when the merge secondary goes away.
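
The change fans each secondary out into its own process and hands the
per-node root hash back over a pipe; the parent polls for finished
children and writes one backup file per node. Below is a minimal,
self-contained sketch of that fan-out/collect pattern; do_backup stands
in for the real per-node work (do_it in the patch), and the node names
are illustrative:

    from multiprocessing import Process, Pipe
    from time import sleep

    def do_backup(node):
        # placeholder for the real per-node backup work
        return "root-hash-for-%s" % node

    def worker(pipe, node):
        pipe.send(do_backup(node))  # hand the result back to the parent

    def run_parallel(nodes):
        procs = {}
        for node in nodes:
            parent_end, child_end = Pipe()
            p = Process(target=worker, args=(child_end, node),
                        name="backup_%s" % node)
            p.start()
            procs[p] = (node, parent_end)
        results = {}
        while True:
            for p in list(procs):
                if not p.is_alive():
                    p.join()
                    node, pipe = procs.pop(p)
                    if p.exitcode == 0:   # only read the pipe on success
                        results[node] = pipe.recv()
            if not procs:
                break
            sleep(1)  # poll, as the patch does, rather than block
        return results

    if __name__ == '__main__':
        print(run_parallel(["secondary-1", "secondary-2"]))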
-rwxr-xr-x    tools/merge_backup.py    139
1 file changed, 94 insertions, 45 deletions
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index f25b22a..c057b5a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -15,12 +15,14 @@ import logging
from time import sleep
from base64 import b64encode, b64decode
from os import stat
+from multiprocessing import Process, Pipe, active_children
from certtools import timing_point, build_merkle_tree, write_file, \
create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
get_verifiedsize, get_missingentriesforbackup, \
hexencode, setverifiedsize, sendentries_merge, verifyroot, \
- get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
+ get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, \
+ Status, loginit
def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
for trynumber in range(5, 0, -1):
@@ -97,12 +99,63 @@ def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timin
timing_point(timing, "verifyroot")
return root_hash
-def merge_backup(args, config, localconfig, secondaries):
+def do_it(backupargs):
+ secondary, localconfig, chainsdb, logorder, s, timing = backupargs
maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
+ nodename = secondary["name"]
+ nodeaddress = "https://%s/" % secondary["address"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
localconfig["nodename"]))
+ tree_size = len(logorder)
+
+ logging.info("backing up to node %s", nodename)
+ verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get verified size")
+ logging.info("verified size %d", verifiedsize)
+
+ if verifiedsize == tree_size:
+ root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing)
+ else:
+ while verifiedsize < tree_size:
+ uptopos = min(verifiedsize + maxwindow, tree_size)
+
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s)
+ timing_point(timing, "sendlog")
+
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s)
+
+ root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing)
+
+ verifiedsize = uptopos
+ setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize)
+ return root_hash
+
+def worker(pipe, backupargs):
+ root_hash = do_it(backupargs)
+ pipe.send(root_hash)
+ return 0
+
+def start_worker(backupargs):
+ nodename = backupargs[0]["name"]
+ parent_conn, child_conn = Pipe()
+ p = Process(target=worker,
+ args=(child_conn, backupargs),
+ name='backup_%s' % nodename)
+ p.start()
+ return p, parent_conn
+
+def update_backupfile(mergedb, nodename, tree_size, root_hash):
+ backuppath = mergedb + "/verified." + nodename
+ backupdata = {"tree_size": tree_size,
+ "sha256_root_hash": hexencode(root_hash)}
+ logging.debug("writing to %s: %s", backuppath, backupdata)
+ write_file(backuppath, backupdata)
+
+def merge_backup(args, config, localconfig, secondaries):
+ paths = localconfig["paths"]
mergedb = paths["mergedb"]
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
@@ -117,43 +170,43 @@ def merge_backup(args, config, localconfig, secondaries):
tree_size = len(logorder)
timing_point(timing, "get logorder")
+ procs = {}
for secondary in secondaries:
if secondary["name"] == config["primarymergenode"]:
continue
- nodeaddress = "https://%s/" % secondary["address"]
nodename = secondary["name"]
timing = timing_point()
- logging.info("backing up to node %s", nodename)
- verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
- timing_point(timing, "get verified size")
- logging.info("verified size %d", verifiedsize)
- if verifiedsize == tree_size:
- root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing)
+ backupargs = (secondary, localconfig, chainsdb, logorder, s, timing)
+ if args.mergeinterval:
+ pipe_mine, pipe_theirs = Pipe()
+ p = Process(target=worker,
+ args=(pipe_theirs, backupargs),
+ name='backup_%s' % nodename)
+ p.start()
+ procs[p] = (nodename, pipe_mine)
else:
- while verifiedsize < tree_size:
- uptopos = min(verifiedsize + maxwindow, tree_size)
-
- entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
- sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths, s)
- timing_point(timing, "sendlog")
-
- fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing, s)
-
- root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing)
-
- verifiedsize = uptopos
- setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize)
-
- backuppath = mergedb + "/verified." + nodename
- backupdata = {"tree_size": tree_size,
- "sha256_root_hash": hexencode(root_hash)}
- logging.debug("writing to %s: %s", backuppath, backupdata)
- write_file(backuppath, backupdata)
-
- if args.timing:
- logging.debug("timing: merge_backup: %s", timing["deltatimes"])
-
+ root_hash = do_it(backupargs)
+ update_backupfile(mergedb, nodename, tree_size, root_hash)
+
+ if args.mergeinterval:
+ while True:
+ for p in list(procs):
+ if not p.is_alive():
+ p.join()
+ nodename, pipe = procs[p]
+ del procs[p]
+ if p.exitcode != 0:
+ logging.warning("%s failure: %d", nodename, p.exitcode)
+ continue
+ root_hash = pipe.recv()
+ update_backupfile(mergedb, nodename, tree_size, root_hash)
+ if not procs:
+ break
+ sleep(1)
+
+ if args.timing:
+ logging.debug("timing: merge_backup: %s", timing["deltatimes"])
return 0
def main():
@@ -163,9 +216,9 @@ def main():
Read 'logorder' up until what's indicated by 'fetched' and build the
tree.
- Distribute entries to all secondaries, write tree size and tree head
- to 'backup.<secondary>' files as each secondary is verified to have
- the entries.
+ Distribute entries to all secondaries, in parallel if `--mergeinterval'.
+ Write tree size and tree head to 'backup.<secondary>' files as each
+ secondary is verified to have the entries.
If `--mergeinterval', wait until 'fetched' is updated and read it
and start over from the point where 'logorder' is read.
@@ -190,28 +243,24 @@ def main():
all_secondaries = \
[n for n in config.get('mergenodes', []) if \
n['name'] != config['primarymergenode']]
- create_ssl_context(cafile=paths["https_cacertfile"])
-
if len(args.node) == 0:
nodes = all_secondaries
else:
nodes = [n for n in all_secondaries if n["name"] in args.node]
- if args.mergeinterval is None:
- return merge_backup(args, config, localconfig, nodes)
-
+ create_ssl_context(cafile=paths["https_cacertfile"])
fetched_statinfo = waitforfile(fetched_path)
-
+ retval = 0
while True:
- err = merge_backup(args, config, localconfig, nodes)
- if err:
- return err
+ retval = merge_backup(args, config, localconfig, nodes)
+ if retval or not args.mergeinterval:
+ break
fetched_statinfo_old = fetched_statinfo
while fetched_statinfo == fetched_statinfo_old:
sleep(max(3, args.mergeinterval / 10))
fetched_statinfo = stat(fetched_path)
- return 0
+ return retval
if __name__ == '__main__':
sys.exit(main())
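
With `--mergeinterval', main() now re-runs merge_backup each time the
'fetched' file changes, waiting in a plain stat-polling loop. A
standalone sketch of that wait (the path and interval arguments are
illustrative); since whole stat results are compared, any metadata
change to the file triggers a new merge round:

    from os import stat
    from time import sleep

    def wait_for_update(path, mergeinterval):
        # os.stat results compare by value, so equality means "unchanged"
        before = stat(path)
        while stat(path) == before:
            sleep(max(3, mergeinterval / 10))  # at most one poll per 3s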