-rw-r--r--  tools/certtools.py       6
-rwxr-xr-x  tools/compileconfig.py  11
-rwxr-xr-x  tools/merge_fetch.py    46
3 files changed, 58 insertions, 5 deletions
diff --git a/tools/certtools.py b/tools/certtools.py
index cb56a8d..84d0bd9 100644
--- a/tools/certtools.py
+++ b/tools/certtools.py
@@ -18,6 +18,7 @@ import shutil
import requests
import warnings
import logging
+import time
from time import sleep
from certkeys import publickeys
@@ -446,13 +447,12 @@ def get_leaf_hash(merkle_tree_leaf):
return leaf_hash.digest()
def timing_point(timer_dict=None, name=None):
- t = datetime.datetime.now()
+ t = time.time()
if timer_dict:
starttime = timer_dict["lasttime"]
stoptime = t
deltatime = stoptime - starttime
- microseconds = deltatime.seconds * 1000000 + deltatime.microseconds
- timer_dict["deltatimes"].append((name, microseconds))
+ timer_dict["deltatimes"].append((name, deltatime))
timer_dict["lasttime"] = t
#print name, microseconds/1000000.0
return None
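A minimal usage sketch of the reworked timing_point, assuming (as the branch of the function not shown in this hunk suggests, and as the timing = timing_point() call in merge_fetch below relies on) that calling it without arguments returns a fresh dict with "deltatimes" and "lasttime" keys. Run from the tools/ directory so certtools is importable:

    import time
    from certtools import timing_point

    timing = timing_point()              # fresh timer dict
    time.sleep(0.05)                     # stand-in for real work
    timing_point(timing, "step one")     # records ("step one", <float seconds>)

    # After this change each delta is a plain float in seconds rather than an
    # integer microsecond count, so callers multiply by 1000.0 for milliseconds.
    for name, delta in timing["deltatimes"]:
        print "%s: %.1f ms" % (name, delta * 1000.0)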
diff --git a/tools/compileconfig.py b/tools/compileconfig.py
index c2463df..de790ba 100755
--- a/tools/compileconfig.py
+++ b/tools/compileconfig.py
@@ -365,6 +365,7 @@ def gen_config(nodename, config, localconfig):
frontendnodeaddresses = ["https://%s/plop/v1/frontend/" % node["address"] for node in config["frontendnodes"]]
statusservernames = [node["name"] for node in config["statusservers"]]
statusserveraddresses = ["https://%s/plop/v1/status/" % node["address"] for node in config["statusservers"]]
+ benchserveraddresses = ["https://%s/plop/v1/bench/" % node["address"] for node in config["statusservers"]]
allowed_clients = []
allowed_servers = []
@@ -440,6 +441,7 @@ def gen_config(nodename, config, localconfig):
]
reloadableplopconfig.append((Symbol("statusservers"), statusserveraddresses))
+ reloadableplopconfig.append((Symbol("benchservers"), benchserveraddresses))
allowed_servers += [
("/plop/v1/status/merge_dist", statusservernames),
("/plop/v1/status/merge_backup", statusservernames),
@@ -448,6 +450,10 @@ def gen_config(nodename, config, localconfig):
("/plop/v1/status/storage", statusservernames),
("/plop/v1/status/merge_errors", statusservernames),
("/plop/v1/status/heartbeat", statusservernames),
+ ("/plop/v1/bench/merge_dist", statusservernames),
+ ("/plop/v1/bench/merge_backup", statusservernames),
+ ("/plop/v1/bench/merge_sth", statusservernames),
+ ("/plop/v1/bench/merge_fetch", statusservernames),
]
if "statusservers" in nodetype:
@@ -459,7 +465,12 @@ def gen_config(nodename, config, localconfig):
("/plop/v1/status/merge_errors", mergenodenames),
("/plop/v1/status/storage", storagenodenames),
("/plop/v1/status/heartbeat", list(allnodenames)),
+ ("/plop/v1/bench/merge_dist", mergenodenames),
+ ("/plop/v1/bench/merge_backup", mergenodenames),
+ ("/plop/v1/bench/merge_sth", mergenodenames),
+ ("/plop/v1/bench/merge_fetch", mergenodenames),
("/status", Symbol("noauth")),
+ ("/bench", Symbol("noauth")),
]
reloadableplopconfig += [
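The new bench endpoints reuse the statusservers list, so every status server also receives bench reports under /plop/v1/bench/. A small sketch of the address construction, under a hypothetical config stanza (the field names "statusservers", "name" and "address" come from gen_config above; the hostname is invented):

    # Hypothetical statusservers entry; only "name" and "address" are read here.
    config = {"statusservers": [{"name": "status-1",
                                 "address": "status-1.example.com"}]}

    benchserveraddresses = ["https://%s/plop/v1/bench/" % node["address"]
                            for node in config["statusservers"]]
    # -> ['https://status-1.example.com/plop/v1/bench/']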
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index b0b6ce6..fb5cbb8 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -41,6 +41,43 @@ def do_reportstatus(node, baseurl, own_key, target, variable, status):
sys.stderr.flush()
raise e
+report_timing_point_counter = 0
+
+def report_timing_point(timer_dict, statusservers, own_key, target):
+ if timer_dict["deltatimes"]:
+ (name, deltatime) = timer_dict["deltatimes"][-1]
+ stoptime = timer_dict["lasttime"]
+ starttime = stoptime - deltatime
+ global report_timing_point_counter
+ report_timing_point_counter += 1
+
+ starttime_unix = starttime * 1000.0
+
+ reportbench(statusservers, own_key, target, name, report_timing_point_counter, starttime_unix, deltatime*1000.0)
+
+def reportbench(statusservers, own_key, target, tag, seq, starttime, elapsed):
+ for statusserver in statusservers:
+ do_reportbench(statusserver["name"], "https://%s/" % statusserver["address"], own_key, target, tag, seq, starttime, elapsed)
+
+def do_reportbench(node, baseurl, own_key, target, tag, seq, starttime, elapsed):
+ try:
+ (ownname, _) = own_key
+ result = http_request(baseurl + "plop/v1/bench/merge_fetch",
+ json.dumps([{"source":ownname, "target":target, "tag": tag, "seq": seq, "starttime": starttime, "elapsed": elapsed}]), key=own_key,
+ verifynode=node)
+ return json.loads(result)
+ except requests.exceptions.HTTPError, e:
+ print >>sys.stderr, "ERROR: reportstatus", e.response
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, target, tag, seq, starttime, elapsed
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, result
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
+
def merge_fetch(args, config, localconfig, currentsizefile):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
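For reference, the body that do_reportbench above posts to plop/v1/bench/merge_fetch is a JSON list containing a single object. A sketch with invented values; the field names and millisecond units follow report_timing_point above:

    import json, time

    starttime_ms = time.time() * 1000.0          # as starttime_unix above
    payload = json.dumps([{
        "source": "merge-1",                     # ownname taken from own_key
        "target": "storage-1",                   # storage node name, or None
        "tag": "get new entries",                # timing point name
        "seq": 42,                               # report_timing_point_counter
        "starttime": starttime_ms,               # milliseconds since the epoch
        "elapsed": 12.5,                         # delta converted to milliseconds
    }])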
@@ -53,12 +90,15 @@ def merge_fetch(args, config, localconfig, currentsizefile):
assert(localconfig["nodename"] == config["primarymergenode"])
statusservers = config.get("statusservers")
timing = timing_point()
+ report_timing_point(timing, statusservers, own_key, None)
logorder = get_logorder(logorderfile)
- timing_point(timing, "get logorder")
certsinlog = set(logorder)
+ timing_point(timing, "get logorder")
+ report_timing_point(timing, statusservers, own_key, None)
+
new_entries_per_node = {}
new_entries = set()
entries_to_fetch = {}
@@ -75,7 +115,8 @@ def merge_fetch(args, config, localconfig, currentsizefile):
entries_to_fetch[storagenode["name"]] = []
except requests.exceptions.ConnectionError:
pass
- timing_point(timing, "get new entries")
+ timing_point(timing, "get new entries")
+ report_timing_point(timing, statusservers, own_key, storagenode["name"])
new_entries -= certsinlog
print >>sys.stderr, "adding", len(new_entries), "entries"
@@ -132,6 +173,7 @@ def merge_fetch(args, config, localconfig, currentsizefile):
chainsdb.commit()
fsync_logorder(logorderfile)
timing_point(timing, "add entries")
+ report_timing_point(timing, statusservers, own_key, None)
print >>sys.stderr, "added", added_entries, "entries"
sys.stderr.flush()
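The pattern these merge_fetch hunks add is: take a timing point, then immediately push it to the bench servers. A condensed, schematic sketch of that pattern; all names except the commented stand-in come from the code above, and statusservers, own_key and storagenodes are assumed to be set up exactly as earlier in merge_fetch:

    timing = timing_point()
    report_timing_point(timing, statusservers, own_key, None)

    for storagenode in storagenodes:
        # ... fetch new entries from storagenode (real code above) ...
        timing_point(timing, "get new entries")
        report_timing_point(timing, statusservers, own_key, storagenode["name"])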