diff options
-rw-r--r-- | src/catlfish_compat.erl | 10 | ||||
-rw-r--r-- | src/catlfish_web.erl | 8 | ||||
-rw-r--r-- | test/Makefile | 6 | ||||
-rwxr-xr-x | test/scripts/light-system-test-prepare.sh | 16 | ||||
-rwxr-xr-x | test/scripts/perf-test.sh | 71 | ||||
-rw-r--r-- | test/scripts/testutils.sh | 2 | ||||
-rw-r--r-- | tools/certtools.py | 15 | ||||
-rwxr-xr-x | tools/compileconfig.py | 3 | ||||
-rwxr-xr-x | tools/merge_backup.py | 166 | ||||
-rwxr-xr-x | tools/merge_dist.py | 134 | ||||
-rwxr-xr-x | tools/merge_fetch.py | 30 | ||||
-rw-r--r-- | tools/mergetools.py | 55 | ||||
-rwxr-xr-x | tools/parsebench.py | 62 |
13 files changed, 379 insertions, 199 deletions
diff --git a/src/catlfish_compat.erl b/src/catlfish_compat.erl index 183eb44..6dab325 100644 --- a/src/catlfish_compat.erl +++ b/src/catlfish_compat.erl @@ -10,6 +10,11 @@ poison_val(Value) -> poison_val(erlang:system_info(otp_release), Value). %% @doc Dig out alg, params and key from issuer. +unpack_issuer("R16" ++ _, Issuer) -> + #'SubjectPublicKeyInfo'{ + algorithm = #'AlgorithmIdentifier'{algorithm = Alg, parameters = Params}, + subjectPublicKey = {0, Key}} = Issuer, + {Alg, Params, Key}; unpack_issuer("17", Issuer) -> #'SubjectPublicKeyInfo'{ algorithm = #'AlgorithmIdentifier'{algorithm = Alg, parameters = Params}, @@ -22,6 +27,9 @@ unpack_issuer("18", Issuer) -> {Alg, Params, Key}. %% @doc Unpack a #'Certificate'.signature, return the signature. +unpack_signature("R16" ++ _, Signature) -> + {_, Sig} = Signature, + Sig; unpack_signature("17", Signature) -> {_, Sig} = Signature, Sig; @@ -29,6 +37,8 @@ unpack_signature("18", Signature) -> Signature. %% Use a list for R17 and a binary for newer versions. +poison_val("R16" ++ _, Val) -> + Val; poison_val("17", Val) -> Val; poison_val("18", Val) -> diff --git a/src/catlfish_web.erl b/src/catlfish_web.erl index f44745d..12441cf 100644 --- a/src/catlfish_web.erl +++ b/src/catlfish_web.erl @@ -43,14 +43,16 @@ loop(Req, Module) -> {403, [{"Content-Type", "text/plain"}], "Invalid credentials"}; success -> + lager:info("GET ~p", [Path]), lager:debug("GET ~p ~p", [Path, Query]), add_auth(Path, Module:request(get, App, Fun, Query)); noauth -> + lager:info("GET ~p", [Path]), lager:debug("GET ~p ~p", [Path, Query]), Module:request(get, App, Fun, Query) end, - lager:debug("GET finished: ~p us", + lager:info("GET finished: ~p us", [timer:now_diff(os:timestamp(), Starttime)]), case Result of none -> @@ -67,14 +69,16 @@ loop(Req, Module) -> {403, [{"Content-Type", "text/plain"}], "Invalid credentials"}; success -> + lager:info("POST ~p", [Path]), lager:debug("POST ~p ~p", [Path, Body]), add_auth(Path, Module:request(post, App, Fun, Body)); noauth -> + lager:info("POST ~p", [Path]), lager:debug("POST ~p ~p", [Path, Body]), Module:request(post, App, Fun, Body) end, - lager:debug("POST finished: ~p us", + lager:info("POST finished: ~p us", [timer:now_diff(os:timestamp(), Starttime)]), case Result of none -> diff --git a/test/Makefile b/test/Makefile index bd58cfe..c92c30d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -18,3 +18,9 @@ tests: rm -r $(INSTDIR)/tests || true mkdir $(INSTDIR)/tests (cd $(INSTDIR)/tests && ../../test/scripts/light-system-test.sh) + +perf-tests: + @make tests-makemk + rm -r $(INSTDIR)/tests || true + mkdir $(INSTDIR)/tests + (cd $(INSTDIR)/tests && ../../test/scripts/perf-test.sh) diff --git a/test/scripts/light-system-test-prepare.sh b/test/scripts/light-system-test-prepare.sh index 466f3aa..6f6dd07 100755 --- a/test/scripts/light-system-test-prepare.sh +++ b/test/scripts/light-system-test-prepare.sh @@ -39,7 +39,7 @@ createcert () { createca createcert mkdir keys -(cd keys ; ../../../tools/create-key.sh logkey) +(cd keys ; ${top_srcdir}/tools/create-key.sh logkey) openssl pkcs8 -topk8 -nocrypt -in keys/logkey-private.pem -out keys/logkey-private.pkcs8 mkdir mergedb touch mergedb/logorder @@ -47,24 +47,24 @@ mkdir mergedb-secondary touch mergedb-secondary/logorder printf 0 > mergedb-secondary/verifiedsize mkdir known_roots -cp ../../tools/testcerts/roots/* known_roots +cp ${top_srcdir}/tools/testcerts/roots/* known_roots for machine in ${MACHINES}; do \ - ../../tools/compileconfig.py --config ../../test/catlfish-test.cfg --localconfig ../../test/catlfish-test-local-${machine}.cfg + ${top_srcdir}/tools/compileconfig.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-${machine}.cfg mkdir -p machine/machine-${machine}/db touch machine/machine-${machine}/db/index && touch machine/machine-${machine}/db/newentries done -../../tools/compileconfig.py --config ../../test/catlfish-test.cfg --localconfig ../../test/catlfish-test-local-merge-2.cfg -../../tools/compileconfig.py --config ../../test/catlfish-test.cfg --localconfig ../../test/catlfish-test-local-signing.cfg +${top_srcdir}/tools/compileconfig.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge-2.cfg +${top_srcdir}/tools/compileconfig.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-signing.cfg mkdir privatekeys mkdir publickeys for node in ${NODES}; do \ - (cd privatekeys ; ../../../tools/create-key.sh ${node}) ; \ + (cd privatekeys ; ${top_srcdir}/tools/create-key.sh ${node}) ; \ mv privatekeys/${node}.pem publickeys/ ; \ mkdir -p nodes/${node}/log done -(cd privatekeys ; ../../../tools/create-key.sh merge-1) +(cd privatekeys ; ${top_srcdir}/tools/create-key.sh merge-1) mv privatekeys/merge-1.pem publickeys/ -(cd privatekeys ; ../../../tools/create-key.sh merge-2) +(cd privatekeys ; ${top_srcdir}/tools/create-key.sh merge-2) mv privatekeys/merge-2.pem publickeys/ test -x ${SOFTHSM} && ${SOFTHSM} --init-token --slot=0 --label=mylabel --so-pin=ffff --pin=ffff || true test -x ${SOFTHSM} && ${SOFTHSM} --import keys/logkey-private.pkcs8 --slot 0 --label mylabel --pin ffff --id 00 || true diff --git a/test/scripts/perf-test.sh b/test/scripts/perf-test.sh new file mode 100755 index 0000000..aaf3b1d --- /dev/null +++ b/test/scripts/perf-test.sh @@ -0,0 +1,71 @@ +#!/bin/sh + +set -o nounset +set -o errexit + +top_srcdir=$(cd $(dirname $0)/../..; pwd) + +. ${top_srcdir}/test/scripts/testutils.sh + +SCRIPTS=${top_srcdir}/test/scripts + +tests_start() { + ${SCRIPTS}/light-system-test-start.sh +} + +tests_stop() { + ${SCRIPTS}/light-system-test-stop.sh +} + +${SCRIPTS}/light-system-test-prepare.sh + +cp ${top_srcdir}/test/known_roots/* known_roots + +tests_start + +do_merge +check_sth + +assert_equal "Tree size" "$(get_treesize)" 0 + +python ${top_srcdir}/tools/submitcert.py --parallel=1 --store ${top_srcdir}/test/bulktestcerts/0000.zip --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || (tests_stop ; fail "Submission failed") + + +do_merge 2> bench-1 || (tests_stop ; fail "Merge failed") +check_sth || (tests_stop ; fail "Check STH failed") + +sleep 5 +tests_stop +sleep 5 + +mv mergedb mergedb-down +mv mergedb-secondary mergedb +mkdir mergedb-secondary +touch mergedb-secondary/logorder +printf 0 > mergedb-secondary/verifiedsize + +tests_start + +do_merge 2> bench-2 || (tests_stop ; fail "Merge failed") +check_sth || (tests_stop ; fail "Check STH failed") + +sleep 5 +tests_stop +sleep 5 + +mv machine/machine-1 machine/machine-1-down +mkdir -p machine/machine-1/db +touch machine/machine-1/db/index +touch machine/machine-1/db/newentries + +tests_start + +do_merge 2> bench-3 || (tests_stop ; fail "Merge failed") +check_sth || (tests_stop ; fail "Check STH failed") + +sleep 5 +tests_stop +sleep 5 + +grep timing: bench-[123] > bench.txt +${top_srcdir}/tools/parsebench.py bench.txt > bench.html diff --git a/test/scripts/testutils.sh b/test/scripts/testutils.sh index 94d6223..e779e07 100644 --- a/test/scripts/testutils.sh +++ b/test/scripts/testutils.sh @@ -18,5 +18,5 @@ check_sth() { } do_merge() { - ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg || fail "Merge failed" + ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --timing --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg || fail "Merge failed" } diff --git a/tools/certtools.py b/tools/certtools.py index 1523c97..0009d5d 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -108,13 +108,16 @@ def urlget(url, params=None): pass return requests.get(url, verify=sslparameters.cafile, params=params) -def urlpost(url, data): +def urlpost(url, data, session=None): with warnings.catch_warnings(): try: warnings.filterwarnings("ignore", category=requests.packages.urllib3.exceptions.SubjectAltNameWarning) except AttributeError: pass - return requests.post(url, data=data, verify=sslparameters.cafile) + if session: + return session.post(url, data=data, verify=sslparameters.cafile) + else: + return requests.post(url, data=data, verify=sslparameters.cafile) def get_sth(baseurl): result = urlget(baseurl + "ct/v1/get-sth") @@ -157,9 +160,9 @@ def unpack_tls_array(packed_data, length_len): rest_data = packed_data[length_len+length:] return (unpacked_data, rest_data) -def add_chain(baseurl, submission): +def add_chain(baseurl, submission, session=None): try: - result = urlpost(baseurl + "ct/v1/add-chain", json.dumps(submission)) + result = urlpost(baseurl + "ct/v1/add-chain", json.dumps(submission), session=session) if result.status_code == requests.codes.ok: return result.json() else: @@ -175,10 +178,10 @@ def add_chain(baseurl, submission): print "========================" raise e -def add_prechain(baseurl, submission): +def add_prechain(baseurl, submission, session=None): try: result = urlpost(baseurl + "ct/v1/add-pre-chain", - json.dumps(submission)) + json.dumps(submission), session=session) if result.status_code == requests.codes.ok: return result.json() diff --git a/tools/compileconfig.py b/tools/compileconfig.py index 7ba2fac..1fa352e 100755 --- a/tools/compileconfig.py +++ b/tools/compileconfig.py @@ -129,7 +129,8 @@ def allowed_clients_frontend(mergenodenames, primarymergenode): return [ ("/plop/v1/frontend/sendentry", mergenodenames), ("/plop/v1/frontend/sendlog", mergenodenames), - ("/plop/v1/frontend/sendsth", [primarymergenode]), + ("/plop/v1/frontend/publish-sth", [primarymergenode]), + ("/plop/v1/frontend/verify-entries", [primarymergenode]), ("/plop/v1/frontend/currentposition", mergenodenames), ("/plop/v1/frontend/missingentries", mergenodenames), ] diff --git a/tools/merge_backup.py b/tools/merge_backup.py index abe9f36..2c17d90 100755 --- a/tools/merge_backup.py +++ b/tools/merge_backup.py @@ -9,6 +9,7 @@ import base64 import select import requests from time import sleep +from base64 import b64encode, b64decode from certtools import timing_point, build_merkle_tree, write_file, \ create_ssl_context from mergetools import chunks, backup_sendlog, get_logorder, \ @@ -30,8 +31,78 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk): continue return sendlogresult +sendlog_discover_chunksize = 100000 +def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths): + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) + if sendlogresult == None: + sys.exit(1) + if sendlogresult["result"] != "ok": + print >>sys.stderr, "backup_sendlog:", sendlogresult + sys.exit(1) + verifiedsize += len(chunk) + print >>sys.stderr, verifiedsize, + sys.stderr.flush() + print >>sys.stderr + print >>sys.stderr, "log sent" + sys.stderr.flush() + +def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing): + missingentries = get_missingentriesforbackup(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + + while missingentries: + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + + fetched_entries = 0 + print >>sys.stderr, "sending missing entries", + sys.stderr.flush() + with requests.sessions.Session() as session: + for missingentry_chunk in chunks(missingentries, 100): + missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] + hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes] + sendentryresult = sendentries_merge(nodename, nodeaddress, + own_key, paths, + hashes_and_entries, session) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "sendentries_merge:", sendentryresult + sys.exit(1) + fetched_entries += len(missingentry_hashes) + #print >>sys.stderr, fetched_entries, + #sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + + missingentries = get_missingentriesforbackup(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + +def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing): + tree = build_merkle_tree(logorder[:tree_size]) + root_hash = tree[-1][0] + timing_point(timing, "build tree") + verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, + tree_size) + if verifyrootresult["result"] != "ok": + print >>sys.stderr, "verifyroot:", verifyrootresult + sys.exit(1) + secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) + if root_hash != secondary_root_hash: + print >>sys.stderr, "secondary root hash was", \ + hexencode(secondary_root_hash) + print >>sys.stderr, " expected", hexencode(root_hash) + sys.exit(1) + timing_point(timing, "verifyroot") + return root_hash + def merge_backup(args, config, localconfig, secondaries): + maxwindow = localconfig.get("maxwindow", 1000) paths = localconfig["paths"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -48,10 +119,6 @@ def merge_backup(args, config, localconfig, secondaries): tree_size = len(logorder) timing_point(timing, "get logorder") - tree = build_merkle_tree(logorder) - root_hash = tree[-1][0] - timing_point(timing, "build tree") - for secondary in secondaries: if secondary["name"] == config["primarymergenode"]: continue @@ -65,92 +132,23 @@ def merge_backup(args, config, localconfig, secondaries): print >>sys.stderr, "verified size", verifiedsize sys.stderr.flush() - entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - - print >>sys.stderr, "determining end of log:", - for chunk in chunks(entries, 100000): - sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10]) - if sendlogresult == None: - print >>sys.stderr, "sendlog result was None" - sys.exit(1) - if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) - verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - - if verifiedsize > 100000: - verifiedsize -= 100000 + if verifiedsize == tree_size: + root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing) else: - verifiedsize = 0 + while verifiedsize < tree_size: + uptopos = min(verifiedsize + maxwindow, tree_size) - timing_point(timing, "checklog") + entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] + sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths) + timing_point(timing, "sendlog") - entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() - for chunk in chunks(entries, 1000): - sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk) - if sendlogresult == None: - sys.exit(1) - if sendlogresult["result"] != "ok": - print >>sys.stderr, "backup_sendlog:", sendlogresult - sys.exit(1) - verifiedsize += len(chunk) - print >>sys.stderr, verifiedsize, - sys.stderr.flush() - print >>sys.stderr - timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing) - missingentries = get_missingentriesforbackup(nodename, nodeaddress, - own_key, paths) - timing_point(timing, "get missing") - - while missingentries: - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() + root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing) - fetched_entries = 0 - print >>sys.stderr, "fetching missing entries", - sys.stderr.flush() - with requests.sessions.Session() as session: - for missingentry_chunk in chunks(missingentries, 100): - missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] - hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes] - sendentryresult = sendentries_merge(nodename, nodeaddress, - own_key, paths, - hashes_and_entries, session) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry_merge:", sendentryresult - sys.exit(1) - fetched_entries += len(missingentry_hashes) - print >>sys.stderr, fetched_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") - - missingentries = get_missingentriesforbackup(nodename, nodeaddress, - own_key, paths) - timing_point(timing, "get missing") - - verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths, - tree_size) - if verifyrootresult["result"] != "ok": - print >>sys.stderr, "verifyroot:", verifyrootresult - sys.exit(1) - secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"]) - if root_hash != secondary_root_hash: - print >>sys.stderr, "secondary root hash was", \ - hexencode(secondary_root_hash) - print >>sys.stderr, " expected", hexencode(root_hash) - sys.exit(1) - timing_point(timing, "verifyroot") + verifiedsize = uptopos + setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize) - setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size) backuppath = mergedb + "/verified." + nodename backupdata = {"tree_size": tree_size, "sha256_root_hash": hexencode(root_hash)} diff --git a/tools/merge_dist.py b/tools/merge_dist.py index 2af1d6c..ded25a1 100755 --- a/tools/merge_dist.py +++ b/tools/merge_dist.py @@ -6,14 +6,78 @@ import sys import json +import base64 +import requests from time import sleep from base64 import b64encode, b64decode from certtools import timing_point, \ create_ssl_context from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \ - sendsth, sendlog, sendentry, parse_args, perm + publish_sth, sendlog, sendentries, parse_args, perm, get_frontend_verifiedsize, \ + frontend_verify_entries + +def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths): + print >>sys.stderr, "sending log:", + sys.stderr.flush() + for chunk in chunks(entries, 1000): + for trynumber in range(5, 0, -1): + sendlogresult = sendlog(nodename, nodeaddress, + own_key, paths, + {"start": curpos, "hashes": chunk}) + if sendlogresult == None: + if trynumber == 1: + sys.exit(1) + sleep(10) + print >>sys.stderr, "tries left:", trynumber + sys.stderr.flush() + continue + break + if sendlogresult["result"] != "ok": + print >>sys.stderr, "sendlog:", sendlogresult + sys.exit(1) + curpos += len(chunk) + print >>sys.stderr, curpos, + sys.stderr.flush() + print >>sys.stderr + print >>sys.stderr, "log sent" + sys.stderr.flush() + +def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing): + missingentries = get_missingentries(nodename, nodeaddress, own_key, + paths) + timing_point(timing, "get missing") + + while missingentries: + print >>sys.stderr, "missing entries:", len(missingentries) + sys.stderr.flush() + + sent_entries = 0 + print >>sys.stderr, "sending missing entries", + sys.stderr.flush() + with requests.sessions.Session() as session: + for missingentry_chunk in chunks(missingentries, 100): + missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk] + hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes] + sendentryresult = sendentries(nodename, nodeaddress, + own_key, paths, + hashes_and_entries, session) + if sendentryresult["result"] != "ok": + print >>sys.stderr, "sendentries:", sendentryresult + sys.exit(1) + sent_entries += len(missingentry_hashes) + print >>sys.stderr, sent_entries, + sys.stderr.flush() + print >>sys.stderr + sys.stderr.flush() + timing_point(timing, "send missing") + + missingentries = get_missingentries(nodename, nodeaddress, + own_key, paths) + timing_point(timing, "get missing") + def merge_dist(args, localconfig, frontendnodes, timestamp): + maxwindow = localconfig.get("maxwindow", 1000) paths = localconfig["paths"] own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], @@ -53,62 +117,28 @@ def merge_dist(args, localconfig, frontendnodes, timestamp): print >>sys.stderr, "current position", curpos sys.stderr.flush() - entries = [b64encode(entry) for entry in logorder[curpos:]] - print >>sys.stderr, "sending log:", - sys.stderr.flush() - for chunk in chunks(entries, 1000): - for trynumber in range(5, 0, -1): - sendlogresult = sendlog(nodename, nodeaddress, - own_key, paths, - {"start": curpos, "hashes": chunk}) - if sendlogresult == None: - if trynumber == 1: - sys.exit(1) - sleep(10) - print >>sys.stderr, "tries left:", trynumber - sys.stderr.flush() - continue - break - if sendlogresult["result"] != "ok": - print >>sys.stderr, "sendlog:", sendlogresult - sys.exit(1) - curpos += len(chunk) - print >>sys.stderr, curpos, - sys.stderr.flush() - print >>sys.stderr - timing_point(timing, "sendlog") - print >>sys.stderr, "log sent" - sys.stderr.flush() + verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths) + timing_point(timing, "get verified size") + print >>sys.stderr, "verified size", verifiedsize - missingentries = get_missingentries(nodename, nodeaddress, own_key, - paths) - timing_point(timing, "get missing") + assert verifiedsize >= curpos - print >>sys.stderr, "missing entries:", len(missingentries) - sys.stderr.flush() - sent_entries = 0 - print >>sys.stderr, "send missing entries", - sys.stderr.flush() - for missingentry in missingentries: - ehash = b64decode(missingentry) - sendentryresult = sendentry(nodename, nodeaddress, own_key, paths, - chainsdb.get(ehash), ehash) - if sendentryresult["result"] != "ok": - print >>sys.stderr, "sendentry:", sendentryresult - sys.exit(1) - sent_entries += 1 - if sent_entries % 1000 == 0: - print >>sys.stderr, sent_entries, - sys.stderr.flush() - print >>sys.stderr - sys.stderr.flush() - timing_point(timing, "send missing") + while verifiedsize < len(logorder): + uptopos = min(verifiedsize + maxwindow, len(logorder)) + + entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]] + sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths) + timing_point(timing, "sendlog") + + fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing) + verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos) + print >>sys.stderr, "sending sth to node", nodename sys.stderr.flush() - sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth) - if sendsthresult["result"] != "ok": - print >>sys.stderr, "sendsth:", sendsthresult + publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth) + if publishsthresult["result"] != "ok": + print >>sys.stderr, "publishsth:", publishsthresult sys.exit(1) timing_point(timing, "send sth") diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index 3028b30..ddd2f06 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -7,6 +7,7 @@ import sys import struct import subprocess +import requests from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ @@ -63,20 +64,21 @@ def merge_fetch(args, config, localconfig): print >>sys.stderr, "getting %d entries from %s:" % \ (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]), sys.stderr.flush() - for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): - entries = get_entries(storagenode["name"], - "https://%s/" % storagenode["address"], - own_key, paths, chunk) - for ehash in chunk: - entry = entries[ehash] - verify_entry(verifycert, entry, ehash) - chainsdb.add(ehash, entry) - add_to_logorder(logorderfile, ehash) - logorder.append(ehash) - certsinlog.add(ehash) - added_entries += 1 - print >>sys.stderr, added_entries, - sys.stderr.flush() + with requests.sessions.Session() as session: + for chunk in chunks(entries_to_fetch[storagenode["name"]], 100): + entries = get_entries(storagenode["name"], + "https://%s/" % storagenode["address"], + own_key, paths, chunk, session=session) + for ehash in chunk: + entry = entries[ehash] + verify_entry(verifycert, entry, ehash) + chainsdb.add(ehash, entry) + add_to_logorder(logorderfile, ehash) + logorder.append(ehash) + certsinlog.add(ehash) + added_entries += 1 + print >>sys.stderr, added_entries, + sys.stderr.flush() print >>sys.stderr sys.stderr.flush() chainsdb.commit() diff --git a/tools/mergetools.py b/tools/mergetools.py index 80fbf0b..94901a9 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -179,13 +179,13 @@ def get_new_entries(node, baseurl, own_key, paths): print >>sys.stderr, "ERROR: fetchnewentries", e.response sys.exit(1) -def get_entries(node, baseurl, own_key, paths, hashes): +def get_entries(node, baseurl, own_key, paths, hashes, session=None): try: params = {"hash":[base64.b64encode(ehash) for ehash in hashes]} result = http_request(baseurl + "plop/v1/storage/getentry", params=params, key=own_key, verifynode=node, - publickeydir=paths["publickeys"]) + publickeydir=paths["publickeys"], session=session) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": entries = dict([(base64.b64decode(entry["hash"]), @@ -214,6 +214,25 @@ def get_curpos(node, baseurl, own_key, paths): print >>sys.stderr, "ERROR: currentposition", e.response sys.exit(1) +def get_frontend_verifiedsize(node, baseurl, own_key, paths): + return frontend_verify_entries(node, baseurl, own_key, paths, 0) + +def frontend_verify_entries(node, baseurl, own_key, paths, size): + try: + arguments = {"verify_to": size} + result = http_request(baseurl + "plop/v1/frontend/verify-entries", + json.dumps(arguments), + key=own_key, verifynode=node, + publickeydir=paths["publickeys"]) + parsed_result = json.loads(result) + if parsed_result.get(u"result") == u"ok": + return parsed_result[u"verified"] + print >>sys.stderr, "ERROR: verify-entries", parsed_result + sys.exit(1) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: verify-entries", e.response + sys.exit(1) + def get_verifiedsize(node, baseurl, own_key, paths): try: result = http_request(baseurl + "plop/v1/merge/verifiedsize", @@ -258,6 +277,10 @@ def backup_sendlog(node, baseurl, own_key, paths, submission): print >>sys.stderr, "ERROR: backup_sendlog", e.response sys.stderr.flush() return None + except requests.packages.urllib3.exceptions.NewConnectionError, e: + print >>sys.stderr, "ERROR: backup_sendlog new connection error" + sys.stderr.flush() + return None except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" print >>sys.stderr, submission @@ -267,16 +290,17 @@ def backup_sendlog(node, baseurl, own_key, paths, submission): sys.stderr.flush() raise e -def sendentry(node, baseurl, own_key, paths, entry, ehash): +def sendentries(node, baseurl, own_key, paths, entries, session=None): try: + json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries] result = http_request( baseurl + "plop/v1/frontend/sendentry", - json.dumps({"entry":base64.b64encode(entry), - "treeleafhash":base64.b64encode(ehash)}), - key=own_key, verifynode=node, publickeydir=paths["publickeys"]) + json.dumps(json_entries), + key=own_key, verifynode=node, publickeydir=paths["publickeys"], + session=session) return json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendentry", e.reponse + print >>sys.stderr, "ERROR: sendentries", e.response sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" @@ -286,9 +310,9 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash): print >>sys.stderr, "========================" sys.stderr.flush() raise e - -def sendentry_merge(node, baseurl, own_key, paths, entry, ehash): - return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)]) + except requests.exceptions.ConnectionError, e: + print >>sys.stderr, "ERROR: sendentries", baseurl, e.request, e.response + sys.exit(1) def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): try: @@ -300,7 +324,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): session=session) return json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendentry_merge", e.response + print >>sys.stderr, "ERROR: sendentries_merge", e.response sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" @@ -310,15 +334,18 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): print >>sys.stderr, "========================" sys.stderr.flush() raise e + except requests.exceptions.ConnectionError, e: + print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e.request, e.response + sys.exit(1) -def sendsth(node, baseurl, own_key, paths, submission): +def publish_sth(node, baseurl, own_key, paths, submission): try: - result = http_request(baseurl + "plop/v1/frontend/sendsth", + result = http_request(baseurl + "plop/v1/frontend/publish-sth", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except requests.exceptions.HTTPError, e: - print >>sys.stderr, "ERROR: sendsth", e.response + print >>sys.stderr, "ERROR: publish-sth", e.response sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" diff --git a/tools/parsebench.py b/tools/parsebench.py index 96c36d6..6897b57 100755 --- a/tools/parsebench.py +++ b/tools/parsebench.py @@ -10,8 +10,8 @@ import itertools def parse_one_line(line): row = line.rstrip().split(":") - assert row[0].startswith("mergeoutput.") - iteration = int(row[0][12:]) + assert row[0].startswith("bench-") + iteration = int(row[0][6:]) stage = row[2].strip() data = ast.literal_eval(row[3].strip()) return (iteration, stage, data) @@ -30,35 +30,63 @@ def main(): print "</body>" print "</html>" +scale = 0.25 + def parse_one_file(filename): lines = [parse_one_line(line) for line in open(filename)] iterations = itertools.groupby(lines, lambda x: x[0]) print "<h1>%s</h1>" % (filename,) print "<div>" - legend = [] + firsttime = True + + stageorderdict = {} + stageorder = [] + stages = {} + itemorder = {} + + for (i, iteration) in iterations: + for (_, stage, data) in iteration: + if stage not in stages: + stageorderdict[stage] = len(stageorderdict) + stageorder.append(stage) + stages[stage] = {} + itemorder[stage] = [] + for (item, useconds) in data: + if item not in stages[stage]: + itemorder[stage].append(item) + stages[stage][item] = len(stages[stage]) + + iterations = itertools.groupby(lines, lambda x: x[0]) for (i, iteration) in iterations: - print "<table><tr>" - for (stagen, (_, stage, data)) in enumerate(iteration): - if i == 0: - legend.append("<div><span>%s</span>" % (stage,)) + print >>sys.stderr, (i, iteration) + sys.stdout.write("<div style='margin-bottom: 1em;'>") + for (_, stage, data) in iteration: data = list(data) for (itemn, (item, useconds)) in enumerate(data): seconds = useconds / 1000000 - step = 50 / (len(data) - 1) - print "<td style='width: %dpx; padding: 0; background-color: hsl(%d, 90%%, %d%%);' title='%s:%s %d'>" % (seconds/4, stagen * 90, itemn * step + 40, stage, item, seconds) - if i == 0: - legend.append("<span style='background-color: hsl(%d, 90%%, %d%%);'>%s</span>" % (stagen * 90, itemn * step + 40, item)) + shades = stages[stage] + step = 50 / (len(shades) - 1) + shade = shades[item] + stagen = stageorderdict[stage] + print "<div class='element' style='display: inline-block; margin: 0; width: %dpx; padding: 0; background-color: hsl(%d, 90%%, %d%%);' title='%s:%s %d'>" % (int(seconds*scale), stagen * 90, shade * step + 40, stage, item, seconds) print " " - print "</td>" - if i == 0: - legend.append("</div>") - print "</tr></table>" + sys.stdout.write("</div>") + sys.stdout.write("</div>") print "</div>" print "<div style='height: 30px;'>" print "</div>" print "<div>" - for row in legend: - print row + + for stage in stageorder: + print "<div><span>%s</span>" % (stage,) + shades = stages[stage] + for item in itemorder[stage]: + shade = shades[item] + step = 50 / (len(shades) - 1) + + stagen = stageorderdict[stage] + print "<span style='background-color: hsl(%d, 90%%, %d%%);'>%s</span>" % (stagen * 90, shade * step + 40, item) + print "</div>" print "</div>" main() |