diff options
-rw-r--r-- | tools/certtools.py | 6 | ||||
-rwxr-xr-x | tools/compileconfig.py | 11 | ||||
-rwxr-xr-x | tools/merge_fetch.py | 46 |
3 files changed, 58 insertions, 5 deletions
diff --git a/tools/certtools.py b/tools/certtools.py index cb56a8d..84d0bd9 100644 --- a/tools/certtools.py +++ b/tools/certtools.py @@ -18,6 +18,7 @@ import shutil import requests import warnings import logging +import time from time import sleep from certkeys import publickeys @@ -446,13 +447,12 @@ def get_leaf_hash(merkle_tree_leaf): return leaf_hash.digest() def timing_point(timer_dict=None, name=None): - t = datetime.datetime.now() + t = time.time() if timer_dict: starttime = timer_dict["lasttime"] stoptime = t deltatime = stoptime - starttime - microseconds = deltatime.seconds * 1000000 + deltatime.microseconds - timer_dict["deltatimes"].append((name, microseconds)) + timer_dict["deltatimes"].append((name, deltatime)) timer_dict["lasttime"] = t #print name, microseconds/1000000.0 return None diff --git a/tools/compileconfig.py b/tools/compileconfig.py index c2463df..de790ba 100755 --- a/tools/compileconfig.py +++ b/tools/compileconfig.py @@ -365,6 +365,7 @@ def gen_config(nodename, config, localconfig): frontendnodeaddresses = ["https://%s/plop/v1/frontend/" % node["address"] for node in config["frontendnodes"]] statusservernames = [node["name"] for node in config["statusservers"]] statusserveraddresses = ["https://%s/plop/v1/status/" % node["address"] for node in config["statusservers"]] + benchserveraddresses = ["https://%s/plop/v1/bench/" % node["address"] for node in config["statusservers"]] allowed_clients = [] allowed_servers = [] @@ -440,6 +441,7 @@ def gen_config(nodename, config, localconfig): ] reloadableplopconfig.append((Symbol("statusservers"), statusserveraddresses)) + reloadableplopconfig.append((Symbol("benchservers"), benchserveraddresses)) allowed_servers += [ ("/plop/v1/status/merge_dist", statusservernames), ("/plop/v1/status/merge_backup", statusservernames), @@ -448,6 +450,10 @@ def gen_config(nodename, config, localconfig): ("/plop/v1/status/storage", statusservernames), ("/plop/v1/status/merge_errors", statusservernames), ("/plop/v1/status/heartbeat", statusservernames), + ("/plop/v1/bench/merge_dist", statusservernames), + ("/plop/v1/bench/merge_backup", statusservernames), + ("/plop/v1/bench/merge_sth", statusservernames), + ("/plop/v1/bench/merge_fetch", statusservernames), ] if "statusservers" in nodetype: @@ -459,7 +465,12 @@ def gen_config(nodename, config, localconfig): ("/plop/v1/status/merge_errors", mergenodenames), ("/plop/v1/status/storage", storagenodenames), ("/plop/v1/status/heartbeat", list(allnodenames)), + ("/plop/v1/bench/merge_dist", mergenodenames), + ("/plop/v1/bench/merge_backup", mergenodenames), + ("/plop/v1/bench/merge_sth", mergenodenames), + ("/plop/v1/bench/merge_fetch", mergenodenames), ("/status", Symbol("noauth")), + ("/bench", Symbol("noauth")), ] reloadableplopconfig += [ diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index b0b6ce6..fb5cbb8 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -41,6 +41,43 @@ def do_reportstatus(node, baseurl, own_key, target, variable, status): sys.stderr.flush() raise e +report_timing_point_counter = 0 + +def report_timing_point(timer_dict, statusservers, own_key, target): + if timer_dict["deltatimes"]: + (name, deltatime) = timer_dict["deltatimes"][-1] + stoptime = timer_dict["lasttime"] + starttime = stoptime - deltatime + global report_timing_point_counter + report_timing_point_counter += 1 + + starttime_unix = starttime * 1000.0 + + reportbench(statusservers, own_key, target, name, report_timing_point_counter, starttime_unix, deltatime*1000.0) + +def reportbench(statusservers, own_key, target, tag, seq, starttime, elapsed): + for statusserver in statusservers: + do_reportbench(statusserver["name"], "https://%s/" % statusserver["address"], own_key, target, tag, seq, starttime, elapsed) + +def do_reportbench(node, baseurl, own_key, target, tag, seq, starttime, elapsed): + try: + (ownname, _) = own_key + result = http_request(baseurl + "plop/v1/bench/merge_fetch", + json.dumps([{"source":ownname, "target":target, "tag": tag, "seq": seq, "starttime": starttime, "elapsed": elapsed}]), key=own_key, + verifynode=node) + return json.loads(result) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: reportstatus", e.response + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, target, tag, seq, starttime, elapsed + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, e.response + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + def merge_fetch(args, config, localconfig, currentsizefile): paths = localconfig["paths"] storagenodes = config["storagenodes"] @@ -53,12 +90,15 @@ def merge_fetch(args, config, localconfig, currentsizefile): assert(localconfig["nodename"] == config["primarymergenode"]) statusservers = config.get("statusservers") timing = timing_point() + report_timing_point(timing, statusservers, own_key, None) logorder = get_logorder(logorderfile) - timing_point(timing, "get logorder") certsinlog = set(logorder) + timing_point(timing, "get logorder") + report_timing_point(timing, statusservers, own_key, None) + new_entries_per_node = {} new_entries = set() entries_to_fetch = {} @@ -75,7 +115,8 @@ def merge_fetch(args, config, localconfig, currentsizefile): entries_to_fetch[storagenode["name"]] = [] except requests.exceptions.ConnectionError: pass - timing_point(timing, "get new entries") + timing_point(timing, "get new entries") + report_timing_point(timing, statusservers, own_key, storagenode["name"]) new_entries -= certsinlog print >>sys.stderr, "adding", len(new_entries), "entries" @@ -132,6 +173,7 @@ def merge_fetch(args, config, localconfig, currentsizefile): chainsdb.commit() fsync_logorder(logorderfile) timing_point(timing, "add entries") + report_timing_point(timing, statusservers, own_key, None) print >>sys.stderr, "added", added_entries, "entries" sys.stderr.flush() |