diff options
Diffstat (limited to 'tools/mergetools.py')
-rw-r--r-- | tools/mergetools.py | 100 |
1 files changed, 90 insertions, 10 deletions
diff --git a/tools/mergetools.py b/tools/mergetools.py index 94901a9..d5d5f75 100644 --- a/tools/mergetools.py +++ b/tools/mergetools.py @@ -10,11 +10,16 @@ import json import yaml import argparse import requests +import time +import fcntl +import errno +import multiprocessing +import logging try: import permdb except ImportError: pass -from certtools import get_leaf_hash, http_request, get_leaf_hash +from certtools import get_leaf_hash, http_request def parselogrow(row): return base64.b16decode(row, casefold=True) @@ -33,7 +38,7 @@ def get_nfetched(currentsizefile, logorderfile): try: limit = json.loads(open(currentsizefile).read()) except (IOError, ValueError): - return -1 + return 0 if limit['index'] >= 0: with open(logorderfile, 'r') as f: f.seek(limit['index']*65) @@ -292,7 +297,7 @@ def backup_sendlog(node, baseurl, own_key, paths, submission): def sendentries(node, baseurl, own_key, paths, entries, session=None): try: - json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries] + json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries] result = http_request( baseurl + "plop/v1/frontend/sendentry", json.dumps(json_entries), @@ -310,13 +315,13 @@ def sendentries(node, baseurl, own_key, paths, entries, session=None): print >>sys.stderr, "========================" sys.stderr.flush() raise e - except requests.exceptions.ConnectionError, e: - print >>sys.stderr, "ERROR: sendentries", baseurl, e.request, e.response + except requests.exceptions.ConnectionError, e2: + print >>sys.stderr, "ERROR: sendentries", baseurl, e2.request, e2.response sys.exit(1) def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): try: - json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries] + json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries] result = http_request( baseurl + "plop/v1/merge/sendentry", json.dumps(json_entries), @@ -334,8 +339,8 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): print >>sys.stderr, "========================" sys.stderr.flush() raise e - except requests.exceptions.ConnectionError, e: - print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e.request, e.response + except requests.exceptions.ConnectionError, e2: + print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e2.request, e2.response sys.exit(1) def publish_sth(node, baseurl, own_key, paths, submission): @@ -430,10 +435,17 @@ def parse_args(): required=True) parser.add_argument('--localconfig', help="Local configuration", required=True) - parser.add_argument('--interval', type=int, metavar="n", - help="Repeate every N seconds") + # FIXME: verify that 0 < N < 1d + parser.add_argument('--mergeinterval', type=int, metavar="n", + help="Merge every N seconds") parser.add_argument("--timing", action='store_true', help="Print timing information") + parser.add_argument("--pidfile", type=str, metavar="file", + help="Store PID in FILE") + parser.add_argument("--logdir", type=str, default=".", metavar="dir", + help="Write logfiles in DIR [default: .]") + parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level", + help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]") args = parser.parse_args() config = yaml.load(open(args.config)) @@ -448,13 +460,74 @@ def perm(dbtype, path): return PermDB(path) assert False +def waitforfile(path): + statinfo = None + while statinfo is None: + try: + statinfo = os.stat(path) + except OSError, e: + if e.errno != errno.ENOENT: + raise + time.sleep(1) + return statinfo + +def flock_ex_or_fail(path): + """ + To be used at most once per process. Will otherwise leak file + descriptors. + """ + try: + fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB) + except IOError, e: + if e.errno != errno.EWOULDBLOCK: + raise + return False + return True + +def flock_ex_wait(path): + fd = os.open(path, os.O_CREAT) + logging.debug("waiting for exclusive lock on %s (%s)", fd, path) + fcntl.flock(fd, fcntl.LOCK_EX) + logging.debug("taken exclusive lock on %s", fd) + return fd + +def flock_sh_wait(path): + fd = os.open(path, os.O_CREAT) + logging.debug("waiting for shared lock on %s (%s)", fd, path) + fcntl.flock(fd, fcntl.LOCK_SH) + logging.debug("taken shared lock on %s", fd) + return fd + +def flock_release(fd): + logging.debug("releasing lock on %s", fd) + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + +def terminate_child_procs(): + for p in multiprocessing.active_children(): + #print >>sys.stderr, "DEBUG: terminating pid", p.pid + p.terminate() + +class Status: + def __init__(self, path): + self.path = path + def status(self, s): + open(self.path, 'w').write(s) + class FileDB: def __init__(self, path): self.path = path + self.lockfile = None def get(self, key): return read_chain(self.path, key) def add(self, key, value): return write_chain(key, value, self.path) + def lock_sh(self): + self.lockfile = flock_sh_wait(self.path + "/.lock") + def lock_ex(self): + self.lockfile = flock_ex_wait(self.path + "/.lock") + def release_lock(self): + flock_release(self.lockfile) def commit(self): pass @@ -465,5 +538,12 @@ class PermDB: return permdb.getvalue(self.permdbobj, key) def add(self, key, value): return permdb.addvalue(self.permdbobj, key, value) + def lock_sh(self): + assert False # NYI + def lock_ex(self): + assert False # NYI + def release_lock(self): + assert False # NYI def commit(self): permdb.committree(self.permdbobj) + |