-rw-r--r--   doc/merge.txt                                 |    9
-rw-r--r--   doc/minimalsystem.txt                         |   15
-rw-r--r--   test/Makefile                                 |   19
-rwxr-xr-x   test/scripts/continuous-merge-start.sh        |   10
-rwxr-xr-x   test/scripts/continuous-merge-stop.sh         |    6
-rwxr-xr-x   test/scripts/continuous-merge-test-run-1.sh   |   56
-rwxr-xr-x   test/scripts/continuous-merge-test.sh         |   27
-rw-r--r--   test/scripts/testutils.sh                     |    2
-rwxr-xr-x   tools/merge                                   |   62
-rwxr-xr-x   tools/merge_backup.py                         |  141
-rwxr-xr-x   tools/merge_dist.py                           |  141
-rwxr-xr-x   tools/merge_fetch.py                          |   59
-rwxr-xr-x   tools/merge_sth.py                            |   81
-rw-r--r--   tools/mergetools.py                           |  110
-rwxr-xr-x   tools/testcase1.py                            |   16
15 files changed, 525 insertions(+), 229 deletions(-)
diff --git a/doc/merge.txt b/doc/merge.txt
index d12424f..c7d8dc9 100644
--- a/doc/merge.txt
+++ b/doc/merge.txt
@@ -20,6 +20,13 @@ The merge process
 - merge-dist distributes 'sth' and missing entries to frontend
   nodes.
 
+              [logorder]
+                  v
+    fetch       backup        sth      dist
+     v ^         v ^          v ^
+ [fetched]  [backup.<secondary>]     [sth]
+
+
 Merge distribution (merge_dist)
 -----------------------------------------------------
 
@@ -83,8 +90,6 @@ Merge backup (merge_backup)
 TODO
 ====
 
-- Run the three pieces in parallell.
-
 - Improve merge-fetch by parallellising it using one process per
   storage node writing to a "queue info" file (storage-node, hash) and
   a single "queue handling process" reading queue files and writing to the
diff --git a/doc/minimalsystem.txt b/doc/minimalsystem.txt
index 061b6cc..3f10784 100644
--- a/doc/minimalsystem.txt
+++ b/doc/minimalsystem.txt
@@ -46,18 +46,21 @@ If you want to submit all the files in a directory, name
 directory with a `/` at the end, for example `tools/testcerts/`.
 
 
-Running merge once
-------------------
+Running merge
+-------------
 
-    tools/merge.py --config test/catlfish-test.cfg
+To run merge once, do
+
+    tools/merge --config test/catlfish-test.cfg --localconfig test/catlfish-test-local-merge.cfg
 
 This will read the submitted certificates from the storage node,
 decide the order, and publish the certificates to the frontend
 server.
 
