diff options
author | Linus Nordberg <linus@nordu.net> | 2016-11-25 00:47:13 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-11-25 00:47:13 +0100 |
commit | 49b6e85963ef55fb6cfa1876fe825730f95658bc (patch) | |
tree | b3f5c605d64c984a092a34c32b34305d4a3d687a /tools | |
parent | 19a2a611a839c0318f58347e2d93943c8e2401a5 (diff) |
Parallelise merge_fetch.py.
NOTE: Not supporting permdb yet!
We're still not passing the tests because merge_backup.py exits when
the secondary merge disappears.
Diffstat (limited to 'tools')
-rwxr-xr-x | tools/merge | 23 | ||||
-rwxr-xr-x | tools/merge_backup.py | 2 | ||||
-rwxr-xr-x | tools/merge_dist.py | 2 | ||||
-rwxr-xr-x | tools/merge_fetch.py | 206 | ||||
-rw-r--r-- | tools/mergetools.py | 45 | ||||
-rwxr-xr-x | tools/testcase1.py | 2 |
6 files changed, 254 insertions, 26 deletions
diff --git a/tools/merge b/tools/merge index 0d1bf0e..4ba0438 100755 --- a/tools/merge +++ b/tools/merge @@ -5,7 +5,7 @@ import os import sys import signal from time import sleep -from mergetools import parse_args +from mergetools import parse_args, terminate_child_procs from multiprocessing import Process import merge_fetch, merge_backup, merge_sth, merge_dist @@ -21,27 +21,38 @@ def run_once(): return ret def term(signal, arg): + terminate_child_procs() sys.exit(1) def run_continuously(pidfilepath): """Run continuously.""" - parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist) + parts = (('fetch', merge_fetch), + ('backup', merge_backup), + ('sth', merge_sth), + ('dist', merge_dist)) procs = {} for part, mod in parts: procs[part] = Process(target=mod.main, name='merge_%s' % part) - procs[part].daemon = True procs[part].start() + #print >>sys.stderr, "DEBUG:", part, "started, pid", procs[part].pid if pidfilepath: open(pidfilepath, 'w').write(str(os.getpid()) + '\n') signal.signal(signal.SIGTERM, term) - while True: + retval = 0 + keep_going = True + while keep_going: + sleep(1) for name, p in procs.items(): if not p.is_alive(): print >>sys.stderr, "\nERROR:", name, "process is gone; exiting" - return 1 - sleep(1) + retval = 1 # Fail. + keep_going = False + break + + terminate_child_procs() + return retval def main(): """Main""" diff --git a/tools/merge_backup.py b/tools/merge_backup.py index 723fc7a..32bae53 100755 --- a/tools/merge_backup.py +++ b/tools/merge_backup.py @@ -217,7 +217,7 @@ def main(): return err fetched_statinfo_old = fetched_statinfo while fetched_statinfo == fetched_statinfo_old: - sleep(args.mergeinterval / 30) + sleep(max(3, args.mergeinterval / 10)) fetched_statinfo = stat(fetched_path) return 0 diff --git a/tools/merge_dist.py b/tools/merge_dist.py index 7a13bfa..b018f63 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -167,7 +167,7 @@ def main(): return 1 sth_statinfo_old = sth_statinfo while sth_statinfo == sth_statinfo_old: - sleep(args.mergeinterval / 30) + sleep(max(3, args.mergeinterval / 10)) sth_statinfo = stat(sth_path) return 0 diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index 7973fae..6accca4 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -10,14 +10,18 @@ import sys import struct import subprocess +import signal import logging from time import sleep +from multiprocessing import Process, Pipe +from random import Random from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ - hexencode, parse_args, perm, flock_ex_or_fail, Status + hexencode, hexdecode, parse_args, perm, flock_ex_or_fail, Status, \ + terminate_child_procs from certtools import timing_point, write_file, create_ssl_context -def merge_fetch(args, config, localconfig): +def merge_fetch_sequenced(args, config, localconfig): paths = localconfig["paths"] storagenodes = config["storagenodes"] mergedb = paths["mergedb"] @@ -96,14 +100,191 @@ def merge_fetch(args, config, localconfig): else: return (tree_size, logorder[tree_size-1]) +def merge_fetch_worker(args, localconfig, storagenode, pipe): + paths = localconfig["paths"] + mergedb = paths["mergedb"] + chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") + own_key = (localconfig["nodename"], + "%s/%s-private.pem" % (paths["privatekeys"], + localconfig["nodename"])) + to_fetch = set() + timeout = max(3, args.mergeinterval / 10) + while True: + if pipe.poll(timeout): + msg = pipe.recv().split() + if len(msg) < 2: + continue + cmd = msg[0] + ehash = msg[1] + if cmd == 'FETCH': + to_fetch.add(hexdecode(ehash)) + else: + logging.warning("%s: unknown command from parent: %s", + storagenode["name"], msg) + + if len(to_fetch) > 0: + logging.info("%s: fetching %d entries", storagenode["name"], + len(to_fetch)) + # TODO: Consider running the verifycert process longer. + verifycert = subprocess.Popen( + [paths["verifycert_bin"], paths["known_roots"]], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + # Chunking for letting other workers take the chainsdb lock. + for chunk in chunks(list(to_fetch), 100): + chainsdb.lock_ex() + entries = get_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths, chunk) + for ehash in chunk: + entry = entries[ehash] + verify_entry(verifycert, entry, ehash) + chainsdb.add(ehash, entry) + chainsdb.commit() + for ehash in chunk: + pipe.send('FETCHED %s' % hexencode(ehash)) + to_fetch.remove(ehash) + verifycert.communicate(struct.pack("I", 0)) + + new_entries = get_new_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths) + if len(new_entries) > 0: + logging.info("%s: got %d new entries", storagenode["name"], + len(new_entries)) + for ehash in new_entries: + pipe.send('NEWENTRY %s' % hexencode(ehash)) + +def term(signal, arg): + terminate_child_procs() + sys.exit(1) + +def newworker(name, args): + my_conn, child_conn = Pipe() + p = Process(target=merge_fetch_worker, + args=tuple(args + [child_conn]), + name='merge_fetch_%s' % name) + p.daemon = True + p.start() + logging.debug("%s started, pid %d", name, p.pid) + return (name, my_conn, p) + +def merge_fetch_parallel(args, config, localconfig): + paths = localconfig["paths"] + storagenodes = config["storagenodes"] + mergedb = paths["mergedb"] + logorderfile = mergedb + "/logorder" + currentsizefile = mergedb + "/fetched" + + rand = Random() + signal.signal(signal.SIGTERM, term) + + procs = {} + for storagenode in storagenodes: + name = storagenode['name'] + procs[name] = newworker(name, [args, localconfig, storagenode]) + + logorder = get_logorder(logorderfile) # List of entries in log. + entries_in_log = set(logorder) # Set of entries in log. + entries_to_fetch = set() # Set of entries to fetch. + fetch = {} # Dict with entries to fetch. + while procs: + assert(not entries_to_fetch) + # Poll worker processes. + for name, pipe, p in procs.values(): + if not p.is_alive(): + logging.warning("%s is gone, restarting", name) + procs[name] = newworker(name, [args, localconfig, + storagenodes[name]]) + continue + logging.info("polling %s", name) + if pipe.poll(1): + msg = pipe.recv().split() + if len(msg) < 2: + logging.warning("unknown command from %s: %s", name, msg) + continue + cmd = msg[0] + ehash = msg[1] + if cmd == 'NEWENTRY': + logging.info("NEWENTRY at %s: %s", name, ehash) + entries_to_fetch.add(ehash) + logging.debug("entries_to_fetch: %s", entries_to_fetch) + elif cmd == 'FETCHED': + logging.info("FETCHED from %s: %s", name, ehash) + logorder.append(ehash) + add_to_logorder(logorderfile, hexdecode(ehash)) + fsync_logorder(logorderfile) + entries_in_log.add(ehash) + if ehash in entries_to_fetch: + entries_to_fetch.remove(ehash) + del fetch[ehash] + else: + logging.warning("unknown command from %s: %s", name, msg) + + # Ask workers to fetch entries. + logging.debug("nof entries to fetch including entries in log: %d", + len(entries_to_fetch)) + entries_to_fetch -= entries_in_log + logging.info("entries to fetch: %d", len(entries_to_fetch)) + # Add entries in entries_to_fetch as keys in dictionary fetch, + # values being a list of storage nodes, in randomised order. + for e in entries_to_fetch: + if not e in fetch: + l = procs.values() + rand.shuffle(l) + fetch[e] = l + # For each entry to fetch, treat its list of nodes as a + # circular list and ask the one in the front to fetch the + # entry. + while entries_to_fetch: + ehash = entries_to_fetch.pop() + nodes = fetch[ehash] + node = nodes.pop(0) + fetch[ehash] = nodes.append(node) + name, pipe, p = node + logging.info("asking %s to FETCH %s", name, ehash) + pipe.send("FETCH %s" % ehash) + + # Update the 'fetched' file. + logsize = len(logorder) + if logsize == 0: + last_hash = '' + else: + last_hash = logorder[logsize - 1] + logging.info("updating 'fetched' file: %d %s", logsize-1, last_hash) + currentsize = {"index": logsize - 1, "hash": last_hash} + logging.debug("writing to %s: %s", currentsizefile, currentsize) + write_file(currentsizefile, currentsize) + + return 0 + def main(): """ - Fetch new entries from all storage nodes, in sequence. + If no `--mergeinterval': + Fetch new entries from all storage nodes, in sequence, updating + the 'logorder' file and the 'chains' database. - Indicate the current position by writing the hash and its 'logorder' - index, 0-based, to 'fetched'. + Write 'fetched' to reflect how far in 'logorder' we've succesfully + fetched and verified. - Sleep some and start over, or exit if there's no `--mergeinterval'. + If `--mergeinterval': + Start one process per storage node, read their stdout for learning + about two things: (i) new entries ready for fetching ("NEWENTRY") and + (ii) new entries being succesfully fetched ("FETCHED"). + + Write to their stdin ("FETCH") when they should fetch another entry. + Update 'logorder' and the 'chains' database as we see new FETCHED + messages. + + Write 'fetched' to reflect how far in 'logorder' we've succesfully + fetched and verified. + + Keep doing this forever. + + NOTE: The point of having 'fetched' is that it can be atomically + written while 'logorder' cannot (unless we're fine with rewriting it + for each and every update, which we're not). + + TODO: Deduplicate some code. """ args, config, localconfig = parse_args() paths = localconfig["paths"] @@ -124,17 +305,14 @@ def main(): create_ssl_context(cafile=paths["https_cacertfile"]) - while True: - logsize, last_hash = merge_fetch(args, config, localconfig) + if args.mergeinterval: + return merge_fetch_parallel(args, config, localconfig) + else: + logsize, last_hash = merge_fetch_sequenced(args, config, localconfig) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} logging.debug("writing to %s: %s", currentsizefile, currentsize) write_file(currentsizefile, currentsize) - if args.mergeinterval is None: - break - logging.debug("sleeping %d seconds", args.mergeinterval / 10) - sleep(args.mergeinterval / 10) - - return 0 + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/mergetools.py b/tools/mergetools.py index 5471fe4..9a4f6b2 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -13,6 +13,7 @@ import requests import time import fcntl import errno +import multiprocessing import logging try: import permdb @@ -289,7 +290,7 @@ def sendentry_merge(node, baseurl, own_key, paths, entry, ehash): def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): try: - json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries] + json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries] result = http_request( baseurl + "plop/v1/merge/sendentry", json.dumps(json_entries), @@ -301,7 +302,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, hash + print >>sys.stderr, ehash print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" @@ -435,6 +436,10 @@ def waitforfile(path): return statinfo def flock_ex_or_fail(path): + """ + To be used at most once per process. Will otherwise leak file + descriptors. + """ try: fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB) except IOError, e: @@ -443,6 +448,30 @@ def flock_ex_or_fail(path): return False return True +def flock_ex_wait(path): + fd = os.open(path, os.O_CREAT) + logging.debug("waiting for exclusive lock on %s (%s)", fd, path) + fcntl.flock(fd, fcntl.LOCK_EX) + logging.debug("taken exclusive lock on %s", fd) + return fd + +def flock_sh_wait(path): + fd = os.open(path, os.O_CREAT) + logging.debug("waiting for shared lock on %s (%s)", fd, path) + fcntl.flock(fd, fcntl.LOCK_SH) + logging.debug("taken shared lock on %s", fd) + return fd + +def flock_release(fd): + logging.debug("releasing lock on %s", fd) + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + +def terminate_child_procs(): + for p in multiprocessing.active_children(): + #print >>sys.stderr, "DEBUG: terminating pid", p.pid + p.terminate() + class Status: def __init__(self, path): self.path = path @@ -452,12 +481,17 @@ class Status: class FileDB: def __init__(self, path): self.path = path + self.lockfile = None def get(self, key): return read_chain(self.path, key) def add(self, key, value): return write_chain(key, value, self.path) + def lock_sh(self): + self.lockfile = flock_sh_wait(self.path + "/.lock") + def lock_ex(self): + self.lockfile = flock_ex_wait(self.path + "/.lock") def commit(self): - pass + flock_release(self.lockfile) class PermDB: def __init__(self, path): @@ -466,5 +500,10 @@ class PermDB: return permdb.getvalue(self.permdbobj, key) def add(self, key, value): return permdb.addvalue(self.permdbobj, key, value) + def lock_sh(self): + assert False # NYI + def lock_ex(self): + assert False # NYI def commit(self): permdb.committree(self.permdbobj) + diff --git a/tools/testcase1.py b/tools/testcase1.py index 98b9179..885c24d 100755 --- a/tools/testcase1.py +++ b/tools/testcase1.py @@ -156,7 +156,7 @@ def merge(): return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", "--localconfig", testdir + "/catlfish-test-local-merge.cfg"]) else: - n = 60 + n = 40 print "testcase1.py: sleeping", n, "seconds waiting for merge" sleep(n) return 0 |