From 2b9e8c620006705a9459a61c88c7c0cd8fb02212 Mon Sep 17 00:00:00 2001 From: Magnus Ahltorp Date: Thu, 2 Mar 2017 00:52:46 +0100 Subject: Statusserver --- tools/merge_fetch.py | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) (limited to 'tools/merge_fetch.py') diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py index e71d3f1..165dee8 100755 --- a/tools/merge_fetch.py +++ b/tools/merge_fetch.py @@ -15,9 +15,32 @@ from time import sleep from mergetools import get_logorder, verify_entry, get_new_entries, \ chunks, fsync_logorder, get_entries, add_to_logorder, \ hexencode, parse_args, perm -from certtools import timing_point, write_file, create_ssl_context +from certtools import timing_point, write_file, create_ssl_context, http_request +import json + +def reportstatus(statsservers, own_key, target, variable, status): + for statsserver in statsservers: + do_reportstatus(statsserver["name"], "https://%s/" % statsserver["address"], own_key, target, variable, status) + +def do_reportstatus(node, baseurl, own_key, target, variable, status): + try: + result = http_request(baseurl + "plop/v1/status/merge_fetch", + json.dumps([{"target":target, "key": variable, "value": status}]), key=own_key, + verifynode=node) + return json.loads(result) + except requests.exceptions.HTTPError, e: + print >>sys.stderr, "ERROR: reportstatus", e.response + sys.exit(1) + except ValueError, e: + print >>sys.stderr, "==== FAILED REQUEST ====" + print >>sys.stderr, target, variable, status + print >>sys.stderr, "======= RESPONSE =======" + print >>sys.stderr, e.response + print >>sys.stderr, "========================" + sys.stderr.flush() + raise e -def merge_fetch(args, config, localconfig): +def merge_fetch(args, config, localconfig, currentsizefile): paths = localconfig["paths"] storagenodes = config["storagenodes"] mergedb = paths["mergedb"] @@ -26,6 +49,8 @@ def merge_fetch(args, config, localconfig): own_key = (localconfig["nodename"], "%s/%s-private.pem" % (paths["privatekeys"], localconfig["nodename"])) + assert(localconfig["nodename"] == config["primarymergenode"]) + statsservers = config.get("statsservers") timing = timing_point() logorder = get_logorder(logorderfile) @@ -54,6 +79,7 @@ def merge_fetch(args, config, localconfig): new_entries -= certsinlog print >>sys.stderr, "adding", len(new_entries), "entries" sys.stderr.flush() + reportstatus(statsservers, own_key, "fetch", "total", len(certsinlog) + len(new_entries)) for ehash in new_entries: for storagenode in storagenodes: @@ -67,6 +93,8 @@ def merge_fetch(args, config, localconfig): [paths["verifycert_bin"], paths["knownroots"]], stdin=subprocess.PIPE, stdout=subprocess.PIPE) + noncommitted = 0 + added_entries = 0 for storagenode in storagenodes: if storagenode["name"] not in entries_to_fetch: @@ -83,10 +111,19 @@ def merge_fetch(args, config, localconfig): entry = entries[ehash] verify_entry(verifycert, entry, ehash) chainsdb.add(ehash, entry) + noncommitted += 1 add_to_logorder(logorderfile, ehash) logorder.append(ehash) certsinlog.add(ehash) added_entries += 1 + if noncommitted >= 1000: + chainsdb.commit() + fsync_logorder(logorderfile) + noncommitted = 0 + tree_size = len(logorder) + currentsize = {"index": tree_size - 1, "hash": hexencode(logorder[tree_size-1])} + write_file(currentsizefile, currentsize) + reportstatus(statsservers, own_key, "fetch", "fetched", tree_size) print >>sys.stderr, added_entries, sys.stderr.flush() print >>sys.stderr @@ -104,6 +141,7 @@ def merge_fetch(args, config, localconfig): sys.stderr.flush() tree_size = len(logorder) + reportstatus(statsservers, own_key, "fetch", "fetched", tree_size) if tree_size == 0: return (0, '') else: @@ -125,7 +163,7 @@ def main(): create_ssl_context(cafile=paths["https_cacertfile"]) while True: - logsize, last_hash = merge_fetch(args, config, localconfig) + logsize, last_hash = merge_fetch(args, config, localconfig, currentsizefile) currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)} #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize write_file(currentsizefile, currentsize) -- cgit v1.1