Diffstat (limited to 'tools/mergetools.py')
 tools/mergetools.py | 100 ++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 90 insertions(+), 10 deletions(-)
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 94901a9..d5d5f75 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -10,11 +10,16 @@ import json
 import yaml
 import argparse
 import requests
+import time
+import fcntl
+import errno
+import multiprocessing
+import logging
 try:
     import permdb
 except ImportError:
     pass
-from certtools import get_leaf_hash, http_request, get_leaf_hash
+from certtools import get_leaf_hash, http_request

 def parselogrow(row):
     return base64.b16decode(row, casefold=True)
@@ -33,7 +38,7 @@ def get_nfetched(currentsizefile, logorderfile):
     try:
         limit = json.loads(open(currentsizefile).read())
     except (IOError, ValueError):
-        return -1
+        return 0
     if limit['index'] >= 0:
         with open(logorderfile, 'r') as f:
             f.seek(limit['index']*65)
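The seek arithmetic above relies on logorder being a fixed-width file: one
64-character hex-encoded leaf hash plus a newline per row, 65 bytes per entry.
A minimal sketch of random access under that assumption (read_logorder_entry
is a hypothetical helper, not part of this patch):

    import base64

    def read_logorder_entry(logorderfile, n):
        # Fixed-width rows (64 hex chars + '\n') make entry n addressable in O(1).
        with open(logorderfile, 'r') as f:
            f.seek(n * 65)
            row = f.read(64)
        # Same decoding as parselogrow(): hex -> 32 raw hash bytes.
        return base64.b16decode(row, casefold=True)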
@@ -292,7 +297,7 @@ def backup_sendlog(node, baseurl, own_key, paths, submission):

 def sendentries(node, baseurl, own_key, paths, entries, session=None):
     try:
-        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
+        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries]
         result = http_request(
             baseurl + "plop/v1/frontend/sendentry",
             json.dumps(json_entries),
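The rename from "hash" to "ehash" stops the comprehension from shadowing the
hash() builtin; in Python 2 the comprehension variable also leaks into the
enclosing scope after the expression finishes. A hypothetical sketch of the
failure mode (the entries list here is made up):

    entries = [("\x00" * 32, "der-bytes")]        # fake (hash, entry) pairs
    rows = [entry for hash, entry in entries]     # binds the name "hash"
    hash("x")   # TypeError: 'str' object is not callable -- builtin is gone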
@@ -310,13 +315,13 @@ def sendentries(node, baseurl, own_key, paths, entries, session=None):
         print >>sys.stderr, "========================"
         sys.stderr.flush()
         raise e
-    except requests.exceptions.ConnectionError, e:
-        print >>sys.stderr, "ERROR: sendentries", baseurl, e.request, e.response
+    except requests.exceptions.ConnectionError, e2:
+        print >>sys.stderr, "ERROR: sendentries", baseurl, e2.request, e2.response
         sys.exit(1)

 def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
     try:
-        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(hash)} for hash, entry in entries]
+        json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries]
         result = http_request(
             baseurl + "plop/v1/merge/sendentry",
             json.dumps(json_entries),
@@ -334,8 +339,8 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
         print >>sys.stderr, "========================"
         sys.stderr.flush()
         raise e
-    except requests.exceptions.ConnectionError, e:
-        print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e.request, e.response
+    except requests.exceptions.ConnectionError, e2:
+        print >>sys.stderr, "ERROR: sendentries_merge", baseurl, e2.request, e2.response
         sys.exit(1)

 def publish_sth(node, baseurl, own_key, paths, submission):
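Giving the ConnectionError handler its own variable ("e2") keeps it from
rebinding the "e" that the HTTPError path above logs and re-raises; in
Python 2 the except variable survives its handler, so a shared name is easy
to clobber. A hypothetical sketch of the convention (send_with_retry is made
up, not part of the patch):

    import sys
    import requests

    def send_with_retry(post, url, body):
        saved = None
        try:
            return post(url, body)
        except requests.exceptions.HTTPError, e:
            saved = e                # keep for re-raising below
        except requests.exceptions.ConnectionError, e2:
            # distinct name: cannot be confused with the saved error
            print >>sys.stderr, "ERROR:", url, e2
        if saved is not None:
            raise saved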
@@ -430,10 +435,17 @@ def parse_args():
                         required=True)
     parser.add_argument('--localconfig', help="Local configuration",
                         required=True)
-    parser.add_argument('--interval', type=int, metavar="n",
-                        help="Repeat every N seconds")
+    # FIXME: verify that 0 < N < 1d
+    parser.add_argument('--mergeinterval', type=int, metavar="n",
+                        help="Merge every N seconds")
     parser.add_argument("--timing", action='store_true',
                         help="Print timing information")
+    parser.add_argument("--pidfile", type=str, metavar="file",
+                        help="Store PID in FILE")
+    parser.add_argument("--logdir", type=str, default=".", metavar="dir",
+                        help="Write logfiles in DIR [default: .]")
+    parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level",
+                        help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]")
     args = parser.parse_args()

     config = yaml.load(open(args.config))
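One way to discharge the FIXME above is a custom argparse type; this is a
sketch, not part of the patch (merge_interval is a hypothetical name):

    import argparse

    def merge_interval(s):
        n = int(s)
        if not 0 < n < 86400:               # 1 day = 86400 seconds
            raise argparse.ArgumentTypeError("N must satisfy 0 < N < 86400")
        return n

    # parser.add_argument('--mergeinterval', type=merge_interval, metavar="n",
    #                     help="Merge every N seconds")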
@@ -448,13 +460,74 @@ def perm(dbtype, path):
         return PermDB(path)
     assert False

+def waitforfile(path):
+    statinfo = None
+    while statinfo is None:
+        try:
+            statinfo = os.stat(path)
+        except OSError, e:
+            if e.errno != errno.ENOENT:
+                raise
+            time.sleep(1)
+    return statinfo
+
+def flock_ex_or_fail(path):
+    """
+    To be used at most once per process. Will otherwise leak file
+    descriptors.
+    """
+    try:
+        fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX | fcntl.LOCK_NB)
+    except IOError, e:
+        if e.errno != errno.EWOULDBLOCK:
+            raise
+        return False
+    return True
+
+def flock_ex_wait(path):
+    fd = os.open(path, os.O_CREAT)
+    logging.debug("waiting for exclusive lock on %s (%s)", fd, path)
+    fcntl.flock(fd, fcntl.LOCK_EX)
+    logging.debug("taken exclusive lock on %s", fd)
+    return fd
+
+def flock_sh_wait(path):
+    fd = os.open(path, os.O_CREAT)
+    logging.debug("waiting for shared lock on %s (%s)", fd, path)
+    fcntl.flock(fd, fcntl.LOCK_SH)
+    logging.debug("taken shared lock on %s", fd)
+    return fd
+
+def flock_release(fd):
+    logging.debug("releasing lock on %s", fd)
+    fcntl.flock(fd, fcntl.LOCK_UN)
+    os.close(fd)
+
+def terminate_child_procs():
+    for p in multiprocessing.active_children():
+        #print >>sys.stderr, "DEBUG: terminating pid", p.pid
+        p.terminate()
+
+class Status:
+    def __init__(self, path):
+        self.path = path
+    def status(self, s):
+        open(self.path, 'w').write(s)
+
 class FileDB:
     def __init__(self, path):
         self.path = path
+        self.lockfile = None
     def get(self, key):
         return read_chain(self.path, key)
     def add(self, key, value):
         return write_chain(key, value, self.path)
+    def lock_sh(self):
+        self.lockfile = flock_sh_wait(self.path + "/.lock")
+    def lock_ex(self):
+        self.lockfile = flock_ex_wait(self.path + "/.lock")
+    def release_lock(self):
+        flock_release(self.lockfile)
     def commit(self):
         pass
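A hypothetical end-to-end use of the new helpers (the paths, key, and value
here are made up; the patch itself only defines the primitives):

    import sys

    # Single-instance guard, e.g. alongside the --pidfile handling.
    if not flock_ex_or_fail("merge.lock"):
        print >>sys.stderr, "another merge instance is already running"
        sys.exit(1)

    Status("merge.status").status("merge: idle")    # progress breadcrumb

    some_key = "\x00" * 32          # placeholder leaf hash
    some_value = "chain-bytes"      # placeholder chain data

    db = FileDB("chains")
    db.lock_sh()                    # shared: concurrent readers are fine
    try:
        value = db.get(some_key)
    finally:
        db.release_lock()

    db.lock_ex()                    # exclusive: waits out readers and writers
    try:
        db.add(some_key, some_value)
        db.commit()
    finally:
        db.release_lock()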
@@ -465,5 +538,12 @@ class PermDB:
         return permdb.getvalue(self.permdbobj, key)
     def add(self, key, value):
         return permdb.addvalue(self.permdbobj, key, value)
+    def lock_sh(self):
+        assert False # NYI
+    def lock_ex(self):
+        assert False # NYI
+    def release_lock(self):
+        assert False # NYI
+
     def commit(self):
         permdb.committree(self.permdbobj)
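A side note on the stubs above: "assert False" disappears under python -O
(assertions are stripped), letting the unimplemented methods silently return
None. A sketch of a stub that always fails loudly instead:

    def lock_sh(self):
        raise NotImplementedError("PermDB.lock_sh")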