author	Linus Nordberg <linus@nordu.net>	2016-11-23 17:09:48 +0100
committer	Linus Nordberg <linus@nordu.net>	2016-11-23 17:09:48 +0100
commit	19a2a611a839c0318f58347e2d93943c8e2401a5 (patch)
tree	18cd302161a88d4546b39792a4bff6b1ade95c77 /tools/merge_fetch.py
parent	27e368196ce65e109c027987c706a697356f7bc5 (diff)
WIP
Merge can run as four separate processes, plus a fifth controlling process, 'merge'.

Tests are limited to testcase1.py, and they're failing because of the test with the dead merge secondary. Tests are also time consuming because they wait 60 seconds each time a merge needs to be verified; this could be improved by peeking at the control files, for example.
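The control-file peeking could look something like this in a test: a minimal sketch, assuming the merge processes rewrite a control file such as mergedb's 'fetched' on every round (wait_for_file_change is a hypothetical helper, not part of this commit):

    import os
    import time

    def wait_for_file_change(path, timeout=60, poll=0.5):
        # Poll `path` until its mtime changes (or the file appears),
        # instead of sleeping a fixed 60 s. Returns True on change,
        # False on timeout.
        def mtime():
            try:
                return os.stat(path).st_mtime
            except OSError:
                return None
        start = mtime()
        deadline = time.time() + timeout
        while time.time() < deadline:
            if mtime() != start:
                return True
            time.sleep(poll)
        return False

A test could then call wait_for_file_change(mergedb + "/fetched") right after triggering a merge, rather than sleeping unconditionally.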
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-x	tools/merge_fetch.py	59
1 file changed, 35 insertions(+), 24 deletions(-)
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index db274a3..7973fae 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,10 +10,11 @@
 import sys
 import struct
 import subprocess
+import logging
 from time import sleep
 from mergetools import get_logorder, verify_entry, get_new_entries, \
     chunks, fsync_logorder, get_entries, add_to_logorder, \
-    hexencode, parse_args, perm
+    hexencode, parse_args, perm, flock_ex_or_fail, Status
 from certtools import timing_point, write_file, create_ssl_context
 
 def merge_fetch(args, config, localconfig):
@@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig):
     storagenodes = config["storagenodes"]
     mergedb = paths["mergedb"]
     logorderfile = mergedb + "/logorder"
+    statusfile = mergedb + "/merge_fetch.status"
+    s = Status(statusfile)
     chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
     own_key = (localconfig["nodename"],
                "%s/%s-private.pem" % (paths["privatekeys"],
@@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig):
     entries_to_fetch = {}
 
     for storagenode in storagenodes:
-        print >>sys.stderr, "getting new entries from", storagenode["name"]
-        sys.stderr.flush()
+        logging.info("getting new entries from %s", storagenode["name"])
         new_entries_per_node[storagenode["name"]] = \
             set(get_new_entries(storagenode["name"],
                                 "https://%s/" % storagenode["address"],
@@ -48,8 +50,7 @@
     timing_point(timing, "get new entries")
 
     new_entries -= certsinlog
-    print >>sys.stderr, "adding", len(new_entries), "entries"
-    sys.stderr.flush()
+    logging.info("adding %d entries", len(new_entries))
 
     for ehash in new_entries:
         for storagenode in storagenodes:
@@ -63,9 +64,8 @@
     added_entries = 0
 
     for storagenode in storagenodes:
-        print >>sys.stderr, "getting %d entries from %s:" % \
-            (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
-        sys.stderr.flush()
+        nentries = len(entries_to_fetch[storagenode["name"]])
+        logging.info("getting %d entries from %s", nentries, storagenode["name"])
         for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
             entries = get_entries(storagenode["name"],
                                   "https://%s/" % storagenode["address"],
@@ -78,21 +78,17 @@
                 logorder.append(ehash)
                 certsinlog.add(ehash)
                 added_entries += 1
-        print >>sys.stderr, added_entries,
-        sys.stderr.flush()
-    print >>sys.stderr
-    sys.stderr.flush()
+            s.status("INFO: getting %d entries from %s: %d" %
+                     (nentries, storagenode["name"], added_entries))
 
     chainsdb.commit()
     fsync_logorder(logorderfile)
     timing_point(timing, "add entries")
-    print >>sys.stderr, "added", added_entries, "entries"
-    sys.stderr.flush()
+    logging.info("added %d entries", added_entries)
     verifycert.communicate(struct.pack("I", 0))
 
     if args.timing:
-        print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"]
-        sys.stderr.flush()
+        logging.debug("timing: merge_fetch: %s", timing["deltatimes"])
 
     tree_size = len(logorder)
     if tree_size == 0:
@@ -102,28 +98,43 @@
 def main():
     """
-    Fetch new entries from all storage nodes.
+    Fetch new entries from all storage nodes, in sequence.
 
-    Indicate current position by writing the index in the logorder file
-    (0-based) to the 'fetched' file.
+    Indicate the current position by writing the hash and its 'logorder'
+    index, 0-based, to 'fetched'.
 
-    Sleep some and start over.
+    Sleep some and start over, or exit if there's no `--mergeinterval'.
     """
     args, config, localconfig = parse_args()
     paths = localconfig["paths"]
     mergedb = paths["mergedb"]
     currentsizefile = mergedb + "/fetched"
+    lockfile = mergedb + "/.merge_fetch.lock"
+
+    loglevel = getattr(logging, args.loglevel.upper())
+    if args.mergeinterval is None:
+        logging.basicConfig(level=loglevel)
+    else:
+        logging.basicConfig(filename=args.logdir + "/merge_fetch.log",
+                            level=loglevel)
+
+    if not flock_ex_or_fail(lockfile):
+        logging.critical("unable to take lock %s", lockfile)
+        return 1
+
     create_ssl_context(cafile=paths["https_cacertfile"])
 
     while True:
         logsize, last_hash = merge_fetch(args, config, localconfig)
         currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
-        #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
+        logging.debug("writing to %s: %s", currentsizefile, currentsize)
         write_file(currentsizefile, currentsize)
-        if args.interval is None:
+        if args.mergeinterval is None:
             break
-        print >>sys.stderr, "sleeping", args.interval, "seconds"
-        sleep(args.interval)
+        logging.debug("sleeping %d seconds", args.mergeinterval / 10)
+        sleep(args.mergeinterval / 10)
+
+    return 0
 
 
 if __name__ == '__main__':
     sys.exit(main())
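The diff imports two new mergetools helpers, flock_ex_or_fail and Status, whose definitions live outside this file. A minimal sketch of plausible shapes for them, assuming POSIX flock semantics; the actual mergetools implementations may differ:

    import fcntl
    import os

    def flock_ex_or_fail(path):
        # Try to take an exclusive, non-blocking lock on `path`. The
        # descriptor is deliberately left open so the lock is held for
        # the lifetime of the process. Returns False if another process
        # already holds the lock, True otherwise.
        fd = os.open(path, os.O_CREAT | os.O_RDWR)
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            os.close(fd)
            return False
        return True

    class Status(object):
        # Overwrite a well-known file with the latest one-line status
        # message, for other processes (or tests) to peek at.
        def __init__(self, path):
            self.path = path

        def status(self, msg):
            with open(self.path, "w") as f:
                f.write(msg + "\n")

This matches how the diff uses them: flock_ex_or_fail guards against a second merge_fetch on the same mergedb, and s.status() publishes per-chunk fetch progress without spamming the log.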