summaryrefslogtreecommitdiff
path: root/tools
diff options
context:
space:
mode:
authorLinus Nordberg <linus@nordu.net>2016-11-25 00:47:13 +0100
committerLinus Nordberg <linus@nordu.net>2016-11-25 00:47:13 +0100
commit49b6e85963ef55fb6cfa1876fe825730f95658bc (patch)
treeb3f5c605d64c984a092a34c32b34305d4a3d687a /tools
parent19a2a611a839c0318f58347e2d93943c8e2401a5 (diff)
Parallelise merge_fetch.py.
NOTE: Not supporting permdb yet! We're still not passing the tests because merge_backup.py exits when the secondary merge disappears.
Diffstat (limited to 'tools')
-rwxr-xr-xtools/merge23
-rwxr-xr-xtools/merge_backup.py2
-rwxr-xr-xtools/merge_dist.py2
-rwxr-xr-xtools/merge_fetch.py206
-rw-r--r--tools/mergetools.py45
-rwxr-xr-xtools/testcase1.py2
6 files changed, 254 insertions, 26 deletions
diff --git a/tools/merge b/tools/merge
index 0d1bf0e..4ba0438 100755
--- a/tools/merge
+++ b/tools/merge
@@ -5,7 +5,7 @@ import os
import sys
import signal
from time import sleep
-from mergetools import parse_args
+from mergetools import parse_args, terminate_child_procs
from multiprocessing import Process
import merge_fetch, merge_backup, merge_sth, merge_dist
@@ -21,27 +21,38 @@ def run_once():
return ret
def term(signal, arg):
+ terminate_child_procs()
sys.exit(1)
def run_continuously(pidfilepath):
"""Run continuously."""
- parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist)
+ parts = (('fetch', merge_fetch),
+ ('backup', merge_backup),
+ ('sth', merge_sth),
+ ('dist', merge_dist))
procs = {}
for part, mod in parts:
procs[part] = Process(target=mod.main, name='merge_%s' % part)
- procs[part].daemon = True
procs[part].start()
+ #print >>sys.stderr, "DEBUG:", part, "started, pid", procs[part].pid
if pidfilepath:
open(pidfilepath, 'w').write(str(os.getpid()) + '\n')
signal.signal(signal.SIGTERM, term)
- while True:
+ retval = 0
+ keep_going = True
+ while keep_going:
+ sleep(1)
for name, p in procs.items():
if not p.is_alive():
print >>sys.stderr, "\nERROR:", name, "process is gone; exiting"
- return 1
- sleep(1)
+ retval = 1 # Fail.
+ keep_going = False
+ break
+
+ terminate_child_procs()
+ return retval
def main():
"""Main"""
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 723fc7a..32bae53 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -217,7 +217,7 @@ def main():
return err
fetched_statinfo_old = fetched_statinfo
while fetched_statinfo == fetched_statinfo_old:
- sleep(args.mergeinterval / 30)
+ sleep(max(3, args.mergeinterval / 10))
fetched_statinfo = stat(fetched_path)
return 0
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index 7a13bfa..b018f63 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -167,7 +167,7 @@ def main():
return 1
sth_statinfo_old = sth_statinfo
while sth_statinfo == sth_statinfo_old:
- sleep(args.mergeinterval / 30)
+ sleep(max(3, args.mergeinterval / 10))
sth_statinfo = stat(sth_path)
return 0
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index 7973fae..6accca4 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,14 +10,18 @@
import sys
import struct
import subprocess
+import signal
import logging
from time import sleep
+from multiprocessing import Process, Pipe
+from random import Random
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
- hexencode, parse_args, perm, flock_ex_or_fail, Status
+ hexencode, hexdecode, parse_args, perm, flock_ex_or_fail, Status, \
+ terminate_child_procs
from certtools import timing_point, write_file, create_ssl_context
-def merge_fetch(args, config, localconfig):
+def merge_fetch_sequenced(args, config, localconfig):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
@@ -96,14 +100,191 @@ def merge_fetch(args, config, localconfig):
else:
return (tree_size, logorder[tree_size-1])
+def merge_fetch_worker(args, localconfig, storagenode, pipe):
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
+ own_key = (localconfig["nodename"],
+ "%s/%s-private.pem" % (paths["privatekeys"],
+ localconfig["nodename"]))
+ to_fetch = set()
+ timeout = max(3, args.mergeinterval / 10)
+ while True:
+ if pipe.poll(timeout):
+ msg = pipe.recv().split()
+ if len(msg) < 2:
+ continue
+ cmd = msg[0]
+ ehash = msg[1]
+ if cmd == 'FETCH':
+ to_fetch.add(hexdecode(ehash))
+ else:
+ logging.warning("%s: unknown command from parent: %s",
+ storagenode["name"], msg)
+
+ if len(to_fetch) > 0:
+ logging.info("%s: fetching %d entries", storagenode["name"],
+ len(to_fetch))
+ # TODO: Consider running the verifycert process longer.
+ verifycert = subprocess.Popen(
+ [paths["verifycert_bin"], paths["known_roots"]],
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ # Chunking for letting other workers take the chainsdb lock.
+ for chunk in chunks(list(to_fetch), 100):
+ chainsdb.lock_ex()
+ entries = get_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths, chunk)
+ for ehash in chunk:
+ entry = entries[ehash]
+ verify_entry(verifycert, entry, ehash)
+ chainsdb.add(ehash, entry)
+ chainsdb.commit()
+ for ehash in chunk:
+ pipe.send('FETCHED %s' % hexencode(ehash))
+ to_fetch.remove(ehash)
+ verifycert.communicate(struct.pack("I", 0))
+
+ new_entries = get_new_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths)
+ if len(new_entries) > 0:
+ logging.info("%s: got %d new entries", storagenode["name"],
+ len(new_entries))
+ for ehash in new_entries:
+ pipe.send('NEWENTRY %s' % hexencode(ehash))
+
+def term(signal, arg):
+ terminate_child_procs()
+ sys.exit(1)
+
+def newworker(name, args):
+ my_conn, child_conn = Pipe()
+ p = Process(target=merge_fetch_worker,
+ args=tuple(args + [child_conn]),
+ name='merge_fetch_%s' % name)
+ p.daemon = True
+ p.start()
+ logging.debug("%s started, pid %d", name, p.pid)
+ return (name, my_conn, p)
+
+def merge_fetch_parallel(args, config, localconfig):
+ paths = localconfig["paths"]
+ storagenodes = config["storagenodes"]
+ mergedb = paths["mergedb"]
+ logorderfile = mergedb + "/logorder"
+ currentsizefile = mergedb + "/fetched"
+
+ rand = Random()
+ signal.signal(signal.SIGTERM, term)
+
+ procs = {}
+ for storagenode in storagenodes:
+ name = storagenode['name']
+ procs[name] = newworker(name, [args, localconfig, storagenode])
+
+ logorder = get_logorder(logorderfile) # List of entries in log.
+ entries_in_log = set(logorder) # Set of entries in log.
+ entries_to_fetch = set() # Set of entries to fetch.
+ fetch = {} # Dict with entries to fetch.
+ while procs:
+ assert(not entries_to_fetch)
+ # Poll worker processes.
+ for name, pipe, p in procs.values():
+ if not p.is_alive():
+ logging.warning("%s is gone, restarting", name)
+ procs[name] = newworker(name, [args, localconfig,
+ storagenodes[name]])
+ continue
+ logging.info("polling %s", name)
+ if pipe.poll(1):
+ msg = pipe.recv().split()
+ if len(msg) < 2:
+ logging.warning("unknown command from %s: %s", name, msg)
+ continue
+ cmd = msg[0]
+ ehash = msg[1]
+ if cmd == 'NEWENTRY':
+ logging.info("NEWENTRY at %s: %s", name, ehash)
+ entries_to_fetch.add(ehash)
+ logging.debug("entries_to_fetch: %s", entries_to_fetch)
+ elif cmd == 'FETCHED':
+ logging.info("FETCHED from %s: %s", name, ehash)
+ logorder.append(ehash)
+ add_to_logorder(logorderfile, hexdecode(ehash))
+ fsync_logorder(logorderfile)
+ entries_in_log.add(ehash)
+ if ehash in entries_to_fetch:
+ entries_to_fetch.remove(ehash)
+ del fetch[ehash]
+ else:
+ logging.warning("unknown command from %s: %s", name, msg)
+
+ # Ask workers to fetch entries.
+ logging.debug("nof entries to fetch including entries in log: %d",
+ len(entries_to_fetch))
+ entries_to_fetch -= entries_in_log
+ logging.info("entries to fetch: %d", len(entries_to_fetch))
+ # Add entries in entries_to_fetch as keys in dictionary fetch,
+ # values being a list of storage nodes, in randomised order.
+ for e in entries_to_fetch:
+ if not e in fetch:
+ l = procs.values()
+ rand.shuffle(l)
+ fetch[e] = l
+ # For each entry to fetch, treat its list of nodes as a
+ # circular list and ask the one in the front to fetch the
+ # entry.
+ while entries_to_fetch:
+ ehash = entries_to_fetch.pop()
+ nodes = fetch[ehash]
+ node = nodes.pop(0)
+ fetch[ehash] = nodes.append(node)
+ name, pipe, p = node
+ logging.info("asking %s to FETCH %s", name, ehash)
+ pipe.send("FETCH %s" % ehash)
+
+ # Update the 'fetched' file.
+ logsize = len(logorder)
+ if logsize == 0:
+ last_hash = ''
+ else:
+ last_hash = logorder[logsize - 1]
+ logging.info("updating 'fetched' file: %d %s", logsize-1, last_hash)
+ currentsize = {"index": logsize - 1, "hash": last_hash}
+ logging.debug("writing to %s: %s", currentsizefile, currentsize)
+ write_file(currentsizefile, currentsize)
+
+ return 0
+
def main():
"""
- Fetch new entries from all storage nodes, in sequence.
+ If no `--mergeinterval':
+ Fetch new entries from all storage nodes, in sequence, updating
+ the 'logorder' file and the 'chains' database.
- Indicate the current position by writing the hash and its 'logorder'
- index, 0-based, to 'fetched'.
+ Write 'fetched' to reflect how far in 'logorder' we've succesfully
+ fetched and verified.
- Sleep some and start over, or exit if there's no `--mergeinterval'.
+ If `--mergeinterval':
+ Start one process per storage node, read their stdout for learning
+ about two things: (i) new entries ready for fetching ("NEWENTRY") and
+ (ii) new entries being succesfully fetched ("FETCHED").
+
+ Write to their stdin ("FETCH") when they should fetch another entry.
+ Update 'logorder' and the 'chains' database as we see new FETCHED
+ messages.
+
+ Write 'fetched' to reflect how far in 'logorder' we've succesfully
+ fetched and verified.
+
+ Keep doing this forever.
+
+ NOTE: The point of having 'fetched' is that it can be atomically
+ written while 'logorder' cannot (unless we're fine with rewriting it
+ for each and every update, which we're not).
+
+ TODO: Deduplicate some code.
"""
args, config, localconfig = parse_args()
paths = localconfig["paths"]
@@ -124,17 +305,14 @@ def main():
create_ssl_context(cafile=paths["https_cacertfile"])
- while True:
- logsize, last_hash = merge_fetch(args, config, localconfig)
+ if args.mergeinterval:
+ return merge_fetch_parallel(args, config, localconfig)
+ else:
+ logsize, last_hash = merge_fetch_sequenced(args, config, localconfig)
currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
logging.debug("writing to %s: %s", currentsizefile, currentsize)
write_file(currentsizefile, currentsize)
- if args.mergeinterval is None:
- break
- logging.debug("sleeping %d seconds", args.mergeinterval / 10)
- sleep(args.mergeinterval / 10)
-
- return 0
+ return 0
if __name__ == '__main__':
sys.exit(main())
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 5471fe4..9a4f6b2 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -13,6 +13,7 @@ import requests
import time
import fcntl
import errno
+import multiprocessing
import logging
try:
import permdb
@@ -289,7 +290,7 @@ def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
try:
- json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
+ json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries]
result = http_request(
baseurl + "plop/v1/merge/sendentry",
json.dumps(json_entries),
@@ -301,7 +302,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, hash
+ print >>sys.stderr, ehash
print >>sys.stderr, "======= RESPONSE ======="
print >>sys.stderr, result
print >>sys.stderr, "========================"
@@ -435,6 +436,10 @@ def waitforfile(path):
return statinfo
def flock_ex_or_fail(path):
+ """
+ To be used at most once per process. Will otherwise leak file
+ descriptors.
+ """
try:
fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
except IOError, e:
@@ -443,6 +448,30 @@ def flock_ex_or_fail(path):
return False
return True
+def flock_ex_wait(path):
+ fd = os.open(path, os.O_CREAT)
+ logging.debug("waiting for exclusive lock on %s (%s)", fd, path)
+ fcntl.flock(fd, fcntl.LOCK_EX)
+ logging.debug("taken exclusive lock on %s", fd)
+ return fd
+
+def flock_sh_wait(path):
+ fd = os.open(path, os.O_CREAT)
+ logging.debug("waiting for shared lock on %s (%s)", fd, path)
+ fcntl.flock(fd, fcntl.LOCK_SH)
+ logging.debug("taken shared lock on %s", fd)
+ return fd
+
+def flock_release(fd):
+ logging.debug("releasing lock on %s", fd)
+ fcntl.flock(fd, fcntl.LOCK_UN)
+ os.close(fd)
+
+def terminate_child_procs():
+ for p in multiprocessing.active_children():
+ #print >>sys.stderr, "DEBUG: terminating pid", p.pid
+ p.terminate()
+
class Status:
def __init__(self, path):
self.path = path
@@ -452,12 +481,17 @@ class Status:
class FileDB:
def __init__(self, path):
self.path = path
+ self.lockfile = None
def get(self, key):
return read_chain(self.path, key)
def add(self, key, value):
return write_chain(key, value, self.path)
+ def lock_sh(self):
+ self.lockfile = flock_sh_wait(self.path + "/.lock")
+ def lock_ex(self):
+ self.lockfile = flock_ex_wait(self.path + "/.lock")
def commit(self):
- pass
+ flock_release(self.lockfile)
class PermDB:
def __init__(self, path):
@@ -466,5 +500,10 @@ class PermDB:
return permdb.getvalue(self.permdbobj, key)
def add(self, key, value):
return permdb.addvalue(self.permdbobj, key, value)
+ def lock_sh(self):
+ assert False # NYI
+ def lock_ex(self):
+ assert False # NYI
def commit(self):
permdb.committree(self.permdbobj)
+
diff --git a/tools/testcase1.py b/tools/testcase1.py
index 98b9179..885c24d 100755
--- a/tools/testcase1.py
+++ b/tools/testcase1.py
@@ -156,7 +156,7 @@ def merge():
return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg",
"--localconfig", testdir + "/catlfish-test-local-merge.cfg"])
else:
- n = 60
+ n = 40
print "testcase1.py: sleeping", n, "seconds waiting for merge"
sleep(n)
return 0