summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/catlfish_compat.erl10
-rw-r--r--src/catlfish_web.erl8
-rw-r--r--test/Makefile6
-rwxr-xr-xtest/scripts/light-system-test-prepare.sh16
-rwxr-xr-xtest/scripts/perf-test.sh71
-rw-r--r--test/scripts/testutils.sh2
-rw-r--r--tools/certtools.py15
-rwxr-xr-xtools/compileconfig.py3
-rwxr-xr-xtools/merge_backup.py166
-rwxr-xr-xtools/merge_dist.py134
-rwxr-xr-xtools/merge_fetch.py30
-rw-r--r--tools/mergetools.py55
-rwxr-xr-xtools/parsebench.py62
13 files changed, 379 insertions, 199 deletions
diff --git a/src/catlfish_compat.erl b/src/catlfish_compat.erl
index 183eb44..6dab325 100644
--- a/src/catlfish_compat.erl
+++ b/src/catlfish_compat.erl
@@ -10,6 +10,11 @@ poison_val(Value) ->
poison_val(erlang:system_info(otp_release), Value).
%% @doc Dig out alg, params and key from issuer.
+unpack_issuer("R16" ++ _, Issuer) ->
+ #'SubjectPublicKeyInfo'{
+ algorithm = #'AlgorithmIdentifier'{algorithm = Alg, parameters = Params},
+ subjectPublicKey = {0, Key}} = Issuer,
+ {Alg, Params, Key};
unpack_issuer("17", Issuer) ->
#'SubjectPublicKeyInfo'{
algorithm = #'AlgorithmIdentifier'{algorithm = Alg, parameters = Params},
@@ -22,6 +27,9 @@ unpack_issuer("18", Issuer) ->
{Alg, Params, Key}.
%% @doc Unpack a #'Certificate'.signature, return the signature.
+unpack_signature("R16" ++ _, Signature) ->
+ {_, Sig} = Signature,
+ Sig;
unpack_signature("17", Signature) ->
{_, Sig} = Signature,
Sig;
@@ -29,6 +37,8 @@ unpack_signature("18", Signature) ->
Signature.
%% Use a list for R17 and a binary for newer versions.
+poison_val("R16" ++ _, Val) ->
+ Val;
poison_val("17", Val) ->
Val;
poison_val("18", Val) ->
diff --git a/src/catlfish_web.erl b/src/catlfish_web.erl
index f44745d..12441cf 100644
--- a/src/catlfish_web.erl
+++ b/src/catlfish_web.erl
@@ -43,14 +43,16 @@ loop(Req, Module) ->
{403, [{"Content-Type", "text/plain"}],
"Invalid credentials"};
success ->
+ lager:info("GET ~p", [Path]),
lager:debug("GET ~p ~p", [Path, Query]),
add_auth(Path,
Module:request(get, App, Fun, Query));
noauth ->
+ lager:info("GET ~p", [Path]),
lager:debug("GET ~p ~p", [Path, Query]),
Module:request(get, App, Fun, Query)
end,
- lager:debug("GET finished: ~p us",
+ lager:info("GET finished: ~p us",
[timer:now_diff(os:timestamp(), Starttime)]),
case Result of
none ->
@@ -67,14 +69,16 @@ loop(Req, Module) ->
{403, [{"Content-Type", "text/plain"}],
"Invalid credentials"};
success ->
+ lager:info("POST ~p", [Path]),
lager:debug("POST ~p ~p", [Path, Body]),
add_auth(Path,
Module:request(post, App, Fun, Body));
noauth ->
+ lager:info("POST ~p", [Path]),
lager:debug("POST ~p ~p", [Path, Body]),
Module:request(post, App, Fun, Body)
end,
- lager:debug("POST finished: ~p us",
+ lager:info("POST finished: ~p us",
[timer:now_diff(os:timestamp(), Starttime)]),
case Result of
none ->
diff --git a/test/Makefile b/test/Makefile
index bd58cfe..c92c30d 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -18,3 +18,9 @@ tests:
rm -r $(INSTDIR)/tests || true
mkdir $(INSTDIR)/tests
(cd $(INSTDIR)/tests && ../../test/scripts/light-system-test.sh)
+
+perf-tests:
+ @make tests-makemk
+ rm -r $(INSTDIR)/tests || true
+ mkdir $(INSTDIR)/tests
+ (cd $(INSTDIR)/tests && ../../test/scripts/perf-test.sh)
diff --git a/test/scripts/light-system-test-prepare.sh b/test/scripts/light-system-test-prepare.sh
index 466f3aa..6f6dd07 100755
--- a/test/scripts/light-system-test-prepare.sh
+++ b/test/scripts/light-system-test-prepare.sh
@@ -39,7 +39,7 @@ createcert () {
createca
createcert
mkdir keys
-(cd keys ; ../../../tools/create-key.sh logkey)
+(cd keys ; ${top_srcdir}/tools/create-key.sh logkey)
openssl pkcs8 -topk8 -nocrypt -in keys/logkey-private.pem -out keys/logkey-private.pkcs8
mkdir mergedb
touch mergedb/logorder
@@ -47,24 +47,24 @@ mkdir mergedb-secondary
touch mergedb-secondary/logorder
printf 0 > mergedb-secondary/verifiedsize
mkdir known_roots
-cp ../../tools/testcerts/roots/* known_roots
+cp ${top_srcdir}/tools/testcerts/roots/* known_roots
for machine in ${MACHINES}; do \
- ../../tools/compileconfig.py --config ../../test/catlfish-test.cfg --localconfig ../../test/catlfish-test-local-${machine}.cfg
+ ${top_srcdir}/tools/compileconfig.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-${machine}.cfg
mkdir -p machine/machine-${machine}/db
touch machine/machine-${machine}/db/index && touch machine/machine-${machine}/db/newentries
done
-../../tools/compileconfig.py --config ../../test/catlfish-test.cfg --localconfig ../../test/catlfish-test-local-merge-2.cfg
-../../tools/compileconfig.py --config ../../test/catlfish-test.cfg --localconfig ../../test/catlfish-test-local-signing.cfg
+${top_srcdir}/tools/compileconfig.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge-2.cfg
+${top_srcdir}/tools/compileconfig.py --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-signing.cfg
mkdir privatekeys
mkdir publickeys
for node in ${NODES}; do \
- (cd privatekeys ; ../../../tools/create-key.sh ${node}) ; \
+ (cd privatekeys ; ${top_srcdir}/tools/create-key.sh ${node}) ; \
mv privatekeys/${node}.pem publickeys/ ; \
mkdir -p nodes/${node}/log
done
-(cd privatekeys ; ../../../tools/create-key.sh merge-1)
+(cd privatekeys ; ${top_srcdir}/tools/create-key.sh merge-1)
mv privatekeys/merge-1.pem publickeys/
-(cd privatekeys ; ../../../tools/create-key.sh merge-2)
+(cd privatekeys ; ${top_srcdir}/tools/create-key.sh merge-2)
mv privatekeys/merge-2.pem publickeys/
test -x ${SOFTHSM} && ${SOFTHSM} --init-token --slot=0 --label=mylabel --so-pin=ffff --pin=ffff || true
test -x ${SOFTHSM} && ${SOFTHSM} --import keys/logkey-private.pkcs8 --slot 0 --label mylabel --pin ffff --id 00 || true
diff --git a/test/scripts/perf-test.sh b/test/scripts/perf-test.sh
new file mode 100755
index 0000000..aaf3b1d
--- /dev/null
+++ b/test/scripts/perf-test.sh
@@ -0,0 +1,71 @@
+#!/bin/sh
+
+set -o nounset
+set -o errexit
+
+top_srcdir=$(cd $(dirname $0)/../..; pwd)
+
+. ${top_srcdir}/test/scripts/testutils.sh
+
+SCRIPTS=${top_srcdir}/test/scripts
+
+tests_start() {
+ ${SCRIPTS}/light-system-test-start.sh
+}
+
+tests_stop() {
+ ${SCRIPTS}/light-system-test-stop.sh
+}
+
+${SCRIPTS}/light-system-test-prepare.sh
+
+cp ${top_srcdir}/test/known_roots/* known_roots
+
+tests_start
+
+do_merge
+check_sth
+
+assert_equal "Tree size" "$(get_treesize)" 0
+
+python ${top_srcdir}/tools/submitcert.py --parallel=1 --store ${top_srcdir}/test/bulktestcerts/0000.zip --sct-file=submittedcerts ${BASEURL} --publickey=keys/logkey.pem --cafile httpsca/demoCA/cacert.pem || (tests_stop ; fail "Submission failed")
+
+
+do_merge 2> bench-1 || (tests_stop ; fail "Merge failed")
+check_sth || (tests_stop ; fail "Check STH failed")
+
+sleep 5
+tests_stop
+sleep 5
+
+mv mergedb mergedb-down
+mv mergedb-secondary mergedb
+mkdir mergedb-secondary
+touch mergedb-secondary/logorder
+printf 0 > mergedb-secondary/verifiedsize
+
+tests_start
+
+do_merge 2> bench-2 || (tests_stop ; fail "Merge failed")
+check_sth || (tests_stop ; fail "Check STH failed")
+
+sleep 5
+tests_stop
+sleep 5
+
+mv machine/machine-1 machine/machine-1-down
+mkdir -p machine/machine-1/db
+touch machine/machine-1/db/index
+touch machine/machine-1/db/newentries
+
+tests_start
+
+do_merge 2> bench-3 || (tests_stop ; fail "Merge failed")
+check_sth || (tests_stop ; fail "Check STH failed")
+
+sleep 5
+tests_stop
+sleep 5
+
+grep timing: bench-[123] > bench.txt
+${top_srcdir}/tools/parsebench.py bench.txt > bench.html
diff --git a/test/scripts/testutils.sh b/test/scripts/testutils.sh
index 94d6223..e779e07 100644
--- a/test/scripts/testutils.sh
+++ b/test/scripts/testutils.sh
@@ -18,5 +18,5 @@ check_sth() {
}
do_merge() {
- ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg || fail "Merge failed"
+ ${top_srcdir}/tools/merge --config ${top_srcdir}/test/catlfish-test.cfg --timing --localconfig ${top_srcdir}/test/catlfish-test-local-merge.cfg || fail "Merge failed"
}
diff --git a/tools/certtools.py b/tools/certtools.py
index 1523c97..0009d5d 100644
--- a/tools/certtools.py
+++ b/tools/certtools.py
@@ -108,13 +108,16 @@ def urlget(url, params=None):
pass
return requests.get(url, verify=sslparameters.cafile, params=params)
-def urlpost(url, data):
+def urlpost(url, data, session=None):
with warnings.catch_warnings():
try:
warnings.filterwarnings("ignore", category=requests.packages.urllib3.exceptions.SubjectAltNameWarning)
except AttributeError:
pass
- return requests.post(url, data=data, verify=sslparameters.cafile)
+ if session:
+ return session.post(url, data=data, verify=sslparameters.cafile)
+ else:
+ return requests.post(url, data=data, verify=sslparameters.cafile)
def get_sth(baseurl):
result = urlget(baseurl + "ct/v1/get-sth")
@@ -157,9 +160,9 @@ def unpack_tls_array(packed_data, length_len):
rest_data = packed_data[length_len+length:]
return (unpacked_data, rest_data)
-def add_chain(baseurl, submission):
+def add_chain(baseurl, submission, session=None):
try:
- result = urlpost(baseurl + "ct/v1/add-chain", json.dumps(submission))
+ result = urlpost(baseurl + "ct/v1/add-chain", json.dumps(submission), session=session)
if result.status_code == requests.codes.ok:
return result.json()
else:
@@ -175,10 +178,10 @@ def add_chain(baseurl, submission):
print "========================"
raise e
-def add_prechain(baseurl, submission):
+def add_prechain(baseurl, submission, session=None):
try:
result = urlpost(baseurl + "ct/v1/add-pre-chain",
- json.dumps(submission))
+ json.dumps(submission), session=session)
if result.status_code == requests.codes.ok:
return result.json()
diff --git a/tools/compileconfig.py b/tools/compileconfig.py
index 7ba2fac..1fa352e 100755
--- a/tools/compileconfig.py
+++ b/tools/compileconfig.py
@@ -129,7 +129,8 @@ def allowed_clients_frontend(mergenodenames, primarymergenode):
return [
("/plop/v1/frontend/sendentry", mergenodenames),
("/plop/v1/frontend/sendlog", mergenodenames),
- ("/plop/v1/frontend/sendsth", [primarymergenode]),
+ ("/plop/v1/frontend/publish-sth", [primarymergenode]),
+ ("/plop/v1/frontend/verify-entries", [primarymergenode]),
("/plop/v1/frontend/currentposition", mergenodenames),
("/plop/v1/frontend/missingentries", mergenodenames),
]
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index abe9f36..2c17d90 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -9,6 +9,7 @@ import base64
import select
import requests
from time import sleep
+from base64 import b64encode, b64decode
from certtools import timing_point, build_merkle_tree, write_file, \
create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
@@ -30,8 +31,78 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
continue
return sendlogresult
+sendlog_discover_chunksize = 100000
+def sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths):
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
+ if sendlogresult == None:
+ sys.exit(1)
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "backup_sendlog:", sendlogresult
+ sys.exit(1)
+ verifiedsize += len(chunk)
+ print >>sys.stderr, verifiedsize,
+ sys.stderr.flush()
+ print >>sys.stderr
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
+ while missingentries:
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+
+ fetched_entries = 0
+ print >>sys.stderr, "sending missing entries",
+ sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for missingentry_chunk in chunks(missingentries, 100):
+ missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+ hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+ sendentryresult = sendentries_merge(nodename, nodeaddress,
+ own_key, paths,
+ hashes_and_entries, session)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentries_merge:", sendentryresult
+ sys.exit(1)
+ fetched_entries += len(missingentry_hashes)
+ #print >>sys.stderr, fetched_entries,
+ #sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+
+ missingentries = get_missingentriesforbackup(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
+def check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing):
+ tree = build_merkle_tree(logorder[:tree_size])
+ root_hash = tree[-1][0]
+ timing_point(timing, "build tree")
+ verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+ tree_size)
+ if verifyrootresult["result"] != "ok":
+ print >>sys.stderr, "verifyroot:", verifyrootresult
+ sys.exit(1)
+ secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
+ if root_hash != secondary_root_hash:
+ print >>sys.stderr, "secondary root hash was", \
+ hexencode(secondary_root_hash)
+ print >>sys.stderr, " expected", hexencode(root_hash)
+ sys.exit(1)
+ timing_point(timing, "verifyroot")
+ return root_hash
+
def merge_backup(args, config, localconfig, secondaries):
+ maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -48,10 +119,6 @@ def merge_backup(args, config, localconfig, secondaries):
tree_size = len(logorder)
timing_point(timing, "get logorder")
- tree = build_merkle_tree(logorder)
- root_hash = tree[-1][0]
- timing_point(timing, "build tree")
-
for secondary in secondaries:
if secondary["name"] == config["primarymergenode"]:
continue
@@ -65,92 +132,23 @@ def merge_backup(args, config, localconfig, secondaries):
print >>sys.stderr, "verified size", verifiedsize
sys.stderr.flush()
- entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
-
- print >>sys.stderr, "determining end of log:",
- for chunk in chunks(entries, 100000):
- sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
- if sendlogresult == None:
- print >>sys.stderr, "sendlog result was None"
- sys.exit(1)
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
- verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
-
- if verifiedsize > 100000:
- verifiedsize -= 100000
+ if verifiedsize == tree_size:
+ root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, tree_size, timing)
else:
- verifiedsize = 0
+ while verifiedsize < tree_size:
+ uptopos = min(verifiedsize + maxwindow, tree_size)
- timing_point(timing, "checklog")
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "sendlog")
- entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
- if sendlogresult == None:
- sys.exit(1)
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
- verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
- missingentries = get_missingentriesforbackup(nodename, nodeaddress,
- own_key, paths)
- timing_point(timing, "get missing")
-
- while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ root_hash = check_root(logorder, nodename, nodeaddress, own_key, paths, uptopos, timing)
- fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
- with requests.sessions.Session() as session:
- for missingentry_chunk in chunks(missingentries, 100):
- missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
- hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
- sendentryresult = sendentries_merge(nodename, nodeaddress,
- own_key, paths,
- hashes_and_entries, session)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry_merge:", sendentryresult
- sys.exit(1)
- fetched_entries += len(missingentry_hashes)
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
-
- missingentries = get_missingentriesforbackup(nodename, nodeaddress,
- own_key, paths)
- timing_point(timing, "get missing")
-
- verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
- tree_size)
- if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
- secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
- if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", \
- hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
- timing_point(timing, "verifyroot")
+ verifiedsize = uptopos
+ setverifiedsize(nodename, nodeaddress, own_key, paths, verifiedsize)
- setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
backuppath = mergedb + "/verified." + nodename
backupdata = {"tree_size": tree_size,
"sha256_root_hash": hexencode(root_hash)}
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index 2af1d6c..ded25a1 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -6,14 +6,78 @@
import sys
import json
+import base64
+import requests
from time import sleep
from base64 import b64encode, b64decode
from certtools import timing_point, \
create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, sendlog, sendentry, parse_args, perm
+ publish_sth, sendlog, sendentries, parse_args, perm, get_frontend_verifiedsize, \
+ frontend_verify_entries
+
+def sendlog_helper(entries, curpos, nodename, nodeaddress, own_key, paths):
+ print >>sys.stderr, "sending log:",
+ sys.stderr.flush()
+ for chunk in chunks(entries, 1000):
+ for trynumber in range(5, 0, -1):
+ sendlogresult = sendlog(nodename, nodeaddress,
+ own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if sendlogresult == None:
+ if trynumber == 1:
+ sys.exit(1)
+ sleep(10)
+ print >>sys.stderr, "tries left:", trynumber
+ sys.stderr.flush()
+ continue
+ break
+ if sendlogresult["result"] != "ok":
+ print >>sys.stderr, "sendlog:", sendlogresult
+ sys.exit(1)
+ curpos += len(chunk)
+ print >>sys.stderr, curpos,
+ sys.stderr.flush()
+ print >>sys.stderr
+ print >>sys.stderr, "log sent"
+ sys.stderr.flush()
+
+def fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing):
+ missingentries = get_missingentries(nodename, nodeaddress, own_key,
+ paths)
+ timing_point(timing, "get missing")
+
+ while missingentries:
+ print >>sys.stderr, "missing entries:", len(missingentries)
+ sys.stderr.flush()
+
+ sent_entries = 0
+ print >>sys.stderr, "sending missing entries",
+ sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for missingentry_chunk in chunks(missingentries, 100):
+ missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
+ hashes_and_entries = [(hash, chainsdb.get(hash)) for hash in missingentry_hashes]
+ sendentryresult = sendentries(nodename, nodeaddress,
+ own_key, paths,
+ hashes_and_entries, session)
+ if sendentryresult["result"] != "ok":
+ print >>sys.stderr, "sendentries:", sendentryresult
+ sys.exit(1)
+ sent_entries += len(missingentry_hashes)
+ print >>sys.stderr, sent_entries,
+ sys.stderr.flush()
+ print >>sys.stderr
+ sys.stderr.flush()
+ timing_point(timing, "send missing")
+
+ missingentries = get_missingentries(nodename, nodeaddress,
+ own_key, paths)
+ timing_point(timing, "get missing")
+
def merge_dist(args, localconfig, frontendnodes, timestamp):
+ maxwindow = localconfig.get("maxwindow", 1000)
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -53,62 +117,28 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
print >>sys.stderr, "current position", curpos
sys.stderr.flush()
- entries = [b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
- for chunk in chunks(entries, 1000):
- for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress,
- own_key, paths,
- {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- sleep(10)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
- break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
- curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
- timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ verifiedsize = get_frontend_verifiedsize(nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "get verified size")
+ print >>sys.stderr, "verified size", verifiedsize
- missingentries = get_missingentries(nodename, nodeaddress, own_key,
- paths)
- timing_point(timing, "get missing")
+ assert verifiedsize >= curpos
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
- sent_entries = 0
- print >>sys.stderr, "send missing entries",
- sys.stderr.flush()
- for missingentry in missingentries:
- ehash = b64decode(missingentry)
- sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
- chainsdb.get(ehash), ehash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry:", sendentryresult
- sys.exit(1)
- sent_entries += 1
- if sent_entries % 1000 == 0:
- print >>sys.stderr, sent_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
- timing_point(timing, "send missing")
+ while verifiedsize < len(logorder):
+ uptopos = min(verifiedsize + maxwindow, len(logorder))
+
+ entries = [b64encode(entry) for entry in logorder[verifiedsize:uptopos]]
+ sendlog_helper(entries, verifiedsize, nodename, nodeaddress, own_key, paths)
+ timing_point(timing, "sendlog")
+
+ fill_in_missing_entries(nodename, nodeaddress, own_key, paths, chainsdb, timing)
+ verifiedsize = frontend_verify_entries(nodename, nodeaddress, own_key, paths, uptopos)
+
print >>sys.stderr, "sending sth to node", nodename
sys.stderr.flush()
- sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "sendsth:", sendsthresult
+ publishsthresult = publish_sth(nodename, nodeaddress, own_key, paths, sth)
+ if publishsthresult["result"] != "ok":
+ print >>sys.stderr, "publishsth:", publishsthresult
sys.exit(1)
timing_point(timing, "send sth")
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index 3028b30..ddd2f06 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -7,6 +7,7 @@
import sys
import struct
import subprocess
+import requests
from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
@@ -63,20 +64,21 @@ def merge_fetch(args, config, localconfig):
print >>sys.stderr, "getting %d entries from %s:" % \
(len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
sys.stderr.flush()
- for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
- entries = get_entries(storagenode["name"],
- "https://%s/" % storagenode["address"],
- own_key, paths, chunk)
- for ehash in chunk:
- entry = entries[ehash]
- verify_entry(verifycert, entry, ehash)
- chainsdb.add(ehash, entry)
- add_to_logorder(logorderfile, ehash)
- logorder.append(ehash)
- certsinlog.add(ehash)
- added_entries += 1
- print >>sys.stderr, added_entries,
- sys.stderr.flush()
+ with requests.sessions.Session() as session:
+ for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
+ entries = get_entries(storagenode["name"],
+ "https://%s/" % storagenode["address"],
+ own_key, paths, chunk, session=session)
+ for ehash in chunk:
+ entry = entries[ehash]
+ verify_entry(verifycert, entry, ehash)
+ chainsdb.add(ehash, entry)
+ add_to_logorder(logorderfile, ehash)
+ logorder.append(ehash)
+ certsinlog.add(ehash)
+ added_entries += 1
+ print >>sys.stderr, added_entries,
+ sys.stderr.flush()
print >>sys.stderr
sys.stderr.flush()
chainsdb.commit()
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 80fbf0b..94901a9 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -179,13 +179,13 @@ def get_new_entries(node, baseurl, own_key, paths):
print >>sys.stderr, "ERROR: fetchnewentries", e.response
sys.exit(1)
-def get_entries(node, baseurl, own_key, paths, hashes):
+def get_entries(node, baseurl, own_key, paths, hashes, session=None):
try:
params = {"hash":[base64.b64encode(ehash) for ehash in hashes]}
result = http_request(baseurl + "plop/v1/storage/getentry",
params=params,
key=own_key, verifynode=node,
- publickeydir=paths["publickeys"])
+ publickeydir=paths["publickeys"], session=session)
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
entries = dict([(base64.b64decode(entry["hash"]),
@@ -214,6 +214,25 @@ def get_curpos(node, baseurl, own_key, paths):
print >>sys.stderr, "ERROR: currentposition", e.response
sys.exit(1)
+def get_frontend_verifiedsize(node, baseurl, own_key, paths):
+ return frontend_verify_entries(node, baseurl, own_key, paths, 0)
+
+def frontend_verify_entries(node, baseurl, own_key, paths, size):
+ try:
+ arguments = {"verify_to": size}
+ result = http_request(baseurl + "plop/v1/frontend/verify-entries",
+ json.dumps(arguments),
+ key=own_key, verifynode=node,
+ publickeydir=paths["publickeys"])
+ parsed_result = json.loads(result)
+ if parsed_result.get(u"result") == u"ok":
+ return parsed_result[u"verified"]
+ print >>sys.stderr, "ERROR: verify-entries", parsed_result
+ sys.exit(1)
+ except requests.exceptions.HTTPError, e:
+ print >>sys.stderr, "ERROR: verify-entries", e.response
+ sys.exit(1)
+
def get_verifiedsize(node, baseurl, own_key, paths):
try:
result = http_request(baseurl + "plop/v1/merge/verifiedsize",
@@ -258,6 +277,10 @@ def backup_sendlog(node, baseurl, own_key, paths, submission):
print >>sys.stderr, "ERROR: backup_sendlog", e.response
sys.stderr.flush()
return None
+ except requests.packages.urllib3.exceptions.NewConnectionError, e:
+ print >>sys.stderr, "ERROR: backup_sendlog new connection error"
+ sys.stderr.flush()
+ return None
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
print >>sys.stderr, submission
@@ -267,16 +290,17 @@ def backup_sendlog(node, baseurl, own_key, paths, submission):
sys.stderr.flush()
raise e
-def sendentry(node, baseurl, own_key, paths, entry, ehash):
+def sendentries(node, baseurl, own_key, paths, entries, session=None):
try:
+ json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
result = http_request(
baseurl + "plop/v1/frontend/sendentry",
- json.dumps({"entry":base64.b64encode(entry),
- "treeleafhash":base64.b64encode(ehash)}),
- key=own_key, verifynode=node, publickeydir=paths["publickeys"])
+ json.dumps(json_entries),
+ key=own_key, verifynode=node, publickeydir=paths["publickeys"],
+ session=session)
return json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.reponse
+ print >>sys.stderr, "ERROR: sendentries", e.response
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
@@ -286,9 +310,9 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash):
print >>sys.stderr, "========================"
sys.stderr.flush()
raise e
-
-def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
- return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)])
+ except requests.exceptions.ConnectionError, e:
+ print >>sys.stderr, "ERROR: sendentries", baseurl, e.request, e.response
+ sys.exit(1)
def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
try:
@@ -300,7 +324,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
session=session)
return json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry_merge", e.response
+ print >>sys.stderr, "ERROR: sendentries_merge", e.response
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
@@ -310,15 +334,18 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
print >>sys.stderr, "========================"
sys.stderr.flush()
raise e
+ except requests.exceptions.ConnectionError, e:
+ print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e.request, e.response
+ sys.exit(1)
-def sendsth(node, baseurl, own_key, paths, submission):
+def publish_sth(node, baseurl, own_key, paths, submission):
try:
- result = http_request(baseurl + "plop/v1/frontend/sendsth",
+ result = http_request(baseurl + "plop/v1/frontend/publish-sth",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
return json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.response
+ print >>sys.stderr, "ERROR: publish-sth", e.response
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
diff --git a/tools/parsebench.py b/tools/parsebench.py
index 96c36d6..6897b57 100755
--- a/tools/parsebench.py
+++ b/tools/parsebench.py
@@ -10,8 +10,8 @@ import itertools
def parse_one_line(line):
row = line.rstrip().split(":")
- assert row[0].startswith("mergeoutput.")
- iteration = int(row[0][12:])
+ assert row[0].startswith("bench-")
+ iteration = int(row[0][6:])
stage = row[2].strip()
data = ast.literal_eval(row[3].strip())
return (iteration, stage, data)
@@ -30,35 +30,63 @@ def main():
print "</body>"
print "</html>"
+scale = 0.25
+
def parse_one_file(filename):
lines = [parse_one_line(line) for line in open(filename)]
iterations = itertools.groupby(lines, lambda x: x[0])
print "<h1>%s</h1>" % (filename,)
print "<div>"
- legend = []
+ firsttime = True
+
+ stageorderdict = {}
+ stageorder = []
+ stages = {}
+ itemorder = {}
+
+ for (i, iteration) in iterations:
+ for (_, stage, data) in iteration:
+ if stage not in stages:
+ stageorderdict[stage] = len(stageorderdict)
+ stageorder.append(stage)
+ stages[stage] = {}
+ itemorder[stage] = []
+ for (item, useconds) in data:
+ if item not in stages[stage]:
+ itemorder[stage].append(item)
+ stages[stage][item] = len(stages[stage])
+
+ iterations = itertools.groupby(lines, lambda x: x[0])
for (i, iteration) in iterations:
- print "<table><tr>"
- for (stagen, (_, stage, data)) in enumerate(iteration):
- if i == 0:
- legend.append("<div><span>%s</span>" % (stage,))
+ print >>sys.stderr, (i, iteration)
+ sys.stdout.write("<div style='margin-bottom: 1em;'>")
+ for (_, stage, data) in iteration:
data = list(data)
for (itemn, (item, useconds)) in enumerate(data):
seconds = useconds / 1000000
- step = 50 / (len(data) - 1)
- print "<td style='width: %dpx; padding: 0; background-color: hsl(%d, 90%%, %d%%);' title='%s:%s %d'>" % (seconds/4, stagen * 90, itemn * step + 40, stage, item, seconds)
- if i == 0:
- legend.append("<span style='background-color: hsl(%d, 90%%, %d%%);'>%s</span>" % (stagen * 90, itemn * step + 40, item))
+ shades = stages[stage]
+ step = 50 / (len(shades) - 1)
+ shade = shades[item]
+ stagen = stageorderdict[stage]
+ print "<div class='element' style='display: inline-block; margin: 0; width: %dpx; padding: 0; background-color: hsl(%d, 90%%, %d%%);' title='%s:%s %d'>" % (int(seconds*scale), stagen * 90, shade * step + 40, stage, item, seconds)
print "&nbsp;"
- print "</td>"
- if i == 0:
- legend.append("</div>")
- print "</tr></table>"
+ sys.stdout.write("</div>")
+ sys.stdout.write("</div>")
print "</div>"
print "<div style='height: 30px;'>"
print "</div>"
print "<div>"
- for row in legend:
- print row
+
+ for stage in stageorder:
+ print "<div><span>%s</span>" % (stage,)
+ shades = stages[stage]
+ for item in itemorder[stage]:
+ shade = shades[item]
+ step = 50 / (len(shades) - 1)
+
+ stagen = stageorderdict[stage]
+ print "<span style='background-color: hsl(%d, 90%%, %d%%);'>%s</span>" % (stagen * 90, shade * step + 40, item)
+ print "</div>"
print "</div>"
main()