-rwxr-xr-x  test/scripts/continuous-merge-start.sh       |   2
-rwxr-xr-x  test/scripts/continuous-merge-test-run-1.sh  |  14
-rwxr-xr-x  test/scripts/light-system-test-prepare.sh    |   1
-rwxr-xr-x  tools/merge                                  |  23
-rwxr-xr-x  tools/merge_backup.py                        |   2
-rwxr-xr-x  tools/merge_dist.py                          |   2
-rwxr-xr-x  tools/merge_fetch.py                         | 206
-rw-r--r--  tools/mergetools.py                          |  45
-rwxr-xr-x  tools/testcase1.py                           |   2

9 files changed, 264 insertions, 33 deletions
diff --git a/test/scripts/continuous-merge-start.sh b/test/scripts/continuous-merge-start.sh
index 666512f..5791d12 100755
--- a/test/scripts/continuous-merge-start.sh
+++ b/test/scripts/continuous-merge-start.sh
@@ -7,4 +7,4 @@ top_srcdir=$(cd $(dirname $0)/../..; pwd)
 
 . ${top_srcdir}/test/scripts/testutils.sh
 
-do_merge --mergeinterval 30 --pidfile merge.pid
+do_merge --mergeinterval 10 --pidfile merge.pid
diff --git a/test/scripts/continuous-merge-test-run-1.sh b/test/scripts/continuous-merge-test-run-1.sh
index 537ff2f..1a1dae6 100755
--- a/test/scripts/continuous-merge-test-run-1.sh
+++ b/test/scripts/continuous-merge-test-run-1.sh
@@ -3,6 +3,8 @@
 set -o nounset
 set -o errexit
 
+SLEEP=40
+
 top_srcdir=$(cd $(dirname $0)/../..; pwd)
 
 . ${top_srcdir}/test/scripts/testutils.sh
@@ -15,8 +17,8 @@ for certfile in ${top_srcdir}/tools/testcerts/cert[1-5].txt ${top_srcdir}/tools/
     python ${top_srcdir}/tools/submitcert.py --parallel=1 --store $certfile --check-sct --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Submission failed"
 done
 python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg  || fail "GC failed"
-echo "$0: sleeping for 60 seconds"
-sleep 60
+echo "$0: sleeping for $SLEEP seconds"
+sleep $SLEEP
 
 check_sth
 assert_equal "Tree size" "$(get_treesize)" 7
@@ -37,8 +39,8 @@ echo NOTE: merge backup should fail with 111 Connection refused
 ${top_srcdir}/tools/to_catlfish.py to_erl nodes/merge-2/ "init:stop()"
 
 python ${top_srcdir}/tools/submitcert.py --parallel=1 --store ${top_srcdir}/tools/testcerts/cert6.txt --check-sct --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Submission failed"
-echo "$0: sleeping for 60 seconds"
-sleep 60
+echo "$0: sleeping for $SLEEP seconds"
+sleep $SLEEP
 
 assert_equal "Tree size" "$(get_treesize)" 7
 check_sth
@@ -49,8 +51,8 @@ for i in 1 2 3 4 5 6 7 8 9 10; do
     if curl -s --cacert httpsca/demoCA/cacert.pem -4 https://localhost:8181 > /dev/null ; then break; fi
 done
 
-echo "$0: sleeping for 60 seconds"
-sleep 60
+echo "$0: sleeping for $SLEEP seconds"
+sleep $SLEEP
 
 assert_equal "Tree size" "$(get_treesize)" 8
 check_sth
diff --git a/test/scripts/light-system-test-prepare.sh b/test/scripts/light-system-test-prepare.sh
index 466f3aa..278435f 100755
--- a/test/scripts/light-system-test-prepare.sh
+++ b/test/scripts/light-system-test-prepare.sh
@@ -42,6 +42,7 @@ mkdir keys
 (cd keys ; ../../../tools/create-key.sh logkey)
 openssl pkcs8 -topk8 -nocrypt -in keys/logkey-private.pem -out keys/logkey-private.pkcs8
 mkdir mergedb
+mkdir mergedb/chains          # needed for merge_fetch worker lockfile
 touch mergedb/logorder
 mkdir mergedb-secondary
 touch mergedb-secondary/logorder
diff --git a/tools/merge b/tools/merge
index 0d1bf0e..4ba0438 100755
--- a/tools/merge
+++ b/tools/merge
@@ -5,7 +5,7 @@ import os
 import sys
 import signal
 from time import sleep
-from mergetools import parse_args
+from mergetools import parse_args, terminate_child_procs
 from multiprocessing import Process
 import merge_fetch, merge_backup, merge_sth, merge_dist
 
@@ -21,27 +21,38 @@ def run_once():
     return ret
 
 def term(signal, arg):
+    terminate_child_procs()
     sys.exit(1)
 
 def run_continuously(pidfilepath):
     """Run continuously."""
-    parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist)
+    parts = (('fetch', merge_fetch),
+             ('backup', merge_backup),
+             ('sth', merge_sth),
+             ('dist', merge_dist))
     procs = {}
     for part, mod in parts:
         procs[part] = Process(target=mod.main, name='merge_%s' % part)
-        procs[part].daemon = True
         procs[part].start()
+        #print >>sys.stderr, "DEBUG:", part, "started, pid", procs[part].pid
 
     if pidfilepath:
         open(pidfilepath, 'w').write(str(os.getpid()) + '\n')
     signal.signal(signal.SIGTERM, term)
 
-    while True:
+    retval = 0
+    keep_going = True
+    while keep_going:
+        sleep(1)
         for name, p in procs.items():
             if not p.is_alive():
                 print >>sys.stderr, "\nERROR:", name, "process is gone; exiting"
-                return 1
-        sleep(1)
+                retval = 1                # Fail.
+                keep_going = False
+                break
+
+    terminate_child_procs()
+    return retval
 
 def main():
     """Main"""
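A note on the tools/merge change above: the four merge_* stages are no longer daemonic children, presumably because Python's multiprocessing does not allow daemonic processes to start children of their own and merge_fetch now spawns one worker per storage node. Instead the parent cleans up explicitly with terminate_child_procs(), both from the SIGTERM handler and after the monitoring loop notices that a stage has died. A minimal, self-contained sketch of that supervise-and-terminate pattern follows; worker and run_workers are illustrative names, not part of the patch.

    # Rough sketch of the supervision loop in tools/merge (Python 2/3).
    import signal
    import sys
    from time import sleep
    from multiprocessing import Process, active_children

    def worker(name):
        while True:               # stand-in for merge_fetch.main() and friends
            sleep(1)

    def terminate_children():
        # Terminate every still-running child process.
        for p in active_children():
            p.terminate()

    def term(signum, frame):
        terminate_children()
        sys.exit(1)

    def run_workers(names):
        procs = dict((n, Process(target=worker, args=(n,), name=n)) for n in names)
        for p in procs.values():
            p.start()
        signal.signal(signal.SIGTERM, term)
        retval = 0
        keep_going = True
        while keep_going:
            sleep(1)
            for name, p in procs.items():
                if not p.is_alive():      # one stage died: stop them all
                    retval = 1
                    keep_going = False
                    break
        terminate_children()
        return retval

    if __name__ == '__main__':
        sys.exit(run_workers(['fetch', 'backup', 'sth', 'dist']))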
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 723fc7a..32bae53 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -217,7 +217,7 @@ def main():
             return err
         fetched_statinfo_old = fetched_statinfo
         while fetched_statinfo == fetched_statinfo_old:
-            sleep(args.mergeinterval / 30)
+            sleep(max(3, args.mergeinterval / 10))
             fetched_statinfo = stat(fetched_path)
 
     return 0
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index 7a13bfa..b018f63 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -167,7 +167,7 @@ def main():
             return 1
         sth_statinfo_old = sth_statinfo
         while sth_statinfo == sth_statinfo_old:
-            sleep(args.mergeinterval / 30)
+            sleep(max(3, args.mergeinterval / 10))
             sth_statinfo = stat(sth_path)
 
     return 0
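Both merge_backup.py and merge_dist.py wait for their input (the 'fetched' and 'sth' files) by polling stat() until the result changes; the two hunks above clamp that poll interval to at least three seconds instead of mergeinterval/30. A hedged sketch of the pattern, assuming a plain os.stat comparison as in the originals; wait_for_change is an illustrative name, not a function in the tree.

    # Poll a file until its stat() result changes, with the clamped interval
    # introduced by this patch.
    from os import stat
    from time import sleep

    def wait_for_change(path, mergeinterval):
        old = stat(path)
        while stat(path) == old:
            sleep(max(3, mergeinterval / 10))
        return stat(path)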
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index 7973fae..6accca4 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,14 +10,18 @@
 import sys
 import struct
 import subprocess
+import signal
 import logging
 from time import sleep
+from multiprocessing import Process, Pipe
+from random import Random
 from mergetools import get_logorder, verify_entry, get_new_entries, \
      chunks, fsync_logorder, get_entries, add_to_logorder, \
-     hexencode, parse_args, perm, flock_ex_or_fail, Status
+     hexencode, hexdecode, parse_args, perm, flock_ex_or_fail, Status, \
+     terminate_child_procs
 from certtools import timing_point, write_file, create_ssl_context
 
-def merge_fetch(args, config, localconfig):
+def merge_fetch_sequenced(args, config, localconfig):
     paths = localconfig["paths"]
     storagenodes = config["storagenodes"]
     mergedb = paths["mergedb"]
@@ -96,14 +100,191 @@
     else:
         return (tree_size, logorder[tree_size-1])
 
+def merge_fetch_worker(args, localconfig, storagenode, pipe):
+    paths = localconfig["paths"]
+    mergedb = paths["mergedb"]
+    chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
+    own_key = (localconfig["nodename"],
+               "%s/%s-private.pem" % (paths["privatekeys"],
+                                      localconfig["nodename"]))
+    to_fetch = set()
+    timeout = max(3, args.mergeinterval / 10)
+    while True:
+        if pipe.poll(timeout):
+            msg = pipe.recv().split()
+            if len(msg) < 2:
+                continue
+            cmd = msg[0]
+            ehash = msg[1]
+            if cmd == 'FETCH':
+                to_fetch.add(hexdecode(ehash))
+            else:
+                logging.warning("%s: unknown command from parent: %s",
+                                storagenode["name"], msg)
+
+        if len(to_fetch) > 0:
+            logging.info("%s: fetching %d entries", storagenode["name"],
+                         len(to_fetch))
+            # TODO: Consider running the verifycert process longer.
+            verifycert = subprocess.Popen(
+                [paths["verifycert_bin"], paths["known_roots"]],
+                stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+            # Chunking for letting other workers take the chainsdb lock.
+            for chunk in chunks(list(to_fetch), 100):
+                chainsdb.lock_ex()
+                entries = get_entries(storagenode["name"],
+                                      "https://%s/" % storagenode["address"],
+                                      own_key, paths, chunk)
+                for ehash in chunk:
+                    entry = entries[ehash]
+                    verify_entry(verifycert, entry, ehash)
+                    chainsdb.add(ehash, entry)
+                chainsdb.commit()
+                for ehash in chunk:
+                    pipe.send('FETCHED %s' % hexencode(ehash))
+                    to_fetch.remove(ehash)
+            verifycert.communicate(struct.pack("I", 0))
+
+        new_entries = get_new_entries(storagenode["name"],
+                                      "https://%s/" % storagenode["address"],
+                                      own_key, paths)
+        if len(new_entries) > 0:
+            logging.info("%s: got %d new entries", storagenode["name"],
+                         len(new_entries))
+            for ehash in new_entries:
+                pipe.send('NEWENTRY %s' % hexencode(ehash))
+
+def term(signal, arg):
+    terminate_child_procs()
+    sys.exit(1)
+
+def newworker(name, args):
+    my_conn, child_conn = Pipe()
+    p = Process(target=merge_fetch_worker,
+                args=tuple(args + [child_conn]),
+                name='merge_fetch_%s' % name)
+    p.daemon = True
+    p.start()
+    logging.debug("%s started, pid %d", name, p.pid)
+    return (name, my_conn, p)
+
+def merge_fetch_parallel(args, config, localconfig):
+    paths = localconfig["paths"]
+    storagenodes = config["storagenodes"]
+    mergedb = paths["mergedb"]
+    logorderfile = mergedb + "/logorder"
+    currentsizefile = mergedb + "/fetched"
+
+    rand = Random()
+    signal.signal(signal.SIGTERM, term)
+
+    procs = {}
+    for storagenode in storagenodes:
+        name = storagenode['name']
+        procs[name] = newworker(name, [args, localconfig, storagenode])
+
+    logorder = get_logorder(logorderfile)   # List of entries in log.
+    entries_in_log = set(logorder)          # Set of entries in log.
+    entries_to_fetch = set()                # Set of entries to fetch.
+    fetch = {}                              # Dict with entries to fetch.
+    while procs:
+        assert(not entries_to_fetch)
+        # Poll worker processes.
+        for name, pipe, p in procs.values():
+            if not p.is_alive():
+                logging.warning("%s is gone, restarting", name)
+                procs[name] = newworker(name, [args, localconfig,
+                                               storagenodes[name]])
+                continue
+            logging.info("polling %s", name)
+            if pipe.poll(1):
+                msg = pipe.recv().split()
+                if len(msg) < 2:
+                    logging.warning("unknown command from %s: %s", name, msg)
+                    continue
+                cmd = msg[0]
+                ehash = msg[1]
+                if cmd == 'NEWENTRY':
+                    logging.info("NEWENTRY at %s: %s", name, ehash)
+                    entries_to_fetch.add(ehash)
+                    logging.debug("entries_to_fetch: %s", entries_to_fetch)
+                elif cmd == 'FETCHED':
+                    logging.info("FETCHED from %s: %s", name, ehash)
+                    logorder.append(ehash)
+                    add_to_logorder(logorderfile, hexdecode(ehash))
+                    fsync_logorder(logorderfile)
+                    entries_in_log.add(ehash)
+                    if ehash in entries_to_fetch:
+                        entries_to_fetch.remove(ehash)
+                    del fetch[ehash]
+                else:
+                    logging.warning("unknown command from %s: %s", name, msg)
+
+        # Ask workers to fetch entries.
+        logging.debug("nof entries to fetch including entries in log: %d",
+                      len(entries_to_fetch))
+        entries_to_fetch -= entries_in_log
+        logging.info("entries to fetch: %d", len(entries_to_fetch))
+        # Add entries in entries_to_fetch as keys in dictionary fetch,
+        # values being a list of storage nodes, in randomised order.
+        for e in entries_to_fetch:
+            if not e in fetch:
+                l = procs.values()
+                rand.shuffle(l)
+                fetch[e] = l
+        # For each entry to fetch, treat its list of nodes as a
+        # circular list and ask the one in the front to fetch the
+        # entry.
+        while entries_to_fetch:
+            ehash = entries_to_fetch.pop()
+            nodes = fetch[ehash]
+            node = nodes.pop(0)
+            fetch[ehash] = nodes.append(node)
+            name, pipe, p = node
+            logging.info("asking %s to FETCH %s", name, ehash)
+            pipe.send("FETCH %s" % ehash)
+
+        # Update the 'fetched' file.
+        logsize = len(logorder)
+        if logsize == 0:
+            last_hash = ''
+        else:
+            last_hash = logorder[logsize - 1]
+        logging.info("updating 'fetched' file: %d %s", logsize-1, last_hash)
+        currentsize = {"index": logsize - 1, "hash": last_hash}
+        logging.debug("writing to %s: %s", currentsizefile, currentsize)
+        write_file(currentsizefile, currentsize)
+
+    return 0
+
 def main():
     """
-    Fetch new entries from all storage nodes, in sequence.
+    If no `--mergeinterval':
+      Fetch new entries from all storage nodes, in sequence, updating
+      the 'logorder' file and the 'chains' database.
 
-    Indicate the current position by writing the hash and its 'logorder'
-    index, 0-based, to 'fetched'.
+      Write 'fetched' to reflect how far in 'logorder' we've succesfully
+      fetched and verified.
 
-    Sleep some and start over, or exit if there's no `--mergeinterval'.
+    If `--mergeinterval':
+      Start one process per storage node, read their stdout for learning
+      about two things: (i) new entries ready for fetching ("NEWENTRY") and
+      (ii) new entries being succesfully fetched ("FETCHED").
+
+      Write to their stdin ("FETCH") when they should fetch another entry.
+      Update 'logorder' and the 'chains' database as we see new FETCHED
+      messages.
+
+      Write 'fetched' to reflect how far in 'logorder' we've succesfully
+      fetched and verified.
+
+      Keep doing this forever.
+
+    NOTE: The point of having 'fetched' is that it can be atomically
+    written while 'logorder' cannot (unless we're fine with rewriting it
+    for each and every update, which we're not).
+
+    TODO: Deduplicate some code.
     """
     args, config, localconfig = parse_args()
     paths = localconfig["paths"]
@@ -124,17 +305,14 @@ def main():
 
     create_ssl_context(cafile=paths["https_cacertfile"])
 
-    while True:
-        logsize, last_hash = merge_fetch(args, config, localconfig)
+    if args.mergeinterval:
+        return merge_fetch_parallel(args, config, localconfig)
+    else:
+        logsize, last_hash = merge_fetch_sequenced(args, config, localconfig)
         currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
         logging.debug("writing to %s: %s", currentsizefile, currentsize)
         write_file(currentsizefile, currentsize)
-        if args.mergeinterval is None:
-            break
-        logging.debug("sleeping %d seconds", args.mergeinterval / 10)
-        sleep(args.mergeinterval / 10)
-
-    return 0
+        return 0
 
 if __name__ == '__main__':
     sys.exit(main())
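The heart of the new merge_fetch.py is the message protocol between merge_fetch_parallel() and one merge_fetch_worker() per storage node: a worker announces entries with 'NEWENTRY <hex>' and reports completed work with 'FETCHED <hex>', while the parent hands out work with 'FETCH <hex>' and appends each FETCHED entry to 'logorder'. The toy below keeps only that message loop over a multiprocessing.Pipe and drops the network, verification and database parts; the entry hash and the toy_* names are made up for illustration.

    # Toy round-trip of the FETCH/FETCHED/NEWENTRY protocol from merge_fetch.py.
    # The real worker fetches, verifies and stores the entry before answering.
    from multiprocessing import Process, Pipe

    def toy_worker(pipe):
        pipe.send('NEWENTRY deadbeef')            # advertise one fake entry
        while True:
            if pipe.poll(1):
                msg = pipe.recv().split()
                if msg[0] == 'FETCH':
                    pipe.send('FETCHED %s' % msg[1])
                    return

    def toy_parent():
        parent_conn, child_conn = Pipe()
        p = Process(target=toy_worker, args=(child_conn,))
        p.start()
        logorder = []
        while not logorder:
            if parent_conn.poll(1):
                msg = parent_conn.recv().split()
                if msg[0] == 'NEWENTRY':
                    parent_conn.send('FETCH %s' % msg[1])
                elif msg[0] == 'FETCHED':
                    logorder.append(msg[1])       # real code also fsyncs 'logorder'
        p.join()
        return logorder

    if __name__ == '__main__':
        print(toy_parent())

Beyond this loop, merge_fetch_parallel() also shuffles a per-entry list of workers so fetching is spread across the storage nodes, and restarts any worker whose process has died.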
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 5471fe4..9a4f6b2 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -13,6 +13,7 @@ import requests
 import time
 import fcntl
 import errno
+import multiprocessing
 import logging
 try:
     import permdb
@@ -289,7 +290,7 @@ def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
 
 def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
     try:
-        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
+        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries]
         result = http_request(
             baseurl + "plop/v1/merge/sendentry",
             json.dumps(json_entries),
@@ -301,7 +302,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
         sys.exit(1)
     except ValueError, e:
         print >>sys.stderr, "==== FAILED REQUEST ===="
-        print >>sys.stderr, hash
+        print >>sys.stderr, ehash
         print >>sys.stderr, "======= RESPONSE ======="
         print >>sys.stderr, result
         print >>sys.stderr, "========================"
@@ -435,6 +436,10 @@ def waitforfile(path):
     return statinfo
 
 def flock_ex_or_fail(path):
+    """
+    To be used at most once per process. Will otherwise leak file
+    descriptors.
+    """
     try:
         fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
     except IOError, e:
@@ -443,6 +448,30 @@ def flock_ex_or_fail(path):
         return False
     return True
 
+def flock_ex_wait(path):
+    fd = os.open(path, os.O_CREAT)
+    logging.debug("waiting for exclusive lock on %s (%s)", fd, path)
+    fcntl.flock(fd, fcntl.LOCK_EX)
+    logging.debug("taken exclusive lock on %s", fd)
+    return fd
+
+def flock_sh_wait(path):
+    fd = os.open(path, os.O_CREAT)
+    logging.debug("waiting for shared lock on %s (%s)", fd, path)
+    fcntl.flock(fd, fcntl.LOCK_SH)
+    logging.debug("taken shared lock on %s", fd)
+    return fd
+
+def flock_release(fd):
+    logging.debug("releasing lock on %s", fd)
+    fcntl.flock(fd, fcntl.LOCK_UN)
+    os.close(fd)
+
+def terminate_child_procs():
+    for p in multiprocessing.active_children():
+        #print >>sys.stderr, "DEBUG: terminating pid", p.pid
+        p.terminate()
+
 class Status:
     def __init__(self, path):
         self.path = path
@@ -452,12 +481,17 @@
 class FileDB:
     def __init__(self, path):
         self.path = path
+        self.lockfile = None
     def get(self, key):
         return read_chain(self.path, key)
     def add(self, key, value):
         return write_chain(key, value, self.path)
+    def lock_sh(self):
+        self.lockfile = flock_sh_wait(self.path + "/.lock")
+    def lock_ex(self):
+        self.lockfile = flock_ex_wait(self.path + "/.lock")
     def commit(self):
-        pass
+        flock_release(self.lockfile)
 
 class PermDB:
     def __init__(self, path):
@@ -466,5 +500,10 @@ class PermDB:
         return permdb.getvalue(self.permdbobj, key)
     def add(self, key, value):
         return permdb.addvalue(self.permdbobj, key, value)
+    def lock_sh(self):
+        assert False                                   # NYI
+    def lock_ex(self):
+        assert False                                   # NYI
     def commit(self):
         permdb.committree(self.permdbobj)
+
diff --git a/tools/testcase1.py b/tools/testcase1.py
index 98b9179..885c24d 100755
--- a/tools/testcase1.py
+++ b/tools/testcase1.py
@@ -156,7 +156,7 @@ def merge():
         return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg",
                                 "--localconfig", testdir + "/catlfish-test-local-merge.cfg"])
     else:
-        n = 60
+        n = 40
        print "testcase1.py: sleeping", n, "seconds waiting for merge"
         sleep(n)
         return 0
