summaryrefslogtreecommitdiff
path: root/tools/merge_fetch.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-xtools/merge_fetch.py44
1 files changed, 41 insertions, 3 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index e71d3f1..165dee8 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -15,9 +15,32 @@ from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
hexencode, parse_args, perm
-from certtools import timing_point, write_file, create_ssl_context
+from certtools import timing_point, write_file, create_ssl_context, http_request
+import json
+
+def reportstatus(statsservers, own_key, target, variable, status):
+ for statsserver in statsservers:
+ do_reportstatus(statsserver["name"], "https://%s/" % statsserver["address"], own_key, target, variable, status)
+
+def do_reportstatus(node, baseurl, own_key, target, variable, status):
+ try:
+ result = http_request(baseurl + "plop/v1/status/merge_fetch",
+ json.dumps([{"target":target, "key": variable, "value": status}]), key=own_key,
+ verifynode=node)
+ return json.loads(result)
+ except requests.exceptions.HTTPError, e:
+ print >>sys.stderr, "ERROR: reportstatus", e.response
+ sys.exit(1)
+ except ValueError, e:
+ print >>sys.stderr, "==== FAILED REQUEST ===="
+ print >>sys.stderr, target, variable, status
+ print >>sys.stderr, "======= RESPONSE ======="
+ print >>sys.stderr, e.response
+ print >>sys.stderr, "========================"
+ sys.stderr.flush()
+ raise e
-def merge_fetch(args, config, localconfig):
+def merge_fetch(args, config, localconfig, currentsizefile):
paths = localconfig["paths"]
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
@@ -26,6 +49,8 @@ def merge_fetch(args, config, localconfig):
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
localconfig["nodename"]))
+ assert(localconfig["nodename"] == config["primarymergenode"])
+ statsservers = config.get("statsservers")
timing = timing_point()
logorder = get_logorder(logorderfile)
@@ -54,6 +79,7 @@ def merge_fetch(args, config, localconfig):
new_entries -= certsinlog
print >>sys.stderr, "adding", len(new_entries), "entries"
sys.stderr.flush()
+ reportstatus(statsservers, own_key, "fetch", "total", len(certsinlog) + len(new_entries))
for ehash in new_entries:
for storagenode in storagenodes:
@@ -67,6 +93,8 @@ def merge_fetch(args, config, localconfig):
[paths["verifycert_bin"], paths["knownroots"]],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ noncommitted = 0
+
added_entries = 0
for storagenode in storagenodes:
if storagenode["name"] not in entries_to_fetch:
@@ -83,10 +111,19 @@ def merge_fetch(args, config, localconfig):
entry = entries[ehash]
verify_entry(verifycert, entry, ehash)
chainsdb.add(ehash, entry)
+ noncommitted += 1
add_to_logorder(logorderfile, ehash)
logorder.append(ehash)
certsinlog.add(ehash)
added_entries += 1
+ if noncommitted >= 1000:
+ chainsdb.commit()
+ fsync_logorder(logorderfile)
+ noncommitted = 0
+ tree_size = len(logorder)
+ currentsize = {"index": tree_size - 1, "hash": hexencode(logorder[tree_size-1])}
+ write_file(currentsizefile, currentsize)
+ reportstatus(statsservers, own_key, "fetch", "fetched", tree_size)
print >>sys.stderr, added_entries,
sys.stderr.flush()
print >>sys.stderr
@@ -104,6 +141,7 @@ def merge_fetch(args, config, localconfig):
sys.stderr.flush()
tree_size = len(logorder)
+ reportstatus(statsservers, own_key, "fetch", "fetched", tree_size)
if tree_size == 0:
return (0, '')
else:
@@ -125,7 +163,7 @@ def main():
create_ssl_context(cafile=paths["https_cacertfile"])
while True:
- logsize, last_hash = merge_fetch(args, config, localconfig)
+ logsize, last_hash = merge_fetch(args, config, localconfig, currentsizefile)
currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
#print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
write_file(currentsizefile, currentsize)