From 49445bfe1985054ee18691bf27c4033285401ddb Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Fri, 17 Mar 2017 00:58:54 +0100 Subject: Added benchmark reporting. Change timing_point to time.time interface. --- tools/merge_fetch.py | 46 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) (limited to 'tools/merge_fetch.py') diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index b0b6ce6..fb5cbb8 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -41,6 +41,43 @@ def do_reportstatus(node, baseurl, own_key, target, variable, status): sys.stderr.flush() raise e +report_timing_point_counter = 0 + +def report_timing_point(timer_dict, statusservers, own_key, target): + if timer_dict["deltatimes"]: + (name, deltatime) = timer_dict["deltatimes"][-1] + stoptime = timer_dict["lasttime"] + starttime = stoptime - deltatime + global report_timing_point_counter + report_timing_point_counter += 1 + + starttime_unix = starttime * 1000.0 + + reportbench(statusservers, own_key, target, name, report_timing_point_counter, starttime_unix, deltatime*1000.0) + +def reportbench(statusservers, own_key, target, tag, seq, starttime, elapsed): + for statusserver in statusservers: + do_reportbench(statusserver["name"], "https://%s/" % statusserver["address"], own_key, target, tag, seq, starttime, elapsed) + +def do_reportbench(node, baseurl, own_key, target, tag, seq, starttime, elapsed): + try: + (ownname, _) = own_key + result = http_request(baseurl + "plop/v1/bench/merge_fetch", + json.dumps([{"source":ownname, "target":target, "tag": tag, "seq": seq, "starttime": starttime, "elapsed": elapsed}]), key=own_key, + verifynode=node) + return json.loads(result) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: reportstatus", e.response + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, target, tag, seq, starttime, elapsed + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, e.response + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + def merge_fetch(args, config, localconfig, currentsizefile): paths = localconfig["paths"] storagenodes = config["storagenodes"] @@ -53,12 +90,15 @@ def merge_fetch(args, config, localconfig, currentsizefile): assert(localconfig["nodename"] == config["primarymergenode"]) statusservers = config.get("statusservers") timing = timing_point() + report_timing_point(timing, statusservers, own_key, None) logorder = get_logorder(logorderfile) - timing_point(timing, "get logorder") certsinlog = set(logorder) + timing_point(timing, "get logorder") + report_timing_point(timing, statusservers, own_key, None) + new_entries_per_node = {} new_entries = set() entries_to_fetch = {} @@ -75,7 +115,8 @@ def merge_fetch(args, config, localconfig, currentsizefile): entries_to_fetch[storagenode["name"]] = [] except requests.exceptions.ConnectionError: pass - timing_point(timing, "get new entries") + timing_point(timing, "get new entries") + report_timing_point(timing, statusservers, own_key, storagenode["name"]) new_entries -= certsinlog print >>sys.stderr, "adding", len(new_entries), "entries" @@ -132,6 +173,7 @@ def merge_fetch(args, config, localconfig, currentsizefile): chainsdb.commit() fsync_logorder(logorderfile) timing_point(timing, "add entries") + report_timing_point(timing, statusservers, own_key, None) print >>sys.stderr, "added", added_entries, "entries" sys.stderr.flush() -- cgit v1.1