diff options
-rw-r--r-- | reltool.config | 4 | ||||
-rw-r--r-- | test/catlfish-test-mergefailover.cfg.in | 5 | ||||
-rw-r--r-- | test/catlfish-test.cfg.in | 5 | ||||
-rwxr-xr-x | test/scripts/light-system-test.sh | 4 | ||||
-rwxr-xr-x | test/scripts/perf-test.sh | 4 | ||||
-rwxr-xr-x | tools/compileconfig.py | 57 | ||||
-rwxr-xr-x | tools/merge_fetch.py | 44 |
7 files changed, 111 insertions, 12 deletions
diff --git a/reltool.config b/reltool.config index 759d67b..2de8936 100644 --- a/reltool.config +++ b/reltool.config @@ -4,14 +4,16 @@ {app_file, strip}, {rel, "catlfish", "1.0.1-alpha-dev", [sasl, catlfish]}, {rel, "merge", "1.0.1-alpha-dev", [sasl, merge]}, + {rel, "statsserver", "1.0.1-alpha-dev", [sasl, statsserver]}, {boot_rel, "catlfish"}, {profile, standalone}, {incl_sys_filters, ["^bin/", "^erts-.*/", "^lib"]}, - {incl_app_filters, ["^ebin/", "^priv/", "^src/", "^merge/ebin/", "^merge/src/"]}, + {incl_app_filters, ["^ebin/", "^priv/", "^src/", "^merge/ebin/", "^merge/src/", "^statsserver/ebin/", "^statsserver/src/"]}, {escript, "./verifycert.erl", [{incl_cond, include}]}, {app, catlfish, [{app_file, all}, {lib_dir, "."}]}, {app, plop, [{app_file, all}, {lib_dir, "../plop"}]}, {app, merge, [{app_file, all}, {lib_dir, "../plop/merge"}]}, + {app, statsserver, [{app_file, all}, {lib_dir, "../plop/statsserver"}]}, {app, mochiweb, [{app_file, all}, {lib_dir, "../mochiweb"}]}, {app, idna, [{app_file, all}, {lib_dir, "../hackney/deps/idna"}]}, {app, hackney, [{app_file, all}, {lib_dir, "../hackney"}]}, diff --git a/test/catlfish-test-mergefailover.cfg.in b/test/catlfish-test-mergefailover.cfg.in index 2e2cd1a..7f0fbcb 100644 --- a/test/catlfish-test-mergefailover.cfg.in +++ b/test/catlfish-test-mergefailover.cfg.in @@ -38,3 +38,8 @@ backup-quorum-size: 1 storage-quorum-size: 1 mmd: 86400 + +statsservers: + - name: statsserver + address: localhost:9081 + publicaddress: localhost:9082 diff --git a/test/catlfish-test.cfg.in b/test/catlfish-test.cfg.in index c3b59b0..18724e8 100644 --- a/test/catlfish-test.cfg.in +++ b/test/catlfish-test.cfg.in @@ -38,3 +38,8 @@ backup-quorum-size: 1 storage-quorum-size: 1 mmd: 86400 + +statsservers: + - name: statsserver + address: localhost:9081 + publicaddress: localhost:9082 diff --git a/test/scripts/light-system-test.sh b/test/scripts/light-system-test.sh index 69eb5c7..da91929 100755 --- a/test/scripts/light-system-test.sh +++ b/test/scripts/light-system-test.sh @@ -18,11 +18,11 @@ tests_stop() { } tests_stop_all() { - ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing + ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing statsserver } ${SCRIPTS}/light-system-test-prepare.sh -tests_start signing +tests_start statsserver signing tests_start merge_2only frontendexceptlast ${top_srcdir}/tools/initlog.py --config machine/merge-1/catlfish-test.cfg --localconfig machine/merge-1/catlfish-test-local-merge-1.cfg tests_start mergeprimary diff --git a/test/scripts/perf-test.sh b/test/scripts/perf-test.sh index 085de14..0d6daa3 100755 --- a/test/scripts/perf-test.sh +++ b/test/scripts/perf-test.sh @@ -18,7 +18,7 @@ tests_stop() { } tests_stop_all() { - ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing + ${SCRIPTS}/light-system-test-stop.sh mergeprimary mergesecondary frontend signing statsserver } ${SCRIPTS}/light-system-test-prepare.sh @@ -26,7 +26,7 @@ ${top_srcdir}/tools/initlog.py --config machine/merge-1/catlfish-test.cfg --loca cp ${top_srcdir}/test/known_roots/* known_roots -tests_start signing +tests_start signing statsserver tests_start merge_2only frontendexceptlast mergeprimary do_merge 0 diff --git a/tools/compileconfig.py b/tools/compileconfig.py index d5a22df..c5ad00c 100755 --- a/tools/compileconfig.py +++ b/tools/compileconfig.py @@ -74,7 +74,7 @@ def parse_address(address): def get_node_config(nodename, config): nodetype = [] nodeconfig = {} - for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]: + for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes", "statsservers"]: for node in config[t]: if node["name"] == nodename: nodetype.append(t) @@ -125,7 +125,22 @@ def gen_http_servers(nodetype, nodeconfig, bind_addresses, bind_publicaddress, b if "mergenodes" in nodetype: (host, port) = get_address(bind_addresses["merge"], nodeconfig["mergenodes"]) https_servers.append((Symbol("frontend_https_api"), host, port, Symbol("frontend"))) - if nodetype - set(["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]): + if "statsservers" in nodetype: + (host, port) = get_address(None, nodeconfig["statsservers"]) + https_servers.append((Symbol("statsserver_https_api"), host, port, Symbol("statsserver"))) + + if bind_publicaddress: + (publichost, publicport) = parse_address(bind_publicaddress) + else: + (_, publicport) = parse_address(nodeconfig["statsservers"]["publicaddress"]) + publichost = "0.0.0.0" + + if bind_publichttpaddress: + (publichttphost, publichttpport) = parse_address(bind_publichttpaddress) + http_servers.append((Symbol("external_http_api"), publichttphost, publichttpport, Symbol("statsserver"))) + https_servers.append((Symbol("external_https_api"), publichost, publicport, Symbol("statsserver"))) + + if nodetype - set(["frontendnodes", "storagenodes", "signingnodes", "mergenodes", "statsservers"]): print >>sys.stderr, "unknown nodetype", nodetype sys.exit(1) @@ -269,6 +284,14 @@ def gen_config(nodename, config, localconfig): (Symbol("https_keyfile"), paths["https_keyfile"]), ] + if "statsservers" in nodetype: + plopconfig += [ + (Symbol("https_servers"), https_servers), + (Symbol("http_servers"), http_servers), + (Symbol("https_certfile"), paths["https_certfile"]), + (Symbol("https_keyfile"), paths["https_keyfile"]), + ] + catlfishconfig.append((Symbol("mmd"), config["mmd"])) lagerconfig = [ @@ -340,6 +363,8 @@ def gen_config(nodename, config, localconfig): storagenodeaddresses = ["https://%s/plop/v1/storage/" % node["address"] for node in config["storagenodes"]] frontendnodenames = [node["name"] for node in config["frontendnodes"]] frontendnodeaddresses = ["https://%s/plop/v1/frontend/" % node["address"] for node in config["frontendnodes"]] + statsservernames = [node["name"] for node in config["statsservers"]] + statsserveraddresses = ["https://%s/plop/v1/status/" % node["address"] for node in config["statsservers"]] allowed_clients = [] allowed_servers = [] @@ -412,6 +437,27 @@ def gen_config(nodename, config, localconfig): (Symbol("plopcontrol"), plopcontrolfilename), ] + reloadableplopconfig.append((Symbol("statsservers"), statsserveraddresses)) + allowed_servers += [ + ("/plop/v1/status/merge_dist", statsservernames), + ("/plop/v1/status/merge_backup", statsservernames), + ("/plop/v1/status/merge_sth", statsservernames), + ("/plop/v1/status/merge_fetch", statsservernames), + ("/plop/v1/status/storage", statsservernames), + ("/plop/v1/status/merge_errors", statsservernames), + ] + + if "statsservers" in nodetype: + allowed_clients += [ + ("/plop/v1/status/merge_dist", mergenodenames), + ("/plop/v1/status/merge_backup", mergenodenames), + ("/plop/v1/status/merge_sth", mergenodenames), + ("/plop/v1/status/merge_fetch", mergenodenames), + ("/plop/v1/status/merge_errors", mergenodenames), + ("/plop/v1/status/storage", list(storagenodenames)), + ("/status", Symbol("noauth")), + ] + reloadableplopconfig += [ (Symbol("allowed_clients"), list(allowed_clients)), (Symbol("allowed_servers"), list(allowed_servers)), @@ -463,11 +509,12 @@ def gen_testmakefile(config, testmakefile, shellvars=False): signingnodenames = set([node["name"] for node in config["signingnodes"]]) mergenodenames = set([node["name"] for node in config["mergenodes"]]) mergesecondarynodenames = set([node["name"] for node in config["mergenodes"] if node["name"] != config["primarymergenode"]]) + statsservernodenames = set([node["name"] for node in config.get("statsservers")]) frontendnodenames_except_last = sorted(frontendnodenames)[:-1] frontendnodenames_except_first = sorted(frontendnodenames)[1:] - allnodes = config["frontendnodes"] + config["storagenodes"] + config["signingnodes"] + config["mergenodes"] + allnodes = config["frontendnodes"] + config["storagenodes"] + config["signingnodes"] + config["mergenodes"] + config["statsservers"] testaddresses = multivaldict([(node["name"], node["address"]) for node in allnodes]) @@ -480,6 +527,7 @@ def gen_testmakefile(config, testmakefile, shellvars=False): print_nodevar(configfile, delimiter, "SIGNING", "catlfish", signingnodenames, testaddresses) print_nodevar(configfile, delimiter, "MERGESECONDARY", "catlfish", mergesecondarynodenames, testaddresses) print_nodevar(configfile, delimiter, "MERGEPRIMARY", "merge", [config["primarymergenode"]], testaddresses, gentesturl=False) + print_nodevar(configfile, delimiter, "STATSSERVER", "statsserver", statsservernodenames, testaddresses) print >>configfile, "NODES=" + delimiter + " ".join(set([node["name"] for node in allnodes])) + delimiter @@ -493,8 +541,9 @@ def printnodenames(config): storagenodenames = set([node["name"] for node in config["storagenodes"]]) signingnodenames = set([node["name"] for node in config["signingnodes"]]) mergenodenames = set([node["name"] for node in config["mergenodes"]]) + statsservernodenames = set([node["name"] for node in config.get("statsservers")]) - print " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames) + print " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames|statsservernodenames) def main(): parser = argparse.ArgumentParser(description="") diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index e71d3f1..165dee8 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -15,9 +15,32 @@ from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ hexencode, parse_args, perm -from certtools import timing_point, write_file, create_ssl_context +from certtools import timing_point, write_file, create_ssl_context, http_request +import json + +def reportstatus(statsservers, own_key, target, variable, status): + for statsserver in statsservers: + do_reportstatus(statsserver["name"], "https://%s/" % statsserver["address"], own_key, target, variable, status) + +def do_reportstatus(node, baseurl, own_key, target, variable, status): + try: + result = http_request(baseurl + "plop/v1/status/merge_fetch", + json.dumps([{"target":target, "key": variable, "value": status}]), key=own_key, + verifynode=node) + return json.loads(result) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: reportstatus", e.response + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, target, variable, status + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, e.response + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e -def merge_fetch(args, config, localconfig): +def merge_fetch(args, config, localconfig, currentsizefile): paths = localconfig["paths"] storagenodes = config["storagenodes"] mergedb = paths["mergedb"] @@ -26,6 +49,8 @@ def merge_fetch(args, config, localconfig): own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + assert(localconfig["nodename"] == config["primarymergenode"]) + statsservers = config.get("statsservers") timing = timing_point() logorder = get_logorder(logorderfile) @@ -54,6 +79,7 @@ def merge_fetch(args, config, localconfig): new_entries -= certsinlog print >>sys.stderr, "adding", len(new_entries), "entries" sys.stderr.flush() + reportstatus(statsservers, own_key, "fetch", "total", len(certsinlog) + len(new_entries)) for ehash in new_entries: for storagenode in storagenodes: @@ -67,6 +93,8 @@ def merge_fetch(args, config, localconfig): [paths["verifycert_bin"], paths["knownroots"]], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + noncommitted = 0 + added_entries = 0 for storagenode in storagenodes: if storagenode["name"] not in entries_to_fetch: @@ -83,10 +111,19 @@ def merge_fetch(args, config, localconfig): entry = entries[ehash] verify_entry(verifycert, entry, ehash) chainsdb.add(ehash, entry) + noncommitted += 1 add_to_logorder(logorderfile, ehash) logorder.append(ehash) certsinlog.add(ehash) added_entries += 1 + if noncommitted >= 1000: + chainsdb.commit() + fsync_logorder(logorderfile) + noncommitted = 0 + tree_size = len(logorder) + currentsize = {"index": tree_size - 1, "hash": hexencode(logorder[tree_size-1])} + write_file(currentsizefile, currentsize) + reportstatus(statsservers, own_key, "fetch", "fetched", tree_size) print >>sys.stderr, added_entries, sys.stderr.flush() print >>sys.stderr @@ -104,6 +141,7 @@ def merge_fetch(args, config, localconfig): sys.stderr.flush() tree_size = len(logorder) + reportstatus(statsservers, own_key, "fetch", "fetched", tree_size) if tree_size == 0: return (0, '') else: @@ -125,7 +163,7 @@ def main(): create_ssl_context(cafile=paths["https_cacertfile"]) while True: - logsize, last_hash = merge_fetch(args, config, localconfig) + logsize, last_hash = merge_fetch(args, config, localconfig, currentsizefile) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize write_file(currentsizefile, currentsize) |