diff options
author | Linus Nordberg <linus@nordu.net> | 2016-11-23 17:09:48 +0100 |
---|---|---|
committer | Linus Nordberg <linus@nordu.net> | 2016-11-23 17:09:48 +0100 |
commit | 19a2a611a839c0318f58347e2d93943c8e2401a5 (patch) | |
tree | 18cd302161a88d4546b39792a4bff6b1ade95c77 | |
parent | 27e368196ce65e109c027987c706a697356f7bc5 (diff) |
WIP
Merge can run as four separate processes, plus a fifth controlling
proces 'merge'.
Tests are limited to testcase1.py and they're failing because of the
test with the dead merge secondary. Tests are also time consuming
because they're waiting for 60s each time a merge needs to be
verified. This could be improved by peeking at the control files, for
example.
-rw-r--r-- | doc/merge.txt | 9 | ||||
-rw-r--r-- | doc/minimalsystem.txt | 15 | ||||
-rw-r--r-- | test/Makefile | 19 | ||||
-rwxr-xr-x | test/scripts/continuous-merge-start.sh | 10 | ||||
-rwxr-xr-x | test/scripts/continuous-merge-stop.sh | 6 | ||||
-rwxr-xr-x | test/scripts/continuous-merge-test-run-1.sh | 56 | ||||
-rwxr-xr-x | test/scripts/continuous-merge-test.sh | 27 | ||||
-rw-r--r-- | test/scripts/testutils.sh | 2 | ||||
-rwxr-xr-x | tools/merge | 62 | ||||
-rwxr-xr-x | tools/merge_backup.py | 141 | ||||
-rwxr-xr-x | tools/merge_dist.py | 141 | ||||
-rwxr-xr-x | tools/merge_fetch.py | 59 | ||||
-rwxr-xr-x | tools/merge_sth.py | 81 | ||||
-rw-r--r-- | tools/mergetools.py | 110 | ||||
-rwxr-xr-x | tools/testcase1.py | 16 |
15 files changed, 525 insertions, 229 deletions
diff --git a/doc/merge.txt b/doc/merge.txt index d12424f..c7d8dc9 100644 --- a/doc/merge.txt +++ b/doc/merge.txt @@ -20,6 +20,13 @@ The merge process - merge-dist distributes 'sth' and missing entries to frontend nodes. + [logorder] + v + fetch backup sth dist + v ^ v ^ v ^ + [fetched] [backup.<secondary>] [sth] + + Merge distribution (merge_dist) ----------------------------------------------------- @@ -83,8 +90,6 @@ Merge backup (merge_backup) TODO ==== -- Run the three pieces in parallell. - - Improve merge-fetch by parallellising it using one process per storage node writing to a "queue info" file (storage-node, hash) and a single "queue handling process" reading queue files and writing to the diff --git a/doc/minimalsystem.txt b/doc/minimalsystem.txt index 061b6cc..3f10784 100644 --- a/doc/minimalsystem.txt +++ b/doc/minimalsystem.txt @@ -46,18 +46,21 @@ If you want to submit all the files in a directory, name directory with a `/` at the end, for example `tools/testcerts/`. -Running merge once ------------------- +Running merge +------------- - tools/merge.py --config test/catlfish-test.cfg +To run merge once, do + + tools/merge --config test/catlfish-test.cfg --localconfig test/catlfish-test-local-merge.cfg This will read the submitted certificates from the storage node, decide the order, and publish the certificates to the frontend server. -If you want to run the system continuously, run the merge command in -cron or in a while loop. See `packaging/docker/catlfish-dev/merge.sh` -for an example of the latter. +If you want to run merge continuously, use `--mergeinterval' or run +the merge command in a while loop. See +`catlfish-dockerfiles/catlfish-dev/merge.sh` for an example of the +latter. Verifying SCT:s --------------- diff --git a/test/Makefile b/test/Makefile index bd58cfe..cee5186 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,20 +1,27 @@ PREFIX=.. INSTDIR=$(PREFIX)/catlfish +all: tests + tests-start: - ./scripts/light-system-test-start.sh + (cd $(INSTDIR)/tests && ../../test/scripts/light-system-test-start.sh) tests-stop: - ./scripts/light-system-test-stop.sh - -tests-wait: - sleep 5 + (cd $(INSTDIR)/tests && ../../test/scripts/light-system-test-stop.sh) tests-makemk: $(PREFIX)/tools/compileconfig.py --config=$(PREFIX)/test/catlfish-test.cfg --testshellvars=$(PREFIX)/test/test.shvars --machines 1 -tests: +tests: tests-basic + +tests-basic: @make tests-makemk rm -r $(INSTDIR)/tests || true mkdir $(INSTDIR)/tests (cd $(INSTDIR)/tests && ../../test/scripts/light-system-test.sh) + +tests-continuous-merge: + @make tests-makemk + rm -r $(INSTDIR)/tests || true + mkdir $(INSTDIR)/tests + (cd $(INSTDIR)/tests && ../../test/scripts/continuous-merge-test.sh) diff --git a/test/scripts/continuous-merge-start.sh b/test/scripts/continuous-merge-start.sh new file mode 100755 index 0000000..666512f --- /dev/null +++ b/test/scripts/continuous-merge-start.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +set -o nounset +set -o errexit + +top_srcdir=$(cd $(dirname $0)/../..; pwd) + +. ${top_srcdir}/test/scripts/testutils.sh + +do_merge --mergeinterval 30 --pidfile merge.pid diff --git a/test/scripts/continuous-merge-stop.sh b/test/scripts/continuous-merge-stop.sh new file mode 100755 index 0000000..f6632f8 --- /dev/null +++ b/test/scripts/continuous-merge-stop.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +set -o nounset + +kill $(cat merge.pid) +exit 0 diff --git a/test/scripts/continuous-merge-test-run-1.sh b/test/scripts/continuous-merge-test-run-1.sh new file mode 100755 index 0000000..537ff2f --- /dev/null +++ b/test/scripts/continuous-merge-test-run-1.sh @@ -0,0 +1,56 @@ +#!/bin/sh + +set -o nounset +set -o errexit + +top_srcdir=$(cd $(dirname $0)/../..; pwd) + +. ${top_srcdir}/test/scripts/testutils.sh + +python ${top_srcdir}/tools/testcase1.py https://localhost:8080/ keys/logkey.pem httpsca/demoCA/cacert.pem ${top_srcdir}/test --nomerge || fail "Tests failed" +check_sth +python ${top_srcdir}/tools/fetchallcerts.py ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Verification failed" +python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg || fail "GC failed" +for certfile in ${top_srcdir}/tools/testcerts/cert[1-5].txt ${top_srcdir}/tools/testcerts/pre[12].txt; do + python ${top_srcdir}/tools/submitcert.py --parallel=1 --store $certfile --check-sct --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Submission failed" +done +python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg || fail "GC failed" +echo "$0: sleeping for 60 seconds" +sleep 60 +check_sth + +assert_equal "Tree size" "$(get_treesize)" 7 + +mkdir fetchcertstore +python ${top_srcdir}/tools/fetchallcerts.py ${BASEURL} --store fetchcertstore --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Verification failed" +(cd fetchcertstore && unzip 0000.zip) + +python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert1.txt fetchcertstore/00000000 || fail "Verification failed" +python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert2.txt fetchcertstore/00000001 || fail "Verification failed" +python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert3.txt fetchcertstore/00000002 || fail "Verification failed" +python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert4.txt fetchcertstore/00000003 || fail "Verification failed" +python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert5.txt fetchcertstore/00000004 || fail "Verification failed" +python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/pre1.txt:${top_srcdir}/tools/testcerts/pre2.txt fetchcertstore/00000005:fetchcertstore/00000006 || fail"Verification failed" +python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg || fail "GC failed" + +echo NOTE: merge backup should fail with 111 Connection refused +${top_srcdir}/tools/to_catlfish.py to_erl nodes/merge-2/ "init:stop()" +python ${top_srcdir}/tools/submitcert.py --parallel=1 --store ${top_srcdir}/tools/testcerts/cert6.txt --check-sct --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Submission failed" + +echo "$0: sleeping for 60 seconds" +sleep 60 +assert_equal "Tree size" "$(get_treesize)" 7 + +check_sth +../bin/run_erl -daemon nodes/merge-2/ nodes/merge-2/log/ "exec ../bin/erl -config merge-2" +for i in 1 2 3 4 5 6 7 8 9 10; do + echo "waiting for system to start" ; \ + sleep 0.5 ; \ + if curl -s --cacert httpsca/demoCA/cacert.pem -4 https://localhost:8181 > /dev/null ; then break; fi +done + +echo "$0: sleeping for 60 seconds" +sleep 60 +assert_equal "Tree size" "$(get_treesize)" 8 + +check_sth diff --git a/test/scripts/continuous-merge-test.sh b/test/scripts/continuous-merge-test.sh new file mode 100755 index 0000000..3bb6de2 --- /dev/null +++ b/test/scripts/continuous-merge-test.sh @@ -0,0 +1,27 @@ +#!/bin/sh + +set -o nounset +set -o errexit + +top_srcdir=$(cd $(dirname $0)/../..; pwd) + +. ${top_srcdir}/test/test.shvars + +SCRIPTS=${top_srcdir}/test/scripts + +tests_start() { + ${SCRIPTS}/light-system-test-start.sh + ${SCRIPTS}/continuous-merge-start.sh & +} + +tests_stop() { + ${SCRIPTS}/continuous-merge-stop.sh + ${SCRIPTS}/light-system-test-stop.sh +} + +echo "$0: starting" +${SCRIPTS}/light-system-test-prepare.sh +tests_start +${SCRIPTS}/continuous-merge-test-run-1.sh || (echo "Tests failed"; sleep 5; tests_stop; false) +sleep 5 +tests_stop diff --git a/test/scripts/testutils.sh b/test/scripts/testutils.sh index 94d6223..908c990 100644 --- a/test/scripts/testutils.sh +++ b/test/scripts/testutils.sh @@ -18,5 +18,5 @@ check_sth() { } do_merge() { - ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg || fail "Merge failed" + ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg $@ || fail "Merge failed" } diff --git a/tools/merge b/tools/merge index b5a50d5..0d1bf0e 100755 --- a/tools/merge +++ b/tools/merge @@ -1,10 +1,58 @@ -#! /bin/sh +#! /usr/bin/env python +"""merge""" -set -o errexit +import os +import sys +import signal +from time import sleep +from mergetools import parse_args +from multiprocessing import Process +import merge_fetch, merge_backup, merge_sth, merge_dist -BINDIR=$(dirname $0) +def run_once(): + """Merge once.""" + ret = merge_fetch.main() + if ret == 0: + ret = merge_backup.main() + if ret == 0: + ret = merge_sth.main() + if ret == 0: + ret = merge_dist.main() + return ret -$BINDIR/merge_fetch.py "$@" -$BINDIR/merge_backup.py "$@" -$BINDIR/merge_sth.py "$@" -$BINDIR/merge_dist.py "$@" +def term(signal, arg): + sys.exit(1) + +def run_continuously(pidfilepath): + """Run continuously.""" + parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist) + procs = {} + for part, mod in parts: + procs[part] = Process(target=mod.main, name='merge_%s' % part) + procs[part].daemon = True + procs[part].start() + + if pidfilepath: + open(pidfilepath, 'w').write(str(os.getpid()) + '\n') + + signal.signal(signal.SIGTERM, term) + while True: + for name, p in procs.items(): + if not p.is_alive(): + print >>sys.stderr, "\nERROR:", name, "process is gone; exiting" + return 1 + sleep(1) + +def main(): + """Main""" + args, _, _ = parse_args() + + if args.mergeinterval is None: + ret = run_once() + else: + ret = run_continuously(args.pidfile) + + return ret + +if __name__ == '__main__': + sys.exit(main()) diff --git a/tools/merge_backup.py b/tools/merge_backup.py index 4f688c3..723fc7a 100755 --- a/tools/merge_backup.py +++ b/tools/merge_backup.py @@ -11,13 +11,16 @@ import sys import base64 import select import requests +import errno +import logging from time import sleep +from os import stat from certtools import timing_point, build_merkle_tree, write_file, \ create_ssl_context from mergetools import chunks, backup_sendlog, get_logorder, \ get_verifiedsize, get_missingentriesforbackup, \ hexencode, setverifiedsize, sendentries_merge, verifyroot, \ - get_nfetched, parse_args, perm + get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk): for trynumber in range(5, 0, -1): @@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk): if trynumber == 1: return None select.select([], [], [], 10.0) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() + logging.info("tries left: %d", trynumber) continue return sendlogresult - def merge_backup(args, config, localconfig, secondaries): paths = localconfig["paths"] own_key = (localconfig["nodename"], @@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries): chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" currentsizefile = mergedb + "/fetched" + statusfile = mergedb + "/merge_backup.status" + s = Status(statusfile) timing = timing_point() nfetched = get_nfetched(currentsizefile, logorderfile) @@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries): nodeaddress = "https://%s/" % secondary["address"] nodename = secondary["name"] timing = timing_point() - print >>sys.stderr, "backing up to node", nodename - sys.stderr.flush() - verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths) + logging.info("backing up to node %s", nodename) + try: + verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths) + except requests.exceptions.ConnectionError, e: + logging.error("connection error when getting verified size from %s", nodename) + return 1 timing_point(timing, "get verified size") - print >>sys.stderr, "verified size", verifiedsize - sys.stderr.flush() + logging.info("verified size %d", verifiedsize) entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "determining end of log:", + logging.info("determining end of log") for chunk in chunks(entries, 100000): sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10]) if sendlogresult == None: - print >>sys.stderr, "sendlog result was None" - sys.exit(1) + logging.error("sendlog result was None") + return 1 if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) + logging.error("backup_sendlog: %s", sendlogresult) + return 1 verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() + s.status("INFO: determining end of log: %d" % verifiedsize) if verifiedsize > 100000: verifiedsize -= 100000 else: verifiedsize = 0 + logging.info("end of log determined") timing_point(timing, "checklog") entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() + logging.info("sending log") for chunk in chunks(entries, 1000): sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) if sendlogresult == None: - sys.exit(1) + return 1 if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) + logging.error("backup_sendlog: %s", sendlogresult) + return 1 verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - print >>sys.stderr + s.status("INFO: sending log: %d" % verifiedsize) timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + logging.info("log sent") missingentries = get_missingentriesforbackup(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") while missingentries: - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() + logging.info("missing entries: %d", len(missingentries)) fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() + logging.info("fetching missing entries") with requests.sessions.Session() as session: for missingentry_chunk in chunks(missingentries, 100): missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] @@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries): own_key, paths, hashes_and_entries, session) if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry_merge:", sendentryresult - sys.exit(1) + logging.error("sendentry_merge: %s", sendentryresult) + return 1 fetched_entries += len(missingentry_hashes) - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: fetching missing entries: %d" % fetched_entries) timing_point(timing, "send missing") missingentries = get_missingentriesforbackup(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") - verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, - tree_size) + try: + verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, + tree_size) + except requests.exceptions.ConnectionError, e: + logging.error("connection error when verifying root at %s", nodename) + return 1 if verifyrootresult["result"] != "ok": - print >>sys.stderr, "verifyroot:", verifyrootresult - sys.exit(1) + logging.error("verifyroot: %s", verifyrootresult) + return 1 secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) if root_hash != secondary_root_hash: - print >>sys.stderr, "secondary root hash was", \ - hexencode(secondary_root_hash) - print >>sys.stderr, " expected", hexencode(root_hash) - sys.exit(1) + logging.error("secondary root hash was %s, expected %s", + hexencode(secondary_root_hash), + hexencode(root_hash)) + return 1 timing_point(timing, "verifyroot") setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size) backuppath = mergedb + "/verified." + nodename backupdata = {"tree_size": tree_size, "sha256_root_hash": hexencode(root_hash)} - #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata + logging.debug("writing to %s: %s", backuppath, backupdata) write_file(backuppath, backupdata) if args.timing: - print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_backup: %s", timing["deltatimes"]) + + return 0 def main(): """ - Read logorder file up until what's indicated by fetched file and - build the tree. + Wait until 'fetched' exists and read it. + + Read 'logorder' up until what's indicated by 'fetched' and build the + tree. Distribute entries to all secondaries, write tree size and tree head - to backup.<secondary> files as each secondary is verified to have + to 'backup.<secondary>' files as each secondary is verified to have the entries. - Sleep some and start over. + If `--mergeinterval', wait until 'fetched' is updated and read it + and start over from the point where 'logorder' is read. """ args, config, localconfig = parse_args() + paths = localconfig["paths"] + mergedb = paths["mergedb"] + lockfile = mergedb + "/.merge_backup.lock" + fetched_path = mergedb + "/fetched" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_backup.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + all_secondaries = \ [n for n in config.get('mergenodes', []) if \ n['name'] != config['primarymergenode']] - paths = localconfig["paths"] create_ssl_context(cafile=paths["https_cacertfile"]) if len(args.node) == 0: @@ -187,12 +206,20 @@ def main(): else: nodes = [n for n in all_secondaries if n["name"] in args.node] + if args.mergeinterval is None: + return merge_backup(args, config, localconfig, nodes) + + fetched_statinfo = waitforfile(fetched_path) + while True: - merge_backup(args, config, localconfig, nodes) - if args.interval is None: - break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + err = merge_backup(args, config, localconfig, nodes) + if err != 0: + return err + fetched_statinfo_old = fetched_statinfo + while fetched_statinfo == fetched_statinfo_old: + sleep(args.mergeinterval / 30) + fetched_statinfo = stat(fetched_path) + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/merge_dist.py b/tools/merge_dist.py index a9b5c60..7a13bfa 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -9,12 +9,15 @@ # import sys import json +import logging from time import sleep from base64 import b64encode, b64decode +from os import stat from certtools import timing_point, \ create_ssl_context from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ - sendsth, sendlog, sendentry, parse_args, perm + sendsth, sendlog, sendentry, parse_args, perm, waitforfile, \ + flock_ex_or_fail, Status def merge_dist(args, localconfig, frontendnodes, timestamp): paths = localconfig["paths"] @@ -25,17 +28,19 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" sthfile = mergedb + "/sth" + statusfile = mergedb + "/merge_dist.status" + s = Status(statusfile) create_ssl_context(cafile=paths["https_cacertfile"]) timing = timing_point() try: sth = json.loads(open(sthfile, 'r').read()) except (IOError, ValueError): - print >>sys.stderr, "No valid STH file found in", sthfile + logging.warning("No valid STH file found in %s", sthfile) return timestamp if sth['timestamp'] < timestamp: - print >>sys.stderr, "New STH file older than the previous one:", \ - sth['timestamp'], "<", timestamp + logging.warning("New STH file older than the previous one: %d < %d", + sth['timestamp'], timestamp) return timestamp if sth['timestamp'] == timestamp: return timestamp @@ -49,96 +54,122 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): nodename = frontendnode["name"] timing = timing_point() - print >>sys.stderr, "distributing for node", nodename - sys.stderr.flush() - curpos = get_curpos(nodename, nodeaddress, own_key, paths) + logging.info("distributing for node %s", nodename) + ok, curpos = get_curpos(nodename, nodeaddress, own_key, paths) + if not ok: + logging.error("get_curpos: %s", curpos) + continue timing_point(timing, "get curpos") - print >>sys.stderr, "current position", curpos - sys.stderr.flush() + logging.info("current position %d", curpos) entries = [b64encode(entry) for entry in logorder[curpos:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() + logging.info("sending log: %d", len(entries)) + sendlog_fail = False for chunk in chunks(entries, 1000): for trynumber in range(5, 0, -1): - sendlogresult = sendlog(nodename, nodeaddress, - own_key, paths, - {"start": curpos, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - sleep(10) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue + ok, sendlogresult = sendlog(nodename, nodeaddress, own_key, paths, + {"start": curpos, "hashes": chunk}) + if ok: + break + sleep(10) + logging.warning("tries left: %d", trynumber) + if not ok or sendlogresult.get("result") != "ok": + logging.error("sendlog: %s", sendlogresult) + sendlog_fail = True break - if sendlogresult["result"] != "ok": - print >>sys.stderr, "sendlog:", sendlogresult - sys.exit(1) curpos += len(chunk) - print >>sys.stderr, curpos, - sys.stderr.flush() - print >>sys.stderr + s.status("INFO: sendlog %d" % curpos) timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + if sendlog_fail: + logging.error("sendlog failed for %s", nodename) + continue missingentries = get_missingentries(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() + logging.info("sending missing entries: %d", len(missingentries)) sent_entries = 0 - print >>sys.stderr, "send missing entries", - sys.stderr.flush() + sendentry_fail = False for missingentry in missingentries: ehash = b64decode(missingentry) - sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, - chainsdb.get(ehash), ehash) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry:", sendentryresult - sys.exit(1) + ok, sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, + chainsdb.get(ehash), ehash) + if not ok or sendentryresult.get("result") != "ok": + logging.error("sendentry: %s", sendentryresult) + sendentry_fail = True + break sent_entries += 1 if sent_entries % 1000 == 0: - print >>sys.stderr, sent_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: sendentry %d" % sent_entries) timing_point(timing, "send missing") + if sendentry_fail: + logging.error("sendentry failed for %s", nodename) + continue - print >>sys.stderr, "sending sth to node", nodename - sys.stderr.flush() - sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) - if sendsthresult["result"] != "ok": - print >>sys.stderr, "sendsth:", sendsthresult - sys.exit(1) + logging.info("sending sth to node %s", nodename) + sendsth_fail = False + ok, sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) + if not ok or sendsthresult.get("result") != "ok": + logging.error("sendsth: %s", sendsthresult) + sendsth_fail = True timing_point(timing, "send sth") if args.timing: - print >>sys.stderr, "timing: merge_dist:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_dist: %s", timing["deltatimes"]) + + if sendsth_fail: + logging.error("sendsth failed for %s", nodename) + continue return timestamp def main(): """ + Wait until 'sth' exists and read it. + Distribute missing entries and the STH to all frontend nodes. + + If `--mergeinterval', wait until 'sth' is updated and read it and + start distributing again. """ args, config, localconfig = parse_args() + paths = localconfig["paths"] + mergedb = paths["mergedb"] + lockfile = mergedb + "/.merge_dist.lock" timestamp = 0 + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_dist.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + if len(args.node) == 0: nodes = config["frontendnodes"] else: nodes = [n for n in config["frontendnodes"] if n["name"] in args.node] + if args.mergeinterval is None: + if merge_dist(args, localconfig, nodes, timestamp) < 0: + return 1 + return 0 + + sth_path = localconfig["paths"]["mergedb"] + "/sth" + sth_statinfo = waitforfile(sth_path) while True: - timestamp = merge_dist(args, localconfig, nodes, timestamp) - if args.interval is None: - break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + if merge_dist(args, localconfig, nodes, timestamp) < 0: + return 1 + sth_statinfo_old = sth_statinfo + while sth_statinfo == sth_statinfo_old: + sleep(args.mergeinterval / 30) + sth_statinfo = stat(sth_path) + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index db274a3..7973fae 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -10,10 +10,11 @@ import sys import struct import subprocess +import logging from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ - hexencode, parse_args, perm + hexencode, parse_args, perm, flock_ex_or_fail, Status from certtools import timing_point, write_file, create_ssl_context def merge_fetch(args, config, localconfig): @@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig): storagenodes = config["storagenodes"] mergedb = paths["mergedb"] logorderfile = mergedb + "/logorder" + statusfile = mergedb + "/merge_fetch.status" + s = Status(statusfile) chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig): entries_to_fetch = {} for storagenode in storagenodes: - print >>sys.stderr, "getting new entries from", storagenode["name"] - sys.stderr.flush() + logging.info("getting new entries from %s", storagenode["name"]) new_entries_per_node[storagenode["name"]] = \ set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"], @@ -48,8 +50,7 @@ def merge_fetch(args, config, localconfig): timing_point(timing, "get new entries") new_entries -= certsinlog - print >>sys.stderr, "adding", len(new_entries), "entries" - sys.stderr.flush() + logging.info("adding %d entries", len(new_entries)) for ehash in new_entries: for storagenode in storagenodes: @@ -63,9 +64,8 @@ def merge_fetch(args, config, localconfig): added_entries = 0 for storagenode in storagenodes: - print >>sys.stderr, "getting %d entries from %s:" % \ - (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), - sys.stderr.flush() + nentries = len(entries_to_fetch[storagenode["name"]]) + logging.info("getting %d entries from %s", nentries, storagenode["name"]) for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], @@ -78,21 +78,17 @@ def merge_fetch(args, config, localconfig): logorder.append(ehash) certsinlog.add(ehash) added_entries += 1 - print >>sys.stderr, added_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: getting %d entries from %s: %d" % + (nentries, storagenode["name"], added_entries)) chainsdb.commit() fsync_logorder(logorderfile) timing_point(timing, "add entries") - print >>sys.stderr, "added", added_entries, "entries" - sys.stderr.flush() + logging.info("added %d entries", added_entries) verifycert.communicate(struct.pack("I", 0)) if args.timing: - print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_fetch: %s", timing["deltatimes"]) tree_size = len(logorder) if tree_size == 0: @@ -102,28 +98,43 @@ def merge_fetch(args, config, localconfig): def main(): """ - Fetch new entries from all storage nodes. + Fetch new entries from all storage nodes, in sequence. - Indicate current position by writing the index in the logorder file - (0-based) to the 'fetched' file. + Indicate the current position by writing the hash and its 'logorder' + index, 0-based, to 'fetched'. - Sleep some and start over. + Sleep some and start over, or exit if there's no `--mergeinterval'. """ args, config, localconfig = parse_args() paths = localconfig["paths"] mergedb = paths["mergedb"] currentsizefile = mergedb + "/fetched" + lockfile = mergedb + "/.merge_fetch.lock" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_fetch.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + create_ssl_context(cafile=paths["https_cacertfile"]) while True: logsize, last_hash = merge_fetch(args, config, localconfig) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} - #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize + logging.debug("writing to %s: %s", currentsizefile, currentsize) write_file(currentsizefile, currentsize) - if args.interval is None: + if args.mergeinterval is None: break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + logging.debug("sleeping %d seconds", args.mergeinterval / 10) + sleep(args.mergeinterval / 10) + + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/merge_sth.py b/tools/merge_sth.py index f4aec53..97f6e24 100755 --- a/tools/merge_sth.py +++ b/tools/merge_sth.py @@ -9,12 +9,13 @@ # import sys import json -import urllib2 import time import requests +import logging from base64 import b64encode +from datetime import datetime, timedelta from mergetools import parse_args, get_nfetched, hexencode, hexdecode, \ - get_logorder, get_sth + get_logorder, get_sth, flock_ex_or_fail from certtools import create_ssl_context, get_public_key_from_file, \ timing_point, create_sth_signature, write_file, check_sth_signature, \ build_merkle_tree @@ -39,6 +40,7 @@ def merge_sth(args, config, localconfig): trees = [{'tree_size': get_nfetched(currentsizefile, logorderfile), 'sha256_root_hash': ''}] + logging.debug("starting point, trees: %s", trees) for mergenode in mergenodes: if mergenode["name"] == config["primarymergenode"]: continue @@ -49,28 +51,29 @@ def merge_sth(args, config, localconfig): tree = {'tree_size': 0, "sha256_root_hash": ''} trees.append(tree) trees.sort(key=lambda e: e['tree_size'], reverse=True) - #print >>sys.stderr, "DEBUG: trees:", trees + logging.debug("trees: %s", trees) if backupquorum > len(trees) - 1: - print >>sys.stderr, "backup quorum > number of secondaries:", \ - backupquorum, ">", len(trees) - 1 - return + logging.error("backup quorum > number of secondaries: %d > %d", + backupquorum, len(trees) - 1) + return -1 tree_size = trees[backupquorum]['tree_size'] root_hash = hexdecode(trees[backupquorum]['sha256_root_hash']) - #print >>sys.stderr, "DEBUG: tree size candidate at backupquorum", backupquorum, ":", tree_size + logging.debug("tree size candidate at backupquorum %d: %d", backupquorum, + tree_size) cur_sth = get_sth(sthfile) if tree_size < cur_sth['tree_size']: - print >>sys.stderr, "candidate tree < current tree:", \ - tree_size, "<", cur_sth['tree_size'] - return + logging.info("candidate tree < current tree: %d < %d", + tree_size, cur_sth['tree_size']) + return 0 assert tree_size >= 0 # Don't read logorder without limit. logorder = get_logorder(logorderfile, tree_size) timing_point(timing, "get logorder") if tree_size == -1: tree_size = len(logorder) - print >>sys.stderr, "new tree size will be", tree_size + logging.info("new tree size will be %d", tree_size) root_hash_calc = build_merkle_tree(logorder)[-1][0] assert root_hash == '' or root_hash == root_hash_calc @@ -87,11 +90,10 @@ def merge_sth(args, config, localconfig): key=own_key) break except requests.exceptions.HTTPError, e: - print >>sys.stderr, e.response - sys.stderr.flush() + logging.warning("create_sth_signature error: %s", e.response) if tree_head_signature == None: - print >>sys.stderr, "Could not contact any signing nodes" - sys.exit(1) + logging.error("Could not contact any signing nodes") + return 0 sth = {"tree_size": tree_size, "timestamp": timestamp, "sha256_root_hash": b64encode(root_hash), @@ -100,34 +102,59 @@ def merge_sth(args, config, localconfig): check_sth_signature(ctbaseurl, sth, publickey=logpublickey) timing_point(timing, "build sth") - print hexencode(root_hash), timestamp, tree_size - sys.stdout.flush() + logging.info("new root: %s %d %d", hexencode(root_hash), timestamp, tree_size) write_file(sthfile, sth) if args.timing: - print >>sys.stderr, "timing: merge_sth:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_sth: %s", timing["deltatimes"]) + + return 0 def main(): """ - Read file 'sth' to get current tree size, assuming zero if file not + Read 'sth' to get the current tree size, assuming zero if file not found. Read tree sizes from the backup.<secondary> files, put them in a - list and sort it. Let new tree size equal list[backup-quorum]. Barf - on a new tree size smaller than the currently published tree size. + list and sort the list. Let new tree size be list[backup-quorum]. If + the new tree size is smaller than the currently published tree size, + stop here. + + Decide on a timestamp, build an STH and write it to 'sth'. - Decide on a timestamp, build an STH and write it to file 'sth'. + Sleep some and start over, or exit if there's no `--mergeinterval'. """ args, config, localconfig = parse_args() + paths = localconfig["paths"] + mergedb = paths["mergedb"] + lockfile = mergedb + "/.merge_sth.lock" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_sth.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 while True: - merge_sth(args, config, localconfig) - if args.interval is None: + merge_start_time = datetime.now() + ret = merge_sth(args, config, localconfig) + if ret < 0: + return 1 + if args.mergeinterval is None: break - print >>sys.stderr, "sleeping", args.interval, "seconds" - time.sleep(args.interval) + sleep = (merge_start_time + timedelta(seconds=args.mergeinterval) - + datetime.now()).seconds + if sleep > 0: + logging.debug("sleeping %d seconds", sleep) + time.sleep(sleep) + + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/mergetools.py b/tools/mergetools.py index 80fbf0b..5471fe4 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -10,11 +10,15 @@ import json import yaml import argparse import requests +import time +import fcntl +import errno +import logging try: import permdb except ImportError: pass -from certtools import get_leaf_hash, http_request, get_leaf_hash +from certtools import get_leaf_hash, http_request def parselogrow(row): return base64.b16decode(row, casefold=True) @@ -33,7 +37,7 @@ def get_nfetched(currentsizefile, logorderfile): try: limit = json.loads(open(currentsizefile).read()) except (IOError, ValueError): - return -1 + return 0 if limit['index'] >= 0: with open(logorderfile, 'r') as f: f.seek(limit['index']*65) @@ -207,12 +211,10 @@ def get_curpos(node, baseurl, own_key, paths): publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": - return parsed_result[u"position"] - print >>sys.stderr, "ERROR: currentposition", parsed_result - sys.exit(1) + return True, parsed_result[u"position"] + return False, parsed_result except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: currentposition", e.response - sys.exit(1) + return False, e.response def get_verifiedsize(node, baseurl, own_key, paths): try: @@ -234,19 +236,16 @@ def sendlog(node, baseurl, own_key, paths, submission): result = http_request(baseurl + "plop/v1/frontend/sendlog", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) + return True, json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendlog", e.response - sys.stderr.flush() - return None + return False, e.response except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e + logging.error("==== FAILED REQUEST ====") + logging.error(submission) + logging.error("======= RESPONSE =======") + logging.error(result) + logging.error("========================") + return False, e def backup_sendlog(node, baseurl, own_key, paths, submission): try: @@ -274,18 +273,16 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash): json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) + return True, json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendentry", e.reponse - sys.exit(1) + return False, e.response except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, ehash - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e + logging.error("==== FAILED REQUEST ====") + logging.error(ehash) + logging.error("======= RESPONSE =======") + logging.error(result) + logging.error("========================") + return False, e def sendentry_merge(node, baseurl, own_key, paths, entry, ehash): return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)]) @@ -304,7 +301,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, ehash + print >>sys.stderr, hash print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" @@ -316,18 +313,16 @@ def sendsth(node, baseurl, own_key, paths, submission): result = http_request(baseurl + "plop/v1/frontend/sendsth", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) + return True, json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendsth", e.response - sys.exit(1) + return False, e.response except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e + logging.error("==== FAILED REQUEST ====") + logging.error(submission) + logging.error("======= RESPONSE =======") + logging.error(result) + logging.error("========================") + return False, e def verifyroot(node, baseurl, own_key, paths, treesize): try: @@ -403,10 +398,17 @@ def parse_args(): required=True) parser.add_argument('--localconfig', help="Local configuration", required=True) - parser.add_argument('--interval', type=int, metavar="n", - help="Repeate every N seconds") + # FIXME: verify that 0 < N < 1d + parser.add_argument('--mergeinterval', type=int, metavar="n", + help="Merge every N seconds") parser.add_argument("--timing", action='store_true', help="Print timing information") + parser.add_argument("--pidfile", type=str, metavar="file", + help="Store PID in FILE") + parser.add_argument("--logdir", type=str, default=".", metavar="dir", + help="Write logfiles in DIR [default: .]") + parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level", + help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]") args = parser.parse_args() config = yaml.load(open(args.config)) @@ -421,6 +423,32 @@ def perm(dbtype, path): return PermDB(path) assert False +def waitforfile(path): + statinfo = None + while statinfo is None: + try: + statinfo = os.stat(path) + except OSError, e: + if e.errno != errno.ENOENT: + raise + time.sleep(1) + return statinfo + +def flock_ex_or_fail(path): + try: + fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB) + except IOError, e: + if e.errno != errno.EWOULDBLOCK: + raise + return False + return True + +class Status: + def __init__(self, path): + self.path = path + def status(self, s): + open(self.path, 'w').write(s) + class FileDB: def __init__(self, path): self.path = path diff --git a/tools/testcase1.py b/tools/testcase1.py index 81d589a..98b9179 100755 --- a/tools/testcase1.py +++ b/tools/testcase1.py @@ -13,6 +13,7 @@ import struct import hashlib import itertools import os.path +from time import sleep from certtools import * baseurls = [sys.argv[1]] @@ -20,6 +21,9 @@ logpublickeyfile = sys.argv[2] cacertfile = sys.argv[3] toolsdir = os.path.dirname(sys.argv[0]) testdir = sys.argv[4] +do_merge = True +if len(sys.argv) > 5 and sys.argv[5] == '--nomerge': + do_merge = False certfiles = [toolsdir + ("/testcerts/cert%d.txt" % e) for e in range(1, 6)] @@ -121,7 +125,7 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl): assert_equal(len(entries), 1, "get_entries", quiet=True) fetched_entry = entries["entries"][0] merkle_tree_leaf = pack_mtl(timestamp, chain[0]) - leaf_input = base64.decodestring(fetched_entry["leaf_input"]) + leaf_input = base64.decodestring(fetched_entry["leaf_input"]) assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True) extra_data = base64.decodestring(fetched_entry["extra_data"]) certchain = decode_certificate_chain(extra_data) @@ -148,8 +152,14 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl): len(submittedcertchain)) def merge(): - return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", - "--localconfig", testdir + "/catlfish-test-local-merge.cfg"]) + if do_merge: + return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", + "--localconfig", testdir + "/catlfish-test-local-merge.cfg"]) + else: + n = 60 + print "testcase1.py: sleeping", n, "seconds waiting for merge" + sleep(n) + return 0 mergeresult = merge() assert_equal(mergeresult, 0, "merge", quiet=True, fatal=True) |