summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--reltool.config4
-rw-r--r--test/catlfish-test-mergefailover.cfg.in5
-rw-r--r--test/catlfish-test.cfg.in5
-rwxr-xr-xtest/scripts/light-system-test.sh4
-rwxr-xr-xtest/scripts/perf-test.sh4
-rwxr-xr-xtools/compileconfig.py57
-rwxr-xr-xtools/merge_fetch.py44
7 files changed, 111 insertions, 12 deletions
diff --git a/reltool.config b/reltool.config
index 759d67b..2de8936 100644
--- a/reltool.config
+++ b/reltool.config
@@ -4,14 +4,16 @@
{app_file, strip},
{rel, "catlfish", "1.0.1-alpha-dev", [sasl, catlfish]},
{rel, "merge", "1.0.1-alpha-dev", [sasl, merge]},
+ {rel, "statsserver", "1.0.1-alpha-dev", [sasl, statsserver]},
{boot_rel, "catlfish"},
{profile, standalone},
{incl_sys_filters, ["^bin/", "^erts-.*/", "^lib"]},
- {incl_app_filters, ["^ebin/", "^priv/", "^src/", "^merge/ebin/", "^merge/src/"]},
+ {incl_app_filters, ["^ebin/", "^priv/", "^src/", "^merge/ebin/", "^merge/src/", "^statsserver/ebin/", "^statsserver/src/"]},
{escript, "./verifycert.erl", [{incl_cond, include}]},
{app, catlfish, [{app_file, all}, {lib_dir, "."}]},
{app, plop, [{app_file, all}, {lib_dir, "../plop"}]},
{app, merge, [{app_file, all}, {lib_dir, "../plop/merge"}]},
+ {app, statsserver, [{app_file, all}, {lib_dir, "../plop/statsserver"}]},
{app, mochiweb, [{app_file, all}, {lib_dir, "../mochiweb"}]},
{app, idna, [{app_file, all}, {lib_dir, "../hackney/deps/idna"}]},
{app, hackney, [{app_file, all}, {lib_dir, "../hackney"}]},
diff --git a/test/catlfish-test-mergefailover.cfg.in b/test/catlfish-test-mergefailover.cfg.in
index 2e2cd1a..7f0fbcb 100644
--- a/test/catlfish-test-mergefailover.cfg.in
+++ b/test/catlfish-test-mergefailover.cfg.in
@@ -38,3 +38,8 @@ backup-quorum-size: 1
storage-quorum-size: 1
mmd: 86400
+
+statsservers:
+ - name: statsserver
+ address: localhost:9081
+ publicaddress: localhost:9082
diff --git a/test/catlfish-test.cfg.in b/test/catlfish-test.cfg.in
index c3b59b0..18724e8 100644
--- a/test/catlfish-test.cfg.in
+++ b/test/catlfish-test.cfg.in
@@ -38,3 +38,8 @@ backup-quorum-size: 1
storage-quorum-size: 1
mmd: 86400
+
+statsservers:
+ - name: statsserver
+ address: localhost:9081
+ publicaddress: localhost:9082
diff --git a/test/scripts/light-system-test.sh b/test/scripts/light-system-test.sh
index 69eb5c7..da91929 100755
--- a/test/scripts/light-system-test.sh
+++ b/test/scripts/light-system-test.sh
@@ -18,11 +18,11 @@ tests_stop() {
}
tests_stop_all() {
- ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing
+ ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing statsserver
}
${SCRIPTS}/light-system-test-prepare.sh
-tests_start signing
+tests_start statsserver signing
tests_start merge_2only frontendexceptlast
${top_srcdir}/tools/initlog.py --config machine/merge-1/catlfish-test.cfg --localconfig machine/merge-1/catlfish-test-local-merge-1.cfg
tests_start mergeprimary
diff --git a/test/scripts/perf-test.sh b/test/scripts/perf-test.sh
index 085de14..0d6daa3 100755
--- a/test/scripts/perf-test.sh
+++ b/test/scripts/perf-test.sh
@@ -18,7 +18,7 @@ tests_stop() {
}
tests_stop_all() {
- ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing
+ ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing statsserver
}
${SCRIPTS}/light-system-test-prepare.sh
@@ -26,7 +26,7 @@ ${top_srcdir}/tools/initlog.py --config machine/merge-1/catlfish-test.cfg --loca
cp ${top_srcdir}/test/known_roots/* known_roots
-tests_start signing
+tests_start signing statsserver
tests_start merge_2only frontendexceptlast mergeprimary
do_merge 0
diff --git a/tools/compileconfig.py b/tools/compileconfig.py
index d5a22df..c5ad00c 100755
--- a/tools/compileconfig.py
+++ b/tools/compileconfig.py
@@ -74,7 +74,7 @@ def parse_address(address):
def get_node_config(nodename, config):
nodetype = []
nodeconfig = {}
- for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]:
+ for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes", "statsservers"]:
for node in config[t]:
if node["name"] == nodename:
nodetype.append(t)
@@ -125,7 +125,22 @@ def gen_http_servers(nodetype, nodeconfig, bind_addresses, bind_publicaddress, b
if "mergenodes" in nodetype:
(host, port) = get_address(bind_addresses["merge"], nodeconfig["mergenodes"])
https_servers.append((Symbol("frontend_https_api"), host, port, Symbol("frontend")))
- if nodetype - set(["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]):
+ if "statsservers" in nodetype:
+ (host, port) = get_address(None, nodeconfig["statsservers"])
+ https_servers.append((Symbol("statsserver_https_api"), host, port, Symbol("statsserver")))
+
+ if bind_publicaddress:
+ (publichost, publicport) = parse_address(bind_publicaddress)
+ else:
+ (_, publicport) = parse_address(nodeconfig["statsservers"]["publicaddress"])
+ publichost = "0.0.0.0"
+
+ if bind_publichttpaddress:
+ (publichttphost, publichttpport) = parse_address(bind_publichttpaddress)
+ http_servers.append((Symbol("external_http_api"), publichttphost, publichttpport, Symbol("statsserver")))
+ https_servers.append((Symbol("external_https_api"), publichost, publicport, Symbol("statsserver")))
+
+ if nodetype - set(["frontendnodes", "storagenodes", "signingnodes", "mergenodes", "statsservers"]):
print >>sys.stderr, "unknown nodetype", nodetype
sys.exit(1)
@@ -269,6 +284,14 @@ def gen_config(nodename, config, localconfig):
(Symbol("https_keyfile"), paths["https_keyfile"]),
]
+ if "statsservers" in nodetype:
+ plopconfig += [
+ (Symbol("https_servers"), https_servers),
+ (Symbol("http_servers"), http_servers),
+ (Symbol("https_certfile"), paths["https_certfile"]),
+ (Symbol("https_keyfile"), paths["https_keyfile"]),
+ ]
+
catlfishconfig.append((Symbol("mmd"), config["mmd"]))
lagerconfig = [
@@ -340,6 +363,8 @@ def gen_config(nodename, config, localconfig):
storagenodeaddresses = ["https://%s/plop/v1/storage/" % node["address"] for node in config["storagenodes"]]
frontendnodenames = [node["name"] for node in config["frontendnodes"]]
frontendnodeaddresses = ["https://%s/plop/v1/frontend/" % node["address"] for node in config["frontendnodes"]]
+ statsservernames = [node["name"] for node in config["statsservers"]]
+ statsserveraddresses = ["https://%s/plop/v1/status/" % node["address"] for node in config["statsservers"]]
allowed_clients = []
allowed_servers = []
@@ -412,6 +437,27 @@ def gen_config(nodename, config, localconfig):
(Symbol("plopcontrol"), plopcontrolfilename),
]
+ reloadableplopconfig.append((Symbol("statsservers"), statsserveraddresses))
+ allowed_servers += [
+ ("/plop/v1/status/merge_dist", statsservernames),
+ ("/plop/v1/status/merge_backup", statsservernames),
+ ("/plop/v1/status/merge_sth", statsservernames),
+ ("/plop/v1/status/merge_fetch", statsservernames),
+ ("/plop/v1/status/storage", statsservernames),
+ ("/plop/v1/status/merge_errors", statsservernames),
+ ]
+
+ if "statsservers" in nodetype:
+ allowed_clients += [
+ ("/plop/v1/status/merge_dist", mergenodenames),
+ ("/plop/v1/status/merge_backup", mergenodenames),
+ ("/plop/v1/status/merge_sth", mergenodenames),
+ ("/plop/v1/status/merge_fetch", mergenodenames),
+ ("/plop/v1/status/merge_errors", mergenodenames),
+ ("/plop/v1/status/storage", list(storagenodenames)),
+ ("/status", Symbol("noauth")),
+ ]
+
reloadableplopconfig += [
(Symbol("allowed_clients"), list(allowed_clients)),
(Symbol("allowed_servers"), list(allowed_servers)),
@@ -463,11 +509,12 @@ def gen_testmakefile(config, testmakefile, shellvars=False):
signingnodenames = set([node["name"] for node in config["signingnodes"]])
mergenodenames = set([node["name"] for node in config["mergenodes"]])
mergesecondarynodenames = set([node["name"] for node in config["mergenodes"] if node["name"] != config["primarymergenode"]])
+ statsservernodenames = set([node["name"] for node in config.get("statsservers")])
frontendnodenames_except_last = sorted(frontendnodenames)[:-1]
frontendnodenames_except_first = sorted(frontendnodenames)[1:]
- allnodes = config["frontendnodes"] + config["storagenodes"] + config["signingnodes"] + config["mergenodes"]
+ allnodes = config["frontendnodes"] + config["storagenodes"] + config["signingnodes"] + config["mergenodes"] + config["statsservers"]
testaddresses = multivaldict([(node["name"], node["address"]) for node in allnodes])
@@ -480,6 +527,7 @@ def gen_testmakefile(config, testmakefile, shellvars=False):
print_nodevar(configfile, delimiter, "SIGNING", "catlfish", signingnodenames, testaddresses)
print_nodevar(configfile, delimiter, "MERGESECONDARY", "catlfish", mergesecondarynodenames, testaddresses)
print_nodevar(configfile, delimiter, "MERGEPRIMARY", "merge", [config["primarymergenode"]], testaddresses, gentesturl=False)
+ print_nodevar(configfile, delimiter, "STATSSERVER", "statsserver", statsservernodenames, testaddresses)
print >>configfile, "NODES=" + delimiter + " ".join(set([node["name"] for node in allnodes])) + delimiter
@@ -493,8 +541,9 @@ def printnodenames(config):
storagenodenames = set([node["name"] for node in config["storagenodes"]])
signingnodenames = set([node["name"] for node in config["signingnodes"]])
mergenodenames = set([node["name"] for node in config["mergenodes"]])
+ statsservernodenames = set([node["name"] for node in config.get("statsservers")])
- print " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames)
+ print " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames|statsservernodenames)
def main():
parser = argparse.ArgumentParser(description="")
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index e71d3f1..165dee8 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -15,9 +15,32 @@ from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
hexencode, parse_args, perm
-from certtools import timing_point, write_file, create_ssl_context
+from certtools import timing_point, write_file, create_ssl_context, http_request
+import json
+
+def reportstatus(statsservers, own_key, target, variable, status):
+ for statsserver in statsservers:
+ do_reportstatus(statsserver["name"], "https://%s/" % statsserver["address"], own_key, target, variable, status)
+
+def do_reportstatus(node, baseurl, own_key, target, variable, status):
+ try:
+ result = http_request(baseurl + "plop/v1/status/merge_fetch",
+ json.dumps([{"target":target, "key": variable, "value": status}]), key=own_key,
+ verifynode=node)
+ return json.loads(result)
+ except requests.exceptions.HTTPError, e:
+ print >>sys.stderr, "ERROR: reportstatus", e.response
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, target, variable, status
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, e.response
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
-def merge_fetch(args, config, localconfig):
+def merge_fetch(args, config, localconfig, currentsizefile):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
@@ -26,6 +49,8 @@ def merge_fetch(args, config, localconfig):
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
localconfig["nodename"]))
+ assert(localconfig["nodename"] == config["primarymergenode"])
+ statsservers = config.get("statsservers")
timing = timing_point()
logorder = get_logorder(logorderfile)
@@ -54,6 +79,7 @@ def merge_fetch(args, config, localconfig):
new_entries -= certsinlog
print >>sys.stderr, "adding", len(new_entries), "entries"
sys.stderr.flush()
+ reportstatus(statsservers, own_key, "fetch", "total", len(certsinlog) + len(new_entries))
for ehash in new_entries:
for storagenode in storagenodes:
@@ -67,6 +93,8 @@ def merge_fetch(args, config, localconfig):
[paths["verifycert_bin"], paths["knownroots"]],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ noncommitted = 0
+
added_entries = 0
for storagenode in storagenodes:
if storagenode["name"] not in entries_to_fetch:
@@ -83,10 +111,19 @@ def merge_fetch(args, config, localconfig):
entry = entries[ehash]
verify_entry(verifycert, entry, ehash)
chainsdb.add(ehash, entry)
+ noncommitted += 1
add_to_logorder(logorderfile, ehash)
logorder.append(ehash)
certsinlog.add(ehash)
added_entries += 1
+ if noncommitted >= 1000:
+ chainsdb.commit()
+ fsync_logorder(logorderfile)
+ noncommitted = 0
+ tree_size = len(logorder)
+ currentsize = {"index": tree_size - 1, "hash": hexencode(logorder[tree_size-1])}
+ write_file(currentsizefile, currentsize)
+ reportstatus(statsservers, own_key, "fetch", "fetched", tree_size)
print >>sys.stderr, added_entries,
sys.stderr.flush()
print >>sys.stderr
@@ -104,6 +141,7 @@ def merge_fetch(args, config, localconfig):
sys.stderr.flush()
tree_size = len(logorder)
+ reportstatus(statsservers, own_key, "fetch", "fetched", tree_size)
if tree_size == 0:
return (0, '')
else:
@@ -125,7 +163,7 @@ def main():
create_ssl_context(cafile=paths["https_cacertfile"])
while True:
- logsize, last_hash = merge_fetch(args, config, localconfig)
+ logsize, last_hash = merge_fetch(args, config, localconfig, currentsizefile)
currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
#print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
write_file(currentsizefile, currentsize)