summaryrefslogtreecommitdiff
path: root/tools/merge_fetch.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-x  tools/merge_fetch.py  46
1 files changed, 44 insertions, 2 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index b0b6ce6..fb5cbb8 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -41,6 +41,43 @@ def do_reportstatus(node, baseurl, own_key, target, variable, status):
sys.stderr.flush()
raise e
# Monotonically increasing sequence number shared by all timing reports
# emitted from this process.
report_timing_point_counter = 0

def report_timing_point(timer_dict, statusservers, own_key, target):
    """Publish the most recently recorded timing delta to the status servers.

    Reads the last (name, delta) pair from timer_dict["deltatimes"] (as
    maintained by timing_point) and forwards it via reportbench(), tagged
    with a process-wide sequence number.  Does nothing when no delta has
    been recorded yet.
    """
    deltas = timer_dict["deltatimes"]
    if not deltas:
        return
    tag, elapsed = deltas[-1]
    # The measured interval ended at the last recorded stamp and started
    # `elapsed` seconds earlier.
    begin = timer_dict["lasttime"] - elapsed
    global report_timing_point_counter
    report_timing_point_counter += 1
    # reportbench expects times in milliseconds.
    reportbench(statusservers, own_key, target, tag,
                report_timing_point_counter, begin * 1000.0, elapsed * 1000.0)
+
def reportbench(statusservers, own_key, target, tag, seq, starttime, elapsed):
    """Fan one benchmark sample out to every configured status server.

    statusservers comes from config.get("statusservers") and is therefore
    None when the config key is absent; treat that the same as an empty
    list instead of raising TypeError on iteration.
    """
    if not statusservers:
        return
    for statusserver in statusservers:
        do_reportbench(statusserver["name"],
                       "https://%s/" % statusserver["address"],
                       own_key, target, tag, seq, starttime, elapsed)
+
+def do_reportbench(node, baseurl, own_key, target, tag, seq, starttime, elapsed):
+ try:
+ (ownname, _) = own_key
+ result = http_request(baseurl + "plop/v1/bench/merge_fetch",
+ json.dumps([{"source":ownname, "target":target, "tag": tag, "seq": seq, "starttime": starttime, "elapsed": elapsed}]), key=own_key,
+ verifynode=node)
+ return json.loads(result)
+ except requests.exceptions.HTTPError, e:
+ print >>sys.stderr, "ERROR: reportstatus", e.response
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, target, tag, seq, starttime, elapsed
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, e.response
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def merge_fetch(args, config, localconfig, currentsizefile):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
@@ -53,12 +90,15 @@ def merge_fetch(args, config, localconfig, currentsizefile):
assert(localconfig["nodename"] == config["primarymergenode"])
statusservers = config.get("statusservers")
timing = timing_point()
+ report_timing_point(timing, statusservers, own_key, None)
logorder = get_logorder(logorderfile)
- timing_point(timing, "get logorder")
certsinlog = set(logorder)
+ timing_point(timing, "get logorder")
+ report_timing_point(timing, statusservers, own_key, None)
+
new_entries_per_node = {}
new_entries = set()
entries_to_fetch = {}
@@ -75,7 +115,8 @@ def merge_fetch(args, config, localconfig, currentsizefile):
entries_to_fetch[storagenode["name"]] = []
except requests.exceptions.ConnectionError:
pass
- timing_point(timing, "get new entries")
+ timing_point(timing, "get new entries")
+ report_timing_point(timing, statusservers, own_key, storagenode["name"])
new_entries -= certsinlog
print >>sys.stderr, "adding", len(new_entries), "entries"
@@ -132,6 +173,7 @@ def merge_fetch(args, config, localconfig, currentsizefile):
chainsdb.commit()
fsync_logorder(logorderfile)
timing_point(timing, "add entries")
+ report_timing_point(timing, statusservers, own_key, None)
print >>sys.stderr, "added", added_entries, "entries"
sys.stderr.flush()