From 2b9e8c620006705a9459a61c88c7c0cd8fb02212 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 2 Mar 2017 00:52:46 +0100 Subject: Statusserver --- tools/compileconfig.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++---- tools/merge_fetch.py | 44 +++++++++++++++++++++++++++++++++++--- 2 files changed, 94 insertions(+), 7 deletions(-) (limited to 'tools') diff --git a/tools/compileconfig.py b/tools/compileconfig.py index d5a22df..c5ad00c 100755 --- a/tools/compileconfig.py +++ b/tools/compileconfig.py @@ -74,7 +74,7 @@ def parse_address(address): def get_node_config(nodename, config): nodetype = [] nodeconfig = {} - for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]: + for t in ["frontendnodes", "storagenodes", "signingnodes", "mergenodes", "statsservers"]: for node in config[t]: if node["name"] == nodename: nodetype.append(t) @@ -125,7 +125,22 @@ def gen_http_servers(nodetype, nodeconfig, bind_addresses, bind_publicaddress, b if "mergenodes" in nodetype: (host, port) = get_address(bind_addresses["merge"], nodeconfig["mergenodes"]) https_servers.append((Symbol("frontend_https_api"), host, port, Symbol("frontend"))) - if nodetype - set(["frontendnodes", "storagenodes", "signingnodes", "mergenodes"]): + if "statsservers" in nodetype: + (host, port) = get_address(None, nodeconfig["statsservers"]) + https_servers.append((Symbol("statsserver_https_api"), host, port, Symbol("statsserver"))) + + if bind_publicaddress: + (publichost, publicport) = parse_address(bind_publicaddress) + else: + (_, publicport) = parse_address(nodeconfig["statsservers"]["publicaddress"]) + publichost = "0.0.0.0" + + if bind_publichttpaddress: + (publichttphost, publichttpport) = parse_address(bind_publichttpaddress) + http_servers.append((Symbol("external_http_api"), publichttphost, publichttpport, Symbol("statsserver"))) + https_servers.append((Symbol("external_https_api"), publichost, publicport, Symbol("statsserver"))) + + if nodetype - set(["frontendnodes", "storagenodes", "signingnodes", "mergenodes", "statsservers"]): print >>sys.stderr, "unknown nodetype", nodetype sys.exit(1) @@ -269,6 +284,14 @@ def gen_config(nodename, config, localconfig): (Symbol("https_keyfile"), paths["https_keyfile"]), ] + if "statsservers" in nodetype: + plopconfig += [ + (Symbol("https_servers"), https_servers), + (Symbol("http_servers"), http_servers), + (Symbol("https_certfile"), paths["https_certfile"]), + (Symbol("https_keyfile"), paths["https_keyfile"]), + ] + catlfishconfig.append((Symbol("mmd"), config["mmd"])) lagerconfig = [ @@ -340,6 +363,8 @@ def gen_config(nodename, config, localconfig): storagenodeaddresses = ["https://%s/plop/v1/storage/" % node["address"] for node in config["storagenodes"]] frontendnodenames = [node["name"] for node in config["frontendnodes"]] frontendnodeaddresses = ["https://%s/plop/v1/frontend/" % node["address"] for node in config["frontendnodes"]] + statsservernames = [node["name"] for node in config["statsservers"]] + statsserveraddresses = ["https://%s/plop/v1/status/" % node["address"] for node in config["statsservers"]] allowed_clients = [] allowed_servers = [] @@ -412,6 +437,27 @@ def gen_config(nodename, config, localconfig): (Symbol("plopcontrol"), plopcontrolfilename), ] + reloadableplopconfig.append((Symbol("statsservers"), statsserveraddresses)) + allowed_servers += [ + ("/plop/v1/status/merge_dist", statsservernames), + ("/plop/v1/status/merge_backup", statsservernames), + ("/plop/v1/status/merge_sth", statsservernames), + ("/plop/v1/status/merge_fetch", statsservernames), + ("/plop/v1/status/storage", statsservernames), + ("/plop/v1/status/merge_errors", statsservernames), + ] + + if "statsservers" in nodetype: + allowed_clients += [ + ("/plop/v1/status/merge_dist", mergenodenames), + ("/plop/v1/status/merge_backup", mergenodenames), + ("/plop/v1/status/merge_sth", mergenodenames), + ("/plop/v1/status/merge_fetch", mergenodenames), + ("/plop/v1/status/merge_errors", mergenodenames), + ("/plop/v1/status/storage", list(storagenodenames)), + ("/status", Symbol("noauth")), + ] + reloadableplopconfig += [ (Symbol("allowed_clients"), list(allowed_clients)), (Symbol("allowed_servers"), list(allowed_servers)), @@ -463,11 +509,12 @@ def gen_testmakefile(config, testmakefile, shellvars=False): signingnodenames = set([node["name"] for node in config["signingnodes"]]) mergenodenames = set([node["name"] for node in config["mergenodes"]]) mergesecondarynodenames = set([node["name"] for node in config["mergenodes"] if node["name"] != config["primarymergenode"]]) + statsservernodenames = set([node["name"] for node in config.get("statsservers")]) frontendnodenames_except_last = sorted(frontendnodenames)[:-1] frontendnodenames_except_first = sorted(frontendnodenames)[1:] - allnodes = config["frontendnodes"] + config["storagenodes"] + config["signingnodes"] + config["mergenodes"] + allnodes = config["frontendnodes"] + config["storagenodes"] + config["signingnodes"] + config["mergenodes"] + config["statsservers"] testaddresses = multivaldict([(node["name"], node["address"]) for node in allnodes]) @@ -480,6 +527,7 @@ def gen_testmakefile(config, testmakefile, shellvars=False): print_nodevar(configfile, delimiter, "SIGNING", "catlfish", signingnodenames, testaddresses) print_nodevar(configfile, delimiter, "MERGESECONDARY", "catlfish", mergesecondarynodenames, testaddresses) print_nodevar(configfile, delimiter, "MERGEPRIMARY", "merge", [config["primarymergenode"]], testaddresses, gentesturl=False) + print_nodevar(configfile, delimiter, "STATSSERVER", "statsserver", statsservernodenames, testaddresses) print >>configfile, "NODES=" + delimiter + " ".join(set([node["name"] for node in allnodes])) + delimiter @@ -493,8 +541,9 @@ def printnodenames(config): storagenodenames = set([node["name"] for node in config["storagenodes"]]) signingnodenames = set([node["name"] for node in config["signingnodes"]]) mergenodenames = set([node["name"] for node in config["mergenodes"]]) + statsservernodenames = set([node["name"] for node in config.get("statsservers")]) - print " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames) + print " ".join(frontendnodenames|storagenodenames|signingnodenames|mergenodenames|statsservernodenames) def main(): parser = argparse.ArgumentParser(description="") diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index e71d3f1..165dee8 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -15,9 +15,32 @@ from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ hexencode, parse_args, perm -from certtools import timing_point, write_file, create_ssl_context +from certtools import timing_point, write_file, create_ssl_context, http_request +import json + +def reportstatus(statsservers, own_key, target, variable, status): + for statsserver in statsservers: + do_reportstatus(statsserver["name"], "https://%s/" % statsserver["address"], own_key, target, variable, status) + +def do_reportstatus(node, baseurl, own_key, target, variable, status): + try: + result = http_request(baseurl + "plop/v1/status/merge_fetch", + json.dumps([{"target":target, "key": variable, "value": status}]), key=own_key, + verifynode=node) + return json.loads(result) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: reportstatus", e.response + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, target, variable, status + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, e.response + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e -def merge_fetch(args, config, localconfig): +def merge_fetch(args, config, localconfig, currentsizefile): paths = localconfig["paths"] storagenodes = config["storagenodes"] mergedb = paths["mergedb"] @@ -26,6 +49,8 @@ def merge_fetch(args, config, localconfig): own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + assert(localconfig["nodename"] == config["primarymergenode"]) + statsservers = config.get("statsservers") timing = timing_point() logorder = get_logorder(logorderfile) @@ -54,6 +79,7 @@ def merge_fetch(args, config, localconfig): new_entries -= certsinlog print >>sys.stderr, "adding", len(new_entries), "entries" sys.stderr.flush() + reportstatus(statsservers, own_key, "fetch", "total", len(certsinlog) + len(new_entries)) for ehash in new_entries: for storagenode in storagenodes: @@ -67,6 +93,8 @@ def merge_fetch(args, config, localconfig): [paths["verifycert_bin"], paths["knownroots"]], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + noncommitted = 0 + added_entries = 0 for storagenode in storagenodes: if storagenode["name"] not in entries_to_fetch: @@ -83,10 +111,19 @@ def merge_fetch(args, config, localconfig): entry = entries[ehash] verify_entry(verifycert, entry, ehash) chainsdb.add(ehash, entry) + noncommitted += 1 add_to_logorder(logorderfile, ehash) logorder.append(ehash) certsinlog.add(ehash) added_entries += 1 + if noncommitted >= 1000: + chainsdb.commit() + fsync_logorder(logorderfile) + noncommitted = 0 + tree_size = len(logorder) + currentsize = {"index": tree_size - 1, "hash": hexencode(logorder[tree_size-1])} + write_file(currentsizefile, currentsize) + reportstatus(statsservers, own_key, "fetch", "fetched", tree_size) print >>sys.stderr, added_entries, sys.stderr.flush() print >>sys.stderr @@ -104,6 +141,7 @@ def merge_fetch(args, config, localconfig): sys.stderr.flush() tree_size = len(logorder) + reportstatus(statsservers, own_key, "fetch", "fetched", tree_size) if tree_size == 0: return (0, '') else: @@ -125,7 +163,7 @@ def main(): create_ssl_context(cafile=paths["https_cacertfile"]) while True: - logsize, last_hash = merge_fetch(args, config, localconfig) + logsize, last_hash = merge_fetch(args, config, localconfig, currentsizefile) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize write_file(currentsizefile, currentsize) -- cgit v1.1