summaryrefslogtreecommitdiff
path: root/tools/merge_fetch.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/merge_fetch.py')
-rwxr-xr-xtools/merge_fetch.py59
1 files changed, 35 insertions, 24 deletions
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index db274a3..7973fae 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,10 +10,11 @@
import sys
import struct
import subprocess
+import logging
from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
- hexencode, parse_args, perm
+ hexencode, parse_args, perm, flock_ex_or_fail, Status
from certtools import timing_point, write_file, create_ssl_context
def merge_fetch(args, config, localconfig):
@@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig):
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
logorderfile = mergedb + "/logorder"
+ statusfile = mergedb + "/merge_fetch.status"
+ s = Status(statusfile)
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig):
entries_to_fetch = {}
for storagenode in storagenodes:
- print >>sys.stderr, "getting new entries from", storagenode["name"]
- sys.stderr.flush()
+ logging.info("getting new entries from %s", storagenode["name"])
new_entries_per_node[storagenode["name"]] = \
set(get_new_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
@@ -48,8 +50,7 @@ def merge_fetch(args, config, localconfig):
timing_point(timing, "get new entries")
new_entries -= certsinlog
- print >>sys.stderr, "adding", len(new_entries), "entries"
- sys.stderr.flush()
+ logging.info("adding %d entries", len(new_entries))
for ehash in new_entries:
for storagenode in storagenodes:
@@ -63,9 +64,8 @@ def merge_fetch(args, config, localconfig):
added_entries = 0
for storagenode in storagenodes:
- print >>sys.stderr, "getting %d entries from %s:" % \
- (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
- sys.stderr.flush()
+ nentries = len(entries_to_fetch[storagenode["name"]])
+ logging.info("getting %d entries from %s", nentries, storagenode["name"])
for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
entries = get_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
@@ -78,21 +78,17 @@ def merge_fetch(args, config, localconfig):
logorder.append(ehash)
certsinlog.add(ehash)
added_entries += 1
- print >>sys.stderr, added_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: getting %d entries from %s: %d" %
+ (nentries, storagenode["name"], added_entries))
chainsdb.commit()
fsync_logorder(logorderfile)
timing_point(timing, "add entries")
- print >>sys.stderr, "added", added_entries, "entries"
- sys.stderr.flush()
+ logging.info("added %d entries", added_entries)
verifycert.communicate(struct.pack("I", 0))
if args.timing:
- print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_fetch: %s", timing["deltatimes"])
tree_size = len(logorder)
if tree_size == 0:
@@ -102,28 +98,43 @@ def merge_fetch(args, config, localconfig):
def main():
"""
- Fetch new entries from all storage nodes.
+ Fetch new entries from all storage nodes, in sequence.
- Indicate current position by writing the index in the logorder file
- (0-based) to the 'fetched' file.
+ Indicate the current position by writing the hash and its 'logorder'
+ index, 0-based, to 'fetched'.
- Sleep some and start over.
+ Sleep some and start over, or exit if there's no `--mergeinterval'.
"""
args, config, localconfig = parse_args()
paths = localconfig["paths"]
mergedb = paths["mergedb"]
currentsizefile = mergedb + "/fetched"
+ lockfile = mergedb + "/.merge_fetch.lock"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_fetch.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
create_ssl_context(cafile=paths["https_cacertfile"])
while True:
logsize, last_hash = merge_fetch(args, config, localconfig)
currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
- #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
+ logging.debug("writing to %s: %s", currentsizefile, currentsize)
write_file(currentsizefile, currentsize)
- if args.interval is None:
+ if args.mergeinterval is None:
break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ logging.debug("sleeping %d seconds", args.mergeinterval / 10)
+ sleep(args.mergeinterval / 10)
+
+ return 0
if __name__ == '__main__':
sys.exit(main())