-If you want to run the system continuously, run the merge command in
-cron or in a while loop. See `packaging/docker/catlfish-dev/merge.sh`
-for an example of the latter.
+If you want to run merge continuously, use `--mergeinterval' or run
+the merge command in a while loop. See
+`catlfish-dockerfiles/catlfish-dev/merge.sh` for an example of the
+latter.
 
 Verifying SCT:s
 ---------------
diff --git a/test/Makefile b/test/Makefile
index bd58cfe..cee5186 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -1,20 +1,27 @@
 PREFIX=..
 INSTDIR=$(PREFIX)/catlfish
 
+all: tests
+
 tests-start:
-	./scripts/light-system-test-start.sh
+	(cd $(INSTDIR)/tests && ../../test/scripts/light-system-test-start.sh)
 
 tests-stop:
-	./scripts/light-system-test-stop.sh
-
-tests-wait:
-	sleep 5
+	(cd $(INSTDIR)/tests && ../../test/scripts/light-system-test-stop.sh)
 
 tests-makemk:
 	$(PREFIX)/tools/compileconfig.py --config=$(PREFIX)/test/catlfish-test.cfg --testshellvars=$(PREFIX)/test/test.shvars --machines 1
 
-tests:
+tests: tests-basic
+
+tests-basic:
 	@make tests-makemk
 	rm -r $(INSTDIR)/tests || true
 	mkdir $(INSTDIR)/tests
 	(cd $(INSTDIR)/tests && ../../test/scripts/light-system-test.sh)
+
+tests-continuous-merge:
+	@make tests-makemk
+	rm -r $(INSTDIR)/tests || true
+	mkdir $(INSTDIR)/tests
+	(cd $(INSTDIR)/tests && ../../test/scripts/continuous-merge-test.sh)
diff --git a/test/scripts/continuous-merge-start.sh b/test/scripts/continuous-merge-start.sh
new file mode 100755
index 0000000..666512f
--- /dev/null
+++ b/test/scripts/continuous-merge-start.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+set -o nounset
+set -o errexit
+
+top_srcdir=$(cd $(dirname $0)/../..; pwd)
+
+. ${top_srcdir}/test/scripts/testutils.sh
+
+do_merge --mergeinterval 30 --pidfile merge.pid
diff --git a/test/scripts/continuous-merge-stop.sh b/test/scripts/continuous-merge-stop.sh
new file mode 100755
index 0000000..f6632f8
--- /dev/null
+++ b/test/scripts/continuous-merge-stop.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+
+set -o nounset
+
+kill $(cat merge.pid)
+exit 0
diff --git a/test/scripts/continuous-merge-test-run-1.sh b/test/scripts/continuous-merge-test-run-1.sh
new file mode 100755
index 0000000..537ff2f
--- /dev/null
+++ b/test/scripts/continuous-merge-test-run-1.sh
@@ -0,0 +1,56 @@
+#!/bin/sh
+
+set -o nounset
+set -o errexit
+
+top_srcdir=$(cd $(dirname $0)/../..; pwd)
+
+. ${top_srcdir}/test/scripts/testutils.sh
+
+python ${top_srcdir}/tools/testcase1.py https://localhost:8080/ keys/logkey.pem httpsca/demoCA/cacert.pem ${top_srcdir}/test --nomerge || fail "Tests failed"
+check_sth
+python ${top_srcdir}/tools/fetchallcerts.py ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Verification failed"
+python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg || fail "GC failed"
+for certfile in ${top_srcdir}/tools/testcerts/cert[1-5].txt ${top_srcdir}/tools/testcerts/pre[12].txt; do
+    python ${top_srcdir}/tools/submitcert.py --parallel=1 --store $certfile --check-sct --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Submission failed"
+done
+python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg || fail "GC failed"
+echo "$0: sleeping for 60 seconds"
+sleep 60
+check_sth
+
+assert_equal "Tree size" "$(get_treesize)" 7
+
+mkdir fetchcertstore
+python ${top_srcdir}/tools/fetchallcerts.py ${BASEURL} --store fetchcertstore --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Verification failed"
+(cd fetchcertstore && unzip 0000.zip)
+
+python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert1.txt fetchcertstore/00000000 || fail "Verification failed"
+python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert2.txt fetchcertstore/00000001 || fail "Verification failed"
+python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert3.txt fetchcertstore/00000002 || fail "Verification failed"
+python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert4.txt fetchcertstore/00000003 || fail "Verification failed"
+python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/cert5.txt fetchcertstore/00000004 || fail "Verification failed"
+python ${top_srcdir}/tools/comparecert.py ${top_srcdir}/tools/testcerts/pre1.txt:${top_srcdir}/tools/testcerts/pre2.txt fetchcertstore/00000005:fetchcertstore/00000006 || fail "Verification failed"
+python ${top_srcdir}/tools/storagegc.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-1.cfg || fail "GC failed"
+
+echo NOTE: merge backup should fail with 111 Connection refused
+${top_srcdir}/tools/to_catlfish.py to_erl nodes/merge-2/ "init:stop()"
+python ${top_srcdir}/tools/submitcert.py --parallel=1 --store ${top_srcdir}/tools/testcerts/cert6.txt --check-sct --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || fail "Submission failed"
+
+echo "$0: sleeping for 60 seconds"
+sleep 60
+assert_equal "Tree size" "$(get_treesize)" 7
+
+check_sth
+../bin/run_erl -daemon nodes/merge-2/ nodes/merge-2/log/ "exec ../bin/erl -config merge-2"
+for i in 1 2 3 4 5 6 7 8 9 10; do
+    echo "waiting for system to start" ; \
+    sleep 0.5 ; \
+    if curl -s --cacert httpsca/demoCA/cacert.pem -4 https://localhost:8181 > /dev/null ; then break; fi
+done
+
+echo "$0: sleeping for 60 seconds"
+sleep 60
+assert_equal "Tree size" "$(get_treesize)" 8
+
+check_sth
diff --git a/test/scripts/continuous-merge-test.sh b/test/scripts/continuous-merge-test.sh
new file mode 100755
index 0000000..3bb6de2
--- /dev/null
+++ b/test/scripts/continuous-merge-test.sh
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+set -o nounset
+set -o errexit
+
+top_srcdir=$(cd $(dirname $0)/../..; pwd)
+
+. ${top_srcdir}/test/test.shvars
+
+SCRIPTS=${top_srcdir}/test/scripts
+
+tests_start() {
+    ${SCRIPTS}/light-system-test-start.sh
+    ${SCRIPTS}/continuous-merge-start.sh &
+}
+
+tests_stop() {
+    ${SCRIPTS}/continuous-merge-stop.sh
+    ${SCRIPTS}/light-system-test-stop.sh
+}
+
+echo "$0: starting"
+${SCRIPTS}/light-system-test-prepare.sh
+tests_start
+${SCRIPTS}/continuous-merge-test-run-1.sh || (echo "Tests failed"; sleep 5; tests_stop; false)
+sleep 5
+tests_stop
diff --git a/test/scripts/testutils.sh b/test/scripts/testutils.sh
index 94d6223..908c990 100644
--- a/test/scripts/testutils.sh
+++ b/test/scripts/testutils.sh
@@ -18,5 +18,5 @@ check_sth() {
 }
 
 do_merge() {
-    ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg || fail "Merge failed"
+    ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg $@ || fail "Merge failed"
 }
diff --git a/tools/merge b/tools/merge
index b5a50d5..0d1bf0e 100755
--- a/tools/merge
+++ b/tools/merge
@@ -1,10 +1,58 @@
-#! /bin/sh
+#! /usr/bin/env python
+"""merge"""
 
-set -o errexit
+import os
+import sys
+import signal
+from time import sleep
+from mergetools import parse_args
+from multiprocessing import Process
+import merge_fetch, merge_backup, merge_sth, merge_dist
 
-BINDIR=$(dirname $0)
+def run_once():
+    """Merge once."""
+    ret = merge_fetch.main()
+    if ret == 0:
+        ret = merge_backup.main()
+    if ret == 0:
+        ret = merge_sth.main()
+    if ret == 0:
+        ret = merge_dist.main()
+    return ret
 
-$BINDIR/merge_fetch.py "$@"
-$BINDIR/merge_backup.py "$@"
-$BINDIR/merge_sth.py "$@"
-$BINDIR/merge_dist.py "$@"
+def term(signal, arg):
+    sys.exit(1)
+
+def run_continuously(pidfilepath):
+    """Run continuously."""
+    parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist)
+    procs = {}
+    for part, mod in parts:
+        procs[part] = Process(target=mod.main, name='merge_%s' % part)
+        procs[part].daemon = True
+        procs[part].start()
+
+    if pidfilepath:
+        open(pidfilepath, 'w').write(str(os.getpid()) + '\n')
+
+    signal.signal(signal.SIGTERM, term)
+    while True:
+        for name, p in procs.items():
+            if not p.is_alive():
+                print >>sys.stderr, "\nERROR:", name, "process is gone; exiting"
+                return 1
+        sleep(1)
+
+def main():
+    """Main"""
+    args, _, _ = parse_args()
+
+    if args.mergeinterval is None:
+        ret = run_once()
+    else:
+        ret = run_continuously(args.pidfile)
+
+    return ret
+
+if __name__ == '__main__':
+    sys.exit(main())
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
 import base64
 import select
 import requests
+import errno
+import logging
 from time import sleep
+from os import stat
 from certtools import timing_point, build_merkle_tree, write_file, \
     create_ssl_context
 from mergetools import chunks, backup_sendlog, get_logorder, \
     get_verifiedsize, get_missingentriesforbackup, \
     hexencode, setverifiedsize, sendentries_merge, verifyroot, \
-    get_nfetched, parse_args, perm
+    get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
 
 def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
     for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
         if trynumber == 1:
             return None
         select.select([], [], [], 10.0)
-        print >>sys.stderr, "tries left:", trynumber
-        sys.stderr.flush()
+        logging.info("tries left: %d", trynumber)
         continue
     return sendlogresult
 
-
 def merge_backup(args, config, localconfig, secondaries):
     paths = localconfig["paths"]
     own_key = (localconfig["nodename"],
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     logorderfile = mergedb + "/logorder"
     currentsizefile = mergedb + "/fetched"
+    statusfile = mergedb + "/merge_backup.status"
+    s = Status(statusfile)
     timing = timing_point()
 
     nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
         nodeaddress = "https://%s/" % secondary["address"]
         nodename = secondary["name"]
         timing = timing_point()
-        print >>sys.stderr, "backing up to node", nodename
-        sys.stderr.flush()
-        verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        logging.info("backing up to node %s", nodename)
+        try:
+            verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+        except requests.exceptions.ConnectionError, e:
+            logging.error("connection error when 
getting verified size from %s", nodename) + return 1 timing_point(timing, "get verified size") - print >>sys.stderr, "verified size", verifiedsize - sys.stderr.flush() + logging.info("verified size %d", verifiedsize) entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "determining end of log:", + logging.info("determining end of log") for chunk in chunks(entries, 100000): sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10]) if sendlogresult == None: - print >>sys.stderr, "sendlog result was None" - sys.exit(1) + logging.error("sendlog result was None") + return 1 if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) + logging.error("backup_sendlog: %s", sendlogresult) + return 1 verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() + s.status("INFO: determining end of log: %d" % verifiedsize) if verifiedsize > 100000: verifiedsize -= 100000 else: verifiedsize = 0 + logging.info("end of log determined") timing_point(timing, "checklog") entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() + logging.info("sending log") for chunk in chunks(entries, 1000): sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) if sendlogresult == None: - sys.exit(1) + return 1 if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) + logging.error("backup_sendlog: %s", sendlogresult) + return 1 verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - print >>sys.stderr + s.status("INFO: sending log: %d" % verifiedsize) timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + logging.info("log sent") missingentries = get_missingentriesforbackup(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") while missingentries: - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() + logging.info("missing entries: %d", len(missingentries)) fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() + logging.info("fetching missing entries") with requests.sessions.Session() as session: for missingentry_chunk in chunks(missingentries, 100): missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] @@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries): own_key, paths, hashes_and_entries, session) if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry_merge:", sendentryresult - sys.exit(1) + logging.error("sendentry_merge: %s", sendentryresult) + return 1 fetched_entries += len(missingentry_hashes) - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: fetching missing entries: %d" % fetched_entries) timing_point(timing, "send missing") missingentries = get_missingentriesforbackup(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") - verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, - tree_size) + try: + verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, + tree_size) + except requests.exceptions.ConnectionError, e: + logging.error("connection error when verifying root at %s", nodename) + return 1 if verifyrootresult["result"] != "ok": - print >>sys.stderr, "verifyroot:", 
verifyrootresult - sys.exit(1) + logging.error("verifyroot: %s", verifyrootresult) + return 1 secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) if root_hash != secondary_root_hash: - print >>sys.stderr, "secondary root hash was", \ - hexencode(secondary_root_hash) - print >>sys.stderr, " expected", hexencode(root_hash) - sys.exit(1) + logging.error("secondary root hash was %s, expected %s", + hexencode(secondary_root_hash), + hexencode(root_hash)) + return 1 timing_point(timing, "verifyroot") setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size) backuppath = mergedb + "/verified." + nodename backupdata = {"tree_size": tree_size, "sha256_root_hash": hexencode(root_hash)} - #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata + logging.debug("writing to %s: %s", backuppath, backupdata) write_file(backuppath, backupdata) if args.timing: - print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_backup: %s", timing["deltatimes"]) + + return 0 def main(): """ - Read logorder file up until what's indicated by fetched file and - build the tree. + Wait until 'fetched' exists and read it. + + Read 'logorder' up until what's indicated by 'fetched' and build the + tree. Distribute entries to all secondaries, write tree size and tree head - to backup.<secondary> files as each secondary is verified to have + to 'backup.<secondary>' files as each secondary is verified to have the entries. - Sleep some and start over. + If `--mergeinterval', wait until 'fetched' is updated and read it + and start over from the point where 'logorder' is read. """ args, config, localconfig = parse_args() + paths = localconfig["paths"] + mergedb = paths["mergedb"] + lockfile = mergedb + "/.merge_backup.lock" + fetched_path = mergedb + "/fetched" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_backup.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + all_secondaries = \ [n for n in config.get('mergenodes', []) if \ n['name'] != config['primarymergenode']] - paths = localconfig["paths"] create_ssl_context(cafile=paths["https_cacertfile"]) if len(args.node) == 0: @@ -187,12 +206,20 @@ def main(): else: nodes = [n for n in all_secondaries if n["name"] in args.node] + if args.mergeinterval is None: + return merge_backup(args, config, localconfig, nodes) + + fetched_statinfo = waitforfile(fetched_path) + while True: - merge_backup(args, config, localconfig, nodes) - if args.interval is None: - break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + err = merge_backup(args, config, localconfig, nodes) + if err != 0: + return err + fetched_statinfo_old = fetched_statinfo + while fetched_statinfo == fetched_statinfo_old: + sleep(args.mergeinterval / 30) + fetched_statinfo = stat(fetched_path) + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/merge_dist.py b/tools/merge_dist.py index a9b5c60..7a13bfa 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -9,12 +9,15 @@ # import sys import json +import logging from time import sleep from base64 import b64encode, b64decode +from os import stat from certtools import timing_point, \ create_ssl_context from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ - sendsth, sendlog, 
sendentry, parse_args, perm + sendsth, sendlog, sendentry, parse_args, perm, waitforfile, \ + flock_ex_or_fail, Status def merge_dist(args, localconfig, frontendnodes, timestamp): paths = localconfig["paths"] @@ -25,17 +28,19 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") logorderfile = mergedb + "/logorder" sthfile = mergedb + "/sth" + statusfile = mergedb + "/merge_dist.status" + s = Status(statusfile) create_ssl_context(cafile=paths["https_cacertfile"]) timing = timing_point() try: sth = json.loads(open(sthfile, 'r').read()) except (IOError, ValueError): - print >>sys.stderr, "No valid STH file found in", sthfile + logging.warning("No valid STH file found in %s", sthfile) return timestamp if sth['timestamp'] < timestamp: - print >>sys.stderr, "New STH file older than the previous one:", \ - sth['timestamp'], "<", timestamp + logging.warning("New STH file older than the previous one: %d < %d", + sth['timestamp'], timestamp) return timestamp if sth['timestamp'] == timestamp: return timestamp @@ -49,96 +54,122 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): nodename = frontendnode["name"] timing = timing_point() - print >>sys.stderr, "distributing for node", nodename - sys.stderr.flush() - curpos = get_curpos(nodename, nodeaddress, own_key, paths) + logging.info("distributing for node %s", nodename) + ok, curpos = get_curpos(nodename, nodeaddress, own_key, paths) + if not ok: + logging.error("get_curpos: %s", curpos) + continue timing_point(timing, "get curpos") - print >>sys.stderr, "current position", curpos - sys.stderr.flush() + logging.info("current position %d", curpos) entries = [b64encode(entry) for entry in logorder[curpos:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() + logging.info("sending log: %d", len(entries)) + sendlog_fail = False for chunk in chunks(entries, 1000): for trynumber in range(5, 0, -1): - sendlogresult = sendlog(nodename, nodeaddress, - own_key, paths, - {"start": curpos, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - sleep(10) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue + ok, sendlogresult = sendlog(nodename, nodeaddress, own_key, paths, + {"start": curpos, "hashes": chunk}) + if ok: + break + sleep(10) + logging.warning("tries left: %d", trynumber) + if not ok or sendlogresult.get("result") != "ok": + logging.error("sendlog: %s", sendlogresult) + sendlog_fail = True break - if sendlogresult["result"] != "ok": - print >>sys.stderr, "sendlog:", sendlogresult - sys.exit(1) curpos += len(chunk) - print >>sys.stderr, curpos, - sys.stderr.flush() - print >>sys.stderr + s.status("INFO: sendlog %d" % curpos) timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + if sendlog_fail: + logging.error("sendlog failed for %s", nodename) + continue missingentries = get_missingentries(nodename, nodeaddress, own_key, paths) timing_point(timing, "get missing") - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() + logging.info("sending missing entries: %d", len(missingentries)) sent_entries = 0 - print >>sys.stderr, "send missing entries", - sys.stderr.flush() + sendentry_fail = False for missingentry in missingentries: ehash = b64decode(missingentry) - sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, - chainsdb.get(ehash), ehash) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry:", 
sendentryresult - sys.exit(1) + ok, sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, + chainsdb.get(ehash), ehash) + if not ok or sendentryresult.get("result") != "ok": + logging.error("sendentry: %s", sendentryresult) + sendentry_fail = True + break sent_entries += 1 if sent_entries % 1000 == 0: - print >>sys.stderr, sent_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: sendentry %d" % sent_entries) timing_point(timing, "send missing") + if sendentry_fail: + logging.error("sendentry failed for %s", nodename) + continue - print >>sys.stderr, "sending sth to node", nodename - sys.stderr.flush() - sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) - if sendsthresult["result"] != "ok": - print >>sys.stderr, "sendsth:", sendsthresult - sys.exit(1) + logging.info("sending sth to node %s", nodename) + sendsth_fail = False + ok, sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) + if not ok or sendsthresult.get("result") != "ok": + logging.error("sendsth: %s", sendsthresult) + sendsth_fail = True timing_point(timing, "send sth") if args.timing: - print >>sys.stderr, "timing: merge_dist:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_dist: %s", timing["deltatimes"]) + + if sendsth_fail: + logging.error("sendsth failed for %s", nodename) + continue return timestamp def main(): """ + Wait until 'sth' exists and read it. + Distribute missing entries and the STH to all frontend nodes. + + If `--mergeinterval', wait until 'sth' is updated and read it and + start distributing again. """ args, config, localconfig = parse_args() + paths = localconfig["paths"] + mergedb = paths["mergedb"] + lockfile = mergedb + "/.merge_dist.lock" timestamp = 0 + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_dist.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + if len(args.node) == 0: nodes = config["frontendnodes"] else: nodes = [n for n in config["frontendnodes"] if n["name"] in args.node] + if args.mergeinterval is None: + if merge_dist(args, localconfig, nodes, timestamp) < 0: + return 1 + return 0 + + sth_path = localconfig["paths"]["mergedb"] + "/sth" + sth_statinfo = waitforfile(sth_path) while True: - timestamp = merge_dist(args, localconfig, nodes, timestamp) - if args.interval is None: - break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + if merge_dist(args, localconfig, nodes, timestamp) < 0: + return 1 + sth_statinfo_old = sth_statinfo + while sth_statinfo == sth_statinfo_old: + sleep(args.mergeinterval / 30) + sth_statinfo = stat(sth_path) + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index db274a3..7973fae 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -10,10 +10,11 @@ import sys import struct import subprocess +import logging from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ - hexencode, parse_args, perm + hexencode, parse_args, perm, flock_ex_or_fail, Status from certtools import timing_point, write_file, create_ssl_context def merge_fetch(args, config, localconfig): @@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig): storagenodes = 
config["storagenodes"] mergedb = paths["mergedb"] logorderfile = mergedb + "/logorder" + statusfile = mergedb + "/merge_fetch.status" + s = Status(statusfile) chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains") own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig): entries_to_fetch = {} for storagenode in storagenodes: - print >>sys.stderr, "getting new entries from", storagenode["name"] - sys.stderr.flush() + logging.info("getting new entries from %s", storagenode["name"]) new_entries_per_node[storagenode["name"]] = \ set(get_new_entries(storagenode["name"], "https://%s/" % storagenode["address"], @@ -48,8 +50,7 @@ def merge_fetch(args, config, localconfig): timing_point(timing, "get new entries") new_entries -= certsinlog - print >>sys.stderr, "adding", len(new_entries), "entries" - sys.stderr.flush() + logging.info("adding %d entries", len(new_entries)) for ehash in new_entries: for storagenode in storagenodes: @@ -63,9 +64,8 @@ def merge_fetch(args, config, localconfig): added_entries = 0 for storagenode in storagenodes: - print >>sys.stderr, "getting %d entries from %s:" % \ - (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), - sys.stderr.flush() + nentries = len(entries_to_fetch[storagenode["name"]]) + logging.info("getting %d entries from %s", nentries, storagenode["name"]) for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): entries = get_entries(storagenode["name"], "https://%s/" % storagenode["address"], @@ -78,21 +78,17 @@ def merge_fetch(args, config, localconfig): logorder.append(ehash) certsinlog.add(ehash) added_entries += 1 - print >>sys.stderr, added_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() + s.status("INFO: getting %d entries from %s: %d" % + (nentries, storagenode["name"], added_entries)) chainsdb.commit() fsync_logorder(logorderfile) timing_point(timing, "add entries") - print >>sys.stderr, "added", added_entries, "entries" - sys.stderr.flush() + logging.info("added %d entries", added_entries) verifycert.communicate(struct.pack("I", 0)) if args.timing: - print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_fetch: %s", timing["deltatimes"]) tree_size = len(logorder) if tree_size == 0: @@ -102,28 +98,43 @@ def merge_fetch(args, config, localconfig): def main(): """ - Fetch new entries from all storage nodes. + Fetch new entries from all storage nodes, in sequence. - Indicate current position by writing the index in the logorder file - (0-based) to the 'fetched' file. + Indicate the current position by writing the hash and its 'logorder' + index, 0-based, to 'fetched'. - Sleep some and start over. + Sleep some and start over, or exit if there's no `--mergeinterval'. 
""" args, config, localconfig = parse_args() paths = localconfig["paths"] mergedb = paths["mergedb"] currentsizefile = mergedb + "/fetched" + lockfile = mergedb + "/.merge_fetch.lock" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_fetch.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 + create_ssl_context(cafile=paths["https_cacertfile"]) while True: logsize, last_hash = merge_fetch(args, config, localconfig) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} - #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize + logging.debug("writing to %s: %s", currentsizefile, currentsize) write_file(currentsizefile, currentsize) - if args.interval is None: + if args.mergeinterval is None: break - print >>sys.stderr, "sleeping", args.interval, "seconds" - sleep(args.interval) + logging.debug("sleeping %d seconds", args.mergeinterval / 10) + sleep(args.mergeinterval / 10) + + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/merge_sth.py b/tools/merge_sth.py index f4aec53..97f6e24 100755 --- a/tools/merge_sth.py +++ b/tools/merge_sth.py @@ -9,12 +9,13 @@ # import sys import json -import urllib2 import time import requests +import logging from base64 import b64encode +from datetime import datetime, timedelta from mergetools import parse_args, get_nfetched, hexencode, hexdecode, \ - get_logorder, get_sth + get_logorder, get_sth, flock_ex_or_fail from certtools import create_ssl_context, get_public_key_from_file, \ timing_point, create_sth_signature, write_file, check_sth_signature, \ build_merkle_tree @@ -39,6 +40,7 @@ def merge_sth(args, config, localconfig): trees = [{'tree_size': get_nfetched(currentsizefile, logorderfile), 'sha256_root_hash': ''}] + logging.debug("starting point, trees: %s", trees) for mergenode in mergenodes: if mergenode["name"] == config["primarymergenode"]: continue @@ -49,28 +51,29 @@ def merge_sth(args, config, localconfig): tree = {'tree_size': 0, "sha256_root_hash": ''} trees.append(tree) trees.sort(key=lambda e: e['tree_size'], reverse=True) - #print >>sys.stderr, "DEBUG: trees:", trees + logging.debug("trees: %s", trees) if backupquorum > len(trees) - 1: - print >>sys.stderr, "backup quorum > number of secondaries:", \ - backupquorum, ">", len(trees) - 1 - return + logging.error("backup quorum > number of secondaries: %d > %d", + backupquorum, len(trees) - 1) + return -1 tree_size = trees[backupquorum]['tree_size'] root_hash = hexdecode(trees[backupquorum]['sha256_root_hash']) - #print >>sys.stderr, "DEBUG: tree size candidate at backupquorum", backupquorum, ":", tree_size + logging.debug("tree size candidate at backupquorum %d: %d", backupquorum, + tree_size) cur_sth = get_sth(sthfile) if tree_size < cur_sth['tree_size']: - print >>sys.stderr, "candidate tree < current tree:", \ - tree_size, "<", cur_sth['tree_size'] - return + logging.info("candidate tree < current tree: %d < %d", + tree_size, cur_sth['tree_size']) + return 0 assert tree_size >= 0 # Don't read logorder without limit. 
logorder = get_logorder(logorderfile, tree_size) timing_point(timing, "get logorder") if tree_size == -1: tree_size = len(logorder) - print >>sys.stderr, "new tree size will be", tree_size + logging.info("new tree size will be %d", tree_size) root_hash_calc = build_merkle_tree(logorder)[-1][0] assert root_hash == '' or root_hash == root_hash_calc @@ -87,11 +90,10 @@ def merge_sth(args, config, localconfig): key=own_key) break except requests.exceptions.HTTPError, e: - print >>sys.stderr, e.response - sys.stderr.flush() + logging.warning("create_sth_signature error: %s", e.response) if tree_head_signature == None: - print >>sys.stderr, "Could not contact any signing nodes" - sys.exit(1) + logging.error("Could not contact any signing nodes") + return 0 sth = {"tree_size": tree_size, "timestamp": timestamp, "sha256_root_hash": b64encode(root_hash), @@ -100,34 +102,59 @@ def merge_sth(args, config, localconfig): check_sth_signature(ctbaseurl, sth, publickey=logpublickey) timing_point(timing, "build sth") - print hexencode(root_hash), timestamp, tree_size - sys.stdout.flush() + logging.info("new root: %s %d %d", hexencode(root_hash), timestamp, tree_size) write_file(sthfile, sth) if args.timing: - print >>sys.stderr, "timing: merge_sth:", timing["deltatimes"] - sys.stderr.flush() + logging.debug("timing: merge_sth: %s", timing["deltatimes"]) + + return 0 def main(): """ - Read file 'sth' to get current tree size, assuming zero if file not + Read 'sth' to get the current tree size, assuming zero if file not found. Read tree sizes from the backup.<secondary> files, put them in a - list and sort it. Let new tree size equal list[backup-quorum]. Barf - on a new tree size smaller than the currently published tree size. + list and sort the list. Let new tree size be list[backup-quorum]. If + the new tree size is smaller than the currently published tree size, + stop here. + + Decide on a timestamp, build an STH and write it to 'sth'. - Decide on a timestamp, build an STH and write it to file 'sth'. + Sleep some and start over, or exit if there's no `--mergeinterval'. 
""" args, config, localconfig = parse_args() + paths = localconfig["paths"] + mergedb = paths["mergedb"] + lockfile = mergedb + "/.merge_sth.lock" + + loglevel = getattr(logging, args.loglevel.upper()) + if args.mergeinterval is None: + logging.basicConfig(level=loglevel) + else: + logging.basicConfig(filename=args.logdir + "/merge_sth.log", + level=loglevel) + + if not flock_ex_or_fail(lockfile): + logging.critical("unable to take lock %s", lockfile) + return 1 while True: - merge_sth(args, config, localconfig) - if args.interval is None: + merge_start_time = datetime.now() + ret = merge_sth(args, config, localconfig) + if ret < 0: + return 1 + if args.mergeinterval is None: break - print >>sys.stderr, "sleeping", args.interval, "seconds" - time.sleep(args.interval) + sleep = (merge_start_time + timedelta(seconds=args.mergeinterval) - + datetime.now()).seconds + if sleep > 0: + logging.debug("sleeping %d seconds", sleep) + time.sleep(sleep) + + return 0 if __name__ == '__main__': sys.exit(main()) diff --git a/tools/mergetools.py b/tools/mergetools.py index 80fbf0b..5471fe4 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -10,11 +10,15 @@ import json import yaml import argparse import requests +import time +import fcntl +import errno +import logging try: import permdb except ImportError: pass -from certtools import get_leaf_hash, http_request, get_leaf_hash +from certtools import get_leaf_hash, http_request def parselogrow(row): return base64.b16decode(row, casefold=True) @@ -33,7 +37,7 @@ def get_nfetched(currentsizefile, logorderfile): try: limit = json.loads(open(currentsizefile).read()) except (IOError, ValueError): - return -1 + return 0 if limit['index'] >= 0: with open(logorderfile, 'r') as f: f.seek(limit['index']*65) @@ -207,12 +211,10 @@ def get_curpos(node, baseurl, own_key, paths): publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": - return parsed_result[u"position"] - print >>sys.stderr, "ERROR: currentposition", parsed_result - sys.exit(1) + return True, parsed_result[u"position"] + return False, parsed_result except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: currentposition", e.response - sys.exit(1) + return False, e.response def get_verifiedsize(node, baseurl, own_key, paths): try: @@ -234,19 +236,16 @@ def sendlog(node, baseurl, own_key, paths, submission): result = http_request(baseurl + "plop/v1/frontend/sendlog", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) + return True, json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendlog", e.response - sys.stderr.flush() - return None + return False, e.response except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e + logging.error("==== FAILED REQUEST ====") + logging.error(submission) + logging.error("======= RESPONSE =======") + logging.error(result) + logging.error("========================") + return False, e def backup_sendlog(node, baseurl, own_key, paths, submission): try: @@ -274,18 +273,16 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash): json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return 
json.loads(result) + return True, json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendentry", e.reponse - sys.exit(1) + return False, e.response except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, ehash - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e + logging.error("==== FAILED REQUEST ====") + logging.error(ehash) + logging.error("======= RESPONSE =======") + logging.error(result) + logging.error("========================") + return False, e def sendentry_merge(node, baseurl, own_key, paths, entry, ehash): return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)]) @@ -304,7 +301,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, ehash + print >>sys.stderr, hash print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" @@ -316,18 +313,16 @@ def sendsth(node, baseurl, own_key, paths, submission): result = http_request(baseurl + "plop/v1/frontend/sendsth", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) - return json.loads(result) + return True, json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendsth", e.response - sys.exit(1) + return False, e.response except ValueError, e: - print >>sys.stderr, "==== FAILED REQUEST ====" - print >>sys.stderr, submission - print >>sys.stderr, "======= RESPONSE =======" - print >>sys.stderr, result - print >>sys.stderr, "========================" - sys.stderr.flush() - raise e + logging.error("==== FAILED REQUEST ====") + logging.error(submission) + logging.error("======= RESPONSE =======") + logging.error(result) + logging.error("========================") + return False, e def verifyroot(node, baseurl, own_key, paths, treesize): try: @@ -403,10 +398,17 @@ def parse_args(): required=True) parser.add_argument('--localconfig', help="Local configuration", required=True) - parser.add_argument('--interval', type=int, metavar="n", - help="Repeate every N seconds") + # FIXME: verify that 0 < N < 1d + parser.add_argument('--mergeinterval', type=int, metavar="n", + help="Merge every N seconds") parser.add_argument("--timing", action='store_true', help="Print timing information") + parser.add_argument("--pidfile", type=str, metavar="file", + help="Store PID in FILE") + parser.add_argument("--logdir", type=str, default=".", metavar="dir", + help="Write logfiles in DIR [default: .]") + parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level", + help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]") args = parser.parse_args() config = yaml.load(open(args.config)) @@ -421,6 +423,32 @@ def perm(dbtype, path): return PermDB(path) assert False +def waitforfile(path): + statinfo = None + while statinfo is None: + try: + statinfo = os.stat(path) + except OSError, e: + if e.errno != errno.ENOENT: + raise + time.sleep(1) + return statinfo + +def flock_ex_or_fail(path): + try: + fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB) + except IOError, e: + if e.errno != errno.EWOULDBLOCK: + raise + return False + return True + +class Status: + def __init__(self, path): + self.path = path + def status(self, s): + open(self.path, 'w').write(s) 
+ class FileDB: def __init__(self, path): self.path = path diff --git a/tools/testcase1.py b/tools/testcase1.py index 81d589a..98b9179 100755 --- a/tools/testcase1.py +++ b/tools/testcase1.py @@ -13,6 +13,7 @@ import struct import hashlib import itertools import os.path +from time import sleep from certtools import * baseurls = [sys.argv[1]] @@ -20,6 +21,9 @@ logpublickeyfile = sys.argv[2] cacertfile = sys.argv[3] toolsdir = os.path.dirname(sys.argv[0]) testdir = sys.argv[4] +do_merge = True +if len(sys.argv) > 5 and sys.argv[5] == '--nomerge': + do_merge = False certfiles = [toolsdir + ("/testcerts/cert%d.txt" % e) for e in range(1, 6)] @@ -121,7 +125,7 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl): assert_equal(len(entries), 1, "get_entries", quiet=True) fetched_entry = entries["entries"][0] merkle_tree_leaf = pack_mtl(timestamp, chain[0]) - leaf_input = base64.decodestring(fetched_entry["leaf_input"]) + leaf_input = base64.decodestring(fetched_entry["leaf_input"]) assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True) extra_data = base64.decodestring(fetched_entry["extra_data"]) certchain = decode_certificate_chain(extra_data) @@ -148,8 +152,14 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl): len(submittedcertchain)) def merge(): - return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", - "--localconfig", testdir + "/catlfish-test-local-merge.cfg"]) + if do_merge: + return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg", + "--localconfig", testdir + "/catlfish-test-local-merge.cfg"]) + else: + n = 60 + print "testcase1.py: sleeping", n, "seconds waiting for merge" + sleep(n) + return 0 mergeresult = merge() assert_equal(mergeresult, 0, "merge", quiet=True, fatal=True) |