author Linus Nordberg <linus@nordu.net> 2016-11-23 17:09:48 +0100
committer Linus Nordberg <linus@nordu.net> 2016-11-23 17:09:48 +0100
commit 19a2a611a839c0318f58347e2d93943c8e2401a5 (patch)
tree 18cd302161a88d4546b39792a4bff6b1ade95c77 /tools/mergetools.py
parent 27e368196ce65e109c027987c706a697356f7bc5 (diff)
WIP
Merge can run as four separate processes, plus a fifth controlling process, 'merge'. Tests are limited to testcase1.py, and they're failing because of the test with the dead merge secondary. Tests are also time-consuming because they wait 60 seconds each time a merge needs to be verified. This could be improved by peeking at the control files, for example.
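A minimal sketch of the control-file peeking idea, as a test helper; the path argument and expected contents are assumptions, since the commit does not pin down the control-file format:

import os
import time

def wait_for_control_file(path, expected, timeout=60, poll=0.5):
    # Hypothetical test helper: poll a merge control file until it
    # holds the expected contents, instead of sleeping a fixed 60 s
    # per merge verification. Returns True on match, False on timeout.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if os.path.exists(path):
            with open(path) as f:
                if f.read().strip() == expected:
                    return True
        time.sleep(poll)
    return False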
Diffstat (limited to 'tools/mergetools.py')
-rw-r--r-- tools/mergetools.py | 110
1 file changed, 69 insertions, 41 deletions
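Most of the diff below reworks the RPC helpers (get_curpos, sendlog, sendentry, sendsth) to return a (success, payload) pair instead of printing to stderr and calling sys.exit(), so the long-running merge processes can log a failure and retry. A minimal caller sketch under that convention; get_curpos_stub stands in for the real helper and is not part of the commit:

import logging

logging.basicConfig(level=logging.INFO)

def get_curpos_stub(fail=False):
    # Stand-in for the reworked get_curpos(): returns (True, position)
    # on success and (False, error_payload) on failure.
    if fail:
        return False, {"result": "error"}
    return True, 42

ok, curpos = get_curpos_stub()
if ok:
    logging.info("current position: %d", curpos)
else:
    logging.error("currentposition: %s", curpos)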
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 80fbf0b..5471fe4 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -10,11 +10,15 @@ import json
import yaml
import argparse
import requests
+import time
+import fcntl
+import errno
+import logging
try:
import permdb
except ImportError:
pass
-from certtools import get_leaf_hash, http_request, get_leaf_hash
+from certtools import get_leaf_hash, http_request
def parselogrow(row):
return base64.b16decode(row, casefold=True)
@@ -33,7 +37,7 @@ def get_nfetched(currentsizefile, logorderfile):
try:
limit = json.loads(open(currentsizefile).read())
except (IOError, ValueError):
- return -1
+ return 0
if limit['index'] >= 0:
with open(logorderfile, 'r') as f:
f.seek(limit['index']*65)
@@ -207,12 +211,10 @@ def get_curpos(node, baseurl, own_key, paths):
publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"position"]
- print >>sys.stderr, "ERROR: currentposition", parsed_result
- sys.exit(1)
+ return True, parsed_result[u"position"]
+ return False, parsed_result
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: currentposition", e.response
- sys.exit(1)
+ return False, e.response
def get_verifiedsize(node, baseurl, own_key, paths):
try:
@@ -234,19 +236,16 @@ def sendlog(node, baseurl, own_key, paths, submission):
result = http_request(baseurl + "plop/v1/frontend/sendlog",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.response
- sys.stderr.flush()
- return None
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(submission)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def backup_sendlog(node, baseurl, own_key, paths, submission):
try:
@@ -274,18 +273,16 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash):
json.dumps({"entry":base64.b64encode(entry),
"treeleafhash":base64.b64encode(ehash)}),
key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.reponse
- sys.exit(1)
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, ehash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(ehash)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)])
@@ -304,7 +301,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, ehash
+ print >>sys.stderr, hash
print >>sys.stderr, "======= RESPONSE ======="
print >>sys.stderr, result
print >>sys.stderr, "========================"
@@ -316,18 +313,16 @@ def sendsth(node, baseurl, own_key, paths, submission):
result = http_request(baseurl + "plop/v1/frontend/sendsth",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.response
- sys.exit(1)
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(submission)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def verifyroot(node, baseurl, own_key, paths, treesize):
try:
@@ -403,10 +398,17 @@ def parse_args():
required=True)
parser.add_argument('--localconfig', help="Local configuration",
required=True)
- parser.add_argument('--interval', type=int, metavar="n",
- help="Repeate every N seconds")
+ # FIXME: verify that 0 < N < 1d
+ parser.add_argument('--mergeinterval', type=int, metavar="n",
+ help="Merge every N seconds")
parser.add_argument("--timing", action='store_true',
help="Print timing information")
+ parser.add_argument("--pidfile", type=str, metavar="file",
+ help="Store PID in FILE")
+ parser.add_argument("--logdir", type=str, default=".", metavar="dir",
+ help="Write logfiles in DIR [default: .]")
+ parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level",
+ help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]")
args = parser.parse_args()
config = yaml.load(open(args.config))
@@ -421,6 +423,32 @@ def perm(dbtype, path):
return PermDB(path)
assert False
+def waitforfile(path):
+ statinfo = None
+ while statinfo is None:
+ try:
+ statinfo = os.stat(path)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ time.sleep(1)
+ return statinfo
+
+def flock_ex_or_fail(path):
+ try:
+ fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
+ except IOError, e:
+ if e.errno != errno.EWOULDBLOCK:
+ raise
+ return False
+ return True
+
+class Status:
+ def __init__(self, path):
+ self.path = path
+ def status(self, s):
+ open(self.path, 'w').write(s)
+
class FileDB:
def __init__(self, path):
self.path = path
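The flock_ex_or_fail() helper added above gives each merge subprocess a cheap single-instance guard. A usage sketch, repeating the helper as committed; the lock-file name is an assumption:

import errno
import fcntl
import os
import sys

def flock_ex_or_fail(path):
    # As in the diff: take an exclusive, non-blocking flock on 'path'
    # and report False if another process already holds it. The fd is
    # deliberately never closed, so the lock lives as long as the process.
    try:
        fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
    except IOError, e:
        if e.errno != errno.EWOULDBLOCK:
            raise
        return False
    return True

if not flock_ex_or_fail("merge_fetch.lock"):  # hypothetical lock-file name
    sys.stderr.write("another merge_fetch is already running\n")
    sys.exit(1)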