diff options
author | Magnus Ahltorp <map@kth.se> | 2017-03-17 00:58:54 +0100 |
---|---|---|
committer | Magnus Ahltorp <map@kth.se> | 2017-03-17 14:45:56 +0100 |
commit | 8fb19e1dd19998b7e5b2cff9031eaf52dac46b51 (patch) | |
tree | 404a493eb094747dddcdd414f05e53b30da43b5b /tools/merge_fetch.py | |
parent | abe1da31a293c2765b4bb3fca42a08cdf336fcc7 (diff) |
Added benchmark reporting.
Change timing_point to time.time interface.
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-x | tools/merge_fetch.py | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index b0b6ce6..fb5cbb8 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -41,6 +41,43 @@ def do_reportstatus(node, baseurl, own_key, target, variable, status): sys.stderr.flush() raise e +report_timing_point_counter = 0 + +def report_timing_point(timer_dict, statusservers, own_key, target): + if timer_dict["deltatimes"]: + (name, deltatime) = timer_dict["deltatimes"][-1] + stoptime = timer_dict["lasttime"] + starttime = stoptime - deltatime + global report_timing_point_counter + report_timing_point_counter += 1 + + starttime_unix = starttime * 1000.0 + + reportbench(statusservers, own_key, target, name, report_timing_point_counter, starttime_unix, deltatime*1000.0) + +def reportbench(statusservers, own_key, target, tag, seq, starttime, elapsed): + for statusserver in statusservers: + do_reportbench(statusserver["name"], "https://%s/" % statusserver["address"], own_key, target, tag, seq, starttime, elapsed) + +def do_reportbench(node, baseurl, own_key, target, tag, seq, starttime, elapsed): + try: + (ownname, _) = own_key + result = http_request(baseurl + "plop/v1/bench/merge_fetch", + json.dumps([{"source":ownname, "target":target, "tag": tag, "seq": seq, "starttime": starttime, "elapsed": elapsed}]), key=own_key, + verifynode=node) + return json.loads(result) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: reportstatus", e.response + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, target, tag, seq, starttime, elapsed + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, e.response + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e + def merge_fetch(args, config, localconfig, currentsizefile): paths = localconfig["paths"] storagenodes = config["storagenodes"] @@ -53,12 +90,15 @@ def merge_fetch(args, config, localconfig, currentsizefile): assert(localconfig["nodename"] == config["primarymergenode"]) statusservers = config.get("statusservers") timing = timing_point() + report_timing_point(timing, statusservers, own_key, None) logorder = get_logorder(logorderfile) - timing_point(timing, "get logorder") certsinlog = set(logorder) + timing_point(timing, "get logorder") + report_timing_point(timing, statusservers, own_key, None) + new_entries_per_node = {} new_entries = set() entries_to_fetch = {} @@ -75,7 +115,8 @@ def merge_fetch(args, config, localconfig, currentsizefile): entries_to_fetch[storagenode["name"]] = [] except requests.exceptions.ConnectionError: pass - timing_point(timing, "get new entries") + timing_point(timing, "get new entries") + report_timing_point(timing, statusservers, own_key, storagenode["name"]) new_entries -= certsinlog print >>sys.stderr, "adding", len(new_entries), "entries" @@ -132,6 +173,7 @@ def merge_fetch(args, config, localconfig, currentsizefile): chainsdb.commit() fsync_logorder(logorderfile) timing_point(timing, "add entries") + report_timing_point(timing, statusservers, own_key, None) print >>sys.stderr, "added", added_entries, "entries" sys.stderr.flush() |