Diffstat (limited to 'tools')
-rwxr-xr-x  tools/merge             62
-rwxr-xr-x  tools/merge_backup.py  141
-rwxr-xr-x  tools/merge_dist.py    141
-rwxr-xr-x  tools/merge_fetch.py    59
-rwxr-xr-x  tools/merge_sth.py      81
-rw-r--r--  tools/mergetools.py    110
-rwxr-xr-x  tools/testcase1.py      16
7 files changed, 396 insertions, 214 deletions
diff --git a/tools/merge b/tools/merge
index b5a50d5..0d1bf0e 100755
--- a/tools/merge
+++ b/tools/merge
@@ -1,10 +1,58 @@
-#! /bin/sh
+#! /usr/bin/env python
+"""merge"""
-set -o errexit
+import os
+import sys
+import signal
+from time import sleep
+from mergetools import parse_args
+from multiprocessing import Process
+import merge_fetch, merge_backup, merge_sth, merge_dist
-BINDIR=$(dirname $0)
+def run_once():
+ """Merge once."""
+ ret = merge_fetch.main()
+ if ret == 0:
+ ret = merge_backup.main()
+ if ret == 0:
+ ret = merge_sth.main()
+ if ret == 0:
+ ret = merge_dist.main()
+ return ret
-$BINDIR/merge_fetch.py "$@"
-$BINDIR/merge_backup.py "$@"
-$BINDIR/merge_sth.py "$@"
-$BINDIR/merge_dist.py "$@"
+def term(signal, arg):
+ sys.exit(1)
+
+def run_continuously(pidfilepath):
+ """Run continuously."""
+ parts = ('fetch', merge_fetch), ('backup', merge_backup), ('sth', merge_sth), ('dist', merge_dist)
+ procs = {}
+ for part, mod in parts:
+ procs[part] = Process(target=mod.main, name='merge_%s' % part)
+ procs[part].daemon = True
+ procs[part].start()
+
+ if pidfilepath:
+ open(pidfilepath, 'w').write(str(os.getpid()) + '\n')
+
+ signal.signal(signal.SIGTERM, term)
+ while True:
+ for name, p in procs.items():
+ if not p.is_alive():
+ print >>sys.stderr, "\nERROR:", name, "process is gone; exiting"
+ return 1
+ sleep(1)
+
+def main():
+ """Main"""
+ args, _, _ = parse_args()
+
+ if args.mergeinterval is None:
+ ret = run_once()
+ else:
+ ret = run_continuously(args.pidfile)
+
+ return ret
+
+if __name__ == '__main__':
+ sys.exit(main())
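
The new merge driver keeps the four phases alive as daemonized worker
processes and exits as soon as any of them dies. A minimal sketch of that
supervisor pattern, with a hypothetical worker() standing in for the
merge_*.main() functions:

    import sys
    from time import sleep
    from multiprocessing import Process

    def worker():
        sleep(5)  # stand-in for a long-running merge phase

    def supervise(targets):
        procs = {}
        for name, target in targets:
            p = Process(target=target, name='merge_%s' % name)
            p.daemon = True  # children are killed when the supervisor exits
            p.start()
            procs[name] = p
        while True:  # poll instead of join(), so any single death is seen
            for name, p in procs.items():
                if not p.is_alive():
                    sys.stderr.write("%s is gone; exiting\n" % name)
                    return 1
            sleep(1)

    if __name__ == '__main__':
        sys.exit(supervise([('a', worker), ('b', worker)]))
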
diff --git a/tools/merge_backup.py b/tools/merge_backup.py
index 4f688c3..723fc7a 100755
--- a/tools/merge_backup.py
+++ b/tools/merge_backup.py
@@ -11,13 +11,16 @@ import sys
import base64
import select
import requests
+import errno
+import logging
from time import sleep
+from os import stat
from certtools import timing_point, build_merkle_tree, write_file, \
create_ssl_context
from mergetools import chunks, backup_sendlog, get_logorder, \
get_verifiedsize, get_missingentriesforbackup, \
hexencode, setverifiedsize, sendentries_merge, verifyroot, \
- get_nfetched, parse_args, perm
+ get_nfetched, parse_args, perm, waitforfile, flock_ex_or_fail, Status
def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
for trynumber in range(5, 0, -1):
@@ -28,12 +31,10 @@ def backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk):
if trynumber == 1:
return None
select.select([], [], [], 10.0)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
+ logging.info("tries left: %d", trynumber)
continue
return sendlogresult
-
def merge_backup(args, config, localconfig, secondaries):
paths = localconfig["paths"]
own_key = (localconfig["nodename"],
@@ -43,6 +44,8 @@ def merge_backup(args, config, localconfig, secondaries):
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
currentsizefile = mergedb + "/fetched"
+ statusfile = mergedb + "/merge_backup.status"
+ s = Status(statusfile)
timing = timing_point()
nfetched = get_nfetched(currentsizefile, logorderfile)
@@ -61,64 +64,60 @@ def merge_backup(args, config, localconfig, secondaries):
nodeaddress = "https://%s/" % secondary["address"]
nodename = secondary["name"]
timing = timing_point()
- print >>sys.stderr, "backing up to node", nodename
- sys.stderr.flush()
- verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ logging.info("backing up to node %s", nodename)
+ try:
+ verifiedsize = get_verifiedsize(nodename, nodeaddress, own_key, paths)
+ except requests.exceptions.ConnectionError, e:
+ logging.error("connection error when getting verified size from %s", nodename)
+ return 1
timing_point(timing, "get verified size")
- print >>sys.stderr, "verified size", verifiedsize
- sys.stderr.flush()
+ logging.info("verified size %d", verifiedsize)
entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "determining end of log:",
+ logging.info("determining end of log")
for chunk in chunks(entries, 100000):
sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk[:10])
if sendlogresult == None:
- print >>sys.stderr, "sendlog result was None"
- sys.exit(1)
+ logging.error("sendlog result was None")
+ return 1
if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
+ logging.error("backup_sendlog: %s", sendlogresult)
+ return 1
verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
+ s.status("INFO: determining end of log: %d" % verifiedsize)
if verifiedsize > 100000:
verifiedsize -= 100000
else:
verifiedsize = 0
+ logging.info("end of log determined")
timing_point(timing, "checklog")
entries = [base64.b64encode(entry) for entry in logorder[verifiedsize:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
+ logging.info("sending log")
for chunk in chunks(entries, 1000):
sendlogresult = backup_loop(nodename, nodeaddress, own_key, paths, verifiedsize, chunk)
if sendlogresult == None:
- sys.exit(1)
+ return 1
if sendlogresult["result"] != "ok":
- print >>sys.stderr, "backup_sendlog:", sendlogresult
- sys.exit(1)
+ logging.error("backup_sendlog: %s", sendlogresult)
+ return 1
verifiedsize += len(chunk)
- print >>sys.stderr, verifiedsize,
- sys.stderr.flush()
- print >>sys.stderr
+ s.status("INFO: sending log: %d" % verifiedsize)
timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ logging.info("log sent")
missingentries = get_missingentriesforbackup(nodename, nodeaddress,
own_key, paths)
timing_point(timing, "get missing")
while missingentries:
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ logging.info("missing entries: %d", len(missingentries))
fetched_entries = 0
- print >>sys.stderr, "fetching missing entries",
- sys.stderr.flush()
+ logging.info("fetching missing entries")
with requests.sessions.Session() as session:
for missingentry_chunk in chunks(missingentries, 100):
missingentry_hashes = [base64.b64decode(missingentry) for missingentry in missingentry_chunk]
@@ -127,59 +126,79 @@ def merge_backup(args, config, localconfig, secondaries):
own_key, paths,
hashes_and_entries, session)
if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry_merge:", sendentryresult
- sys.exit(1)
+ logging.error("sendentry_merge: %s", sendentryresult)
+ return 1
fetched_entries += len(missingentry_hashes)
- print >>sys.stderr, fetched_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: fetching missing entries: %d" % fetched_entries)
timing_point(timing, "send missing")
missingentries = get_missingentriesforbackup(nodename, nodeaddress,
own_key, paths)
timing_point(timing, "get missing")
- verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
- tree_size)
+ try:
+ verifyrootresult = verifyroot(nodename, nodeaddress, own_key, paths,
+ tree_size)
+ except requests.exceptions.ConnectionError, e:
+ logging.error("connection error when verifying root at %s", nodename)
+ return 1
if verifyrootresult["result"] != "ok":
- print >>sys.stderr, "verifyroot:", verifyrootresult
- sys.exit(1)
+ logging.error("verifyroot: %s", verifyrootresult)
+ return 1
secondary_root_hash = base64.b64decode(verifyrootresult["root_hash"])
if root_hash != secondary_root_hash:
- print >>sys.stderr, "secondary root hash was", \
- hexencode(secondary_root_hash)
- print >>sys.stderr, " expected", hexencode(root_hash)
- sys.exit(1)
+ logging.error("secondary root hash was %s, expected %s",
+ hexencode(secondary_root_hash),
+ hexencode(root_hash))
+ return 1
timing_point(timing, "verifyroot")
setverifiedsize(nodename, nodeaddress, own_key, paths, tree_size)
backuppath = mergedb + "/verified." + nodename
backupdata = {"tree_size": tree_size,
"sha256_root_hash": hexencode(root_hash)}
- #print >>sys.stderr, "DEBUG: writing to", backuppath, ":", backupdata
+ logging.debug("writing to %s: %s", backuppath, backupdata)
write_file(backuppath, backupdata)
if args.timing:
- print >>sys.stderr, "timing: merge_backup:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_backup: %s", timing["deltatimes"])
+
+ return 0
def main():
"""
- Read logorder file up until what's indicated by fetched file and
- build the tree.
+ Wait until 'fetched' exists and read it.
+
+ Read 'logorder' up until what's indicated by 'fetched' and build the
+ tree.
Distribute entries to all secondaries, write tree size and tree head
- to backup.<secondary> files as each secondary is verified to have
+ to 'backup.<secondary>' files as each secondary is verified to have
the entries.
- Sleep some and start over.
+ If `--mergeinterval', wait until 'fetched' is updated and read it
+ and start over from the point where 'logorder' is read.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_backup.lock"
+ fetched_path = mergedb + "/fetched"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_backup.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
all_secondaries = \
[n for n in config.get('mergenodes', []) if \
n['name'] != config['primarymergenode']]
- paths = localconfig["paths"]
create_ssl_context(cafile=paths["https_cacertfile"])
if len(args.node) == 0:
@@ -187,12 +206,20 @@ def main():
else:
nodes = [n for n in all_secondaries if n["name"] in args.node]
+ if args.mergeinterval is None:
+ return merge_backup(args, config, localconfig, nodes)
+
+ fetched_statinfo = waitforfile(fetched_path)
+
while True:
- merge_backup(args, config, localconfig, nodes)
- if args.interval is None:
- break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ err = merge_backup(args, config, localconfig, nodes)
+ if err != 0:
+ return err
+ fetched_statinfo_old = fetched_statinfo
+ while fetched_statinfo == fetched_statinfo_old:
+ sleep(args.mergeinterval / 30)
+ fetched_statinfo = stat(fetched_path)
+ return 0
if __name__ == '__main__':
sys.exit(main())
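
With `--mergeinterval', merge_backup no longer sleeps a fixed interval; it
re-runs only when the 'fetched' file changes, detected by comparing
successive os.stat() results (stat results compare by value in Python). A
sketch of that wait loop, with an illustrative poll interval:

    import os
    from time import sleep

    def wait_for_change(path, poll_seconds):
        # any rewrite of the file changes mtime/size/inode, which ends
        # the loop
        old = os.stat(path)
        new = old
        while new == old:
            sleep(poll_seconds)
            new = os.stat(path)
        return new
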
diff --git a/tools/merge_dist.py b/tools/merge_dist.py
index a9b5c60..7a13bfa 100755
--- a/tools/merge_dist.py
+++ b/tools/merge_dist.py
@@ -9,12 +9,15 @@
#
import sys
import json
+import logging
from time import sleep
from base64 import b64encode, b64decode
+from os import stat
from certtools import timing_point, \
create_ssl_context
from mergetools import get_curpos, get_logorder, chunks, get_missingentries, \
- sendsth, sendlog, sendentry, parse_args, perm
+ sendsth, sendlog, sendentry, parse_args, perm, waitforfile, \
+ flock_ex_or_fail, Status
def merge_dist(args, localconfig, frontendnodes, timestamp):
paths = localconfig["paths"]
@@ -25,17 +28,19 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
logorderfile = mergedb + "/logorder"
sthfile = mergedb + "/sth"
+ statusfile = mergedb + "/merge_dist.status"
+ s = Status(statusfile)
create_ssl_context(cafile=paths["https_cacertfile"])
timing = timing_point()
try:
sth = json.loads(open(sthfile, 'r').read())
except (IOError, ValueError):
- print >>sys.stderr, "No valid STH file found in", sthfile
+ logging.warning("No valid STH file found in %s", sthfile)
return timestamp
if sth['timestamp'] < timestamp:
- print >>sys.stderr, "New STH file older than the previous one:", \
- sth['timestamp'], "<", timestamp
+ logging.warning("New STH file older than the previous one: %d < %d",
+ sth['timestamp'], timestamp)
return timestamp
if sth['timestamp'] == timestamp:
return timestamp
@@ -49,96 +54,122 @@ def merge_dist(args, localconfig, frontendnodes, timestamp):
nodename = frontendnode["name"]
timing = timing_point()
- print >>sys.stderr, "distributing for node", nodename
- sys.stderr.flush()
- curpos = get_curpos(nodename, nodeaddress, own_key, paths)
+ logging.info("distributing for node %s", nodename)
+ ok, curpos = get_curpos(nodename, nodeaddress, own_key, paths)
+ if not ok:
+ logging.error("get_curpos: %s", curpos)
+ continue
timing_point(timing, "get curpos")
- print >>sys.stderr, "current position", curpos
- sys.stderr.flush()
+ logging.info("current position %d", curpos)
entries = [b64encode(entry) for entry in logorder[curpos:]]
- print >>sys.stderr, "sending log:",
- sys.stderr.flush()
+ logging.info("sending log: %d", len(entries))
+ sendlog_fail = False
for chunk in chunks(entries, 1000):
for trynumber in range(5, 0, -1):
- sendlogresult = sendlog(nodename, nodeaddress,
- own_key, paths,
- {"start": curpos, "hashes": chunk})
- if sendlogresult == None:
- if trynumber == 1:
- sys.exit(1)
- sleep(10)
- print >>sys.stderr, "tries left:", trynumber
- sys.stderr.flush()
- continue
+ ok, sendlogresult = sendlog(nodename, nodeaddress, own_key, paths,
+ {"start": curpos, "hashes": chunk})
+ if ok:
+ break
+ sleep(10)
+ logging.warning("tries left: %d", trynumber)
+ if not ok or sendlogresult.get("result") != "ok":
+ logging.error("sendlog: %s", sendlogresult)
+ sendlog_fail = True
break
- if sendlogresult["result"] != "ok":
- print >>sys.stderr, "sendlog:", sendlogresult
- sys.exit(1)
curpos += len(chunk)
- print >>sys.stderr, curpos,
- sys.stderr.flush()
- print >>sys.stderr
+ s.status("INFO: sendlog %d" % curpos)
timing_point(timing, "sendlog")
- print >>sys.stderr, "log sent"
- sys.stderr.flush()
+ if sendlog_fail:
+ logging.error("sendlog failed for %s", nodename)
+ continue
missingentries = get_missingentries(nodename, nodeaddress, own_key,
paths)
timing_point(timing, "get missing")
- print >>sys.stderr, "missing entries:", len(missingentries)
- sys.stderr.flush()
+ logging.info("sending missing entries: %d", len(missingentries))
sent_entries = 0
- print >>sys.stderr, "send missing entries",
- sys.stderr.flush()
+ sendentry_fail = False
for missingentry in missingentries:
ehash = b64decode(missingentry)
- sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
- chainsdb.get(ehash), ehash)
- if sendentryresult["result"] != "ok":
- print >>sys.stderr, "sendentry:", sendentryresult
- sys.exit(1)
+ ok, sendentryresult = sendentry(nodename, nodeaddress, own_key, paths,
+ chainsdb.get(ehash), ehash)
+ if not ok or sendentryresult.get("result") != "ok":
+ logging.error("sendentry: %s", sendentryresult)
+ sendentry_fail = True
+ break
sent_entries += 1
if sent_entries % 1000 == 0:
- print >>sys.stderr, sent_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: sendentry %d" % sent_entries)
timing_point(timing, "send missing")
+ if sendentry_fail:
+ logging.error("sendentry failed for %s", nodename)
+ continue
- print >>sys.stderr, "sending sth to node", nodename
- sys.stderr.flush()
- sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
- if sendsthresult["result"] != "ok":
- print >>sys.stderr, "sendsth:", sendsthresult
- sys.exit(1)
+ logging.info("sending sth to node %s", nodename)
+ sendsth_fail = False
+ ok, sendsthresult = sendsth(nodename, nodeaddress, own_key, paths, sth)
+ if not ok or sendsthresult.get("result") != "ok":
+ logging.error("sendsth: %s", sendsthresult)
+ sendsth_fail = True
timing_point(timing, "send sth")
if args.timing:
- print >>sys.stderr, "timing: merge_dist:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_dist: %s", timing["deltatimes"])
+
+ if sendsth_fail:
+ logging.error("sendsth failed for %s", nodename)
+ continue
return timestamp
def main():
"""
+ Wait until 'sth' exists and read it.
+
Distribute missing entries and the STH to all frontend nodes.
+
+ If `--mergeinterval', wait until 'sth' is updated and read it and
+ start distributing again.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_dist.lock"
timestamp = 0
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_dist.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
if len(args.node) == 0:
nodes = config["frontendnodes"]
else:
nodes = [n for n in config["frontendnodes"] if n["name"] in args.node]
+ if args.mergeinterval is None:
+ if merge_dist(args, localconfig, nodes, timestamp) < 0:
+ return 1
+ return 0
+
+ sth_path = localconfig["paths"]["mergedb"] + "/sth"
+ sth_statinfo = waitforfile(sth_path)
while True:
- timestamp = merge_dist(args, localconfig, nodes, timestamp)
- if args.interval is None:
- break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ if merge_dist(args, localconfig, nodes, timestamp) < 0:
+ return 1
+ sth_statinfo_old = sth_statinfo
+ while sth_statinfo == sth_statinfo_old:
+ sleep(args.mergeinterval / 30)
+ sth_statinfo = stat(sth_path)
+ return 0
if __name__ == '__main__':
sys.exit(main())
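
sendlog(), sendentry() and sendsth() now return an (ok, result) pair
instead of exiting the process, so a failure against one frontend node is
logged and distribution continues with the next node. A sketch of the
calling convention, with a hypothetical rpc() standing in for those
helpers:

    import logging

    def rpc(node):
        # hypothetical stand-in for sendlog()/sendentry()/sendsth()
        return True, {"result": "ok"}

    def distribute(nodes):
        for node in nodes:
            ok, result = rpc(node)
            if not ok or result.get("result") != "ok":
                logging.error("rpc failed for %s: %s", node, result)
                continue  # keep serving the remaining nodes
            logging.info("%s done", node)
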
diff --git a/tools/merge_fetch.py b/tools/merge_fetch.py
index db274a3..7973fae 100755
--- a/tools/merge_fetch.py
+++ b/tools/merge_fetch.py
@@ -10,10 +10,11 @@
import sys
import struct
import subprocess
+import logging
from time import sleep
from mergetools import get_logorder, verify_entry, get_new_entries, \
chunks, fsync_logorder, get_entries, add_to_logorder, \
- hexencode, parse_args, perm
+ hexencode, parse_args, perm, flock_ex_or_fail, Status
from certtools import timing_point, write_file, create_ssl_context
def merge_fetch(args, config, localconfig):
@@ -21,6 +22,8 @@ def merge_fetch(args, config, localconfig):
storagenodes = config["storagenodes"]
mergedb = paths["mergedb"]
logorderfile = mergedb + "/logorder"
+ statusfile = mergedb + "/merge_fetch.status"
+ s = Status(statusfile)
chainsdb = perm(localconfig.get("dbbackend", "filedb"), mergedb + "/chains")
own_key = (localconfig["nodename"],
"%s/%s-private.pem" % (paths["privatekeys"],
@@ -37,8 +40,7 @@ def merge_fetch(args, config, localconfig):
entries_to_fetch = {}
for storagenode in storagenodes:
- print >>sys.stderr, "getting new entries from", storagenode["name"]
- sys.stderr.flush()
+ logging.info("getting new entries from %s", storagenode["name"])
new_entries_per_node[storagenode["name"]] = \
set(get_new_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
@@ -48,8 +50,7 @@ def merge_fetch(args, config, localconfig):
timing_point(timing, "get new entries")
new_entries -= certsinlog
- print >>sys.stderr, "adding", len(new_entries), "entries"
- sys.stderr.flush()
+ logging.info("adding %d entries", len(new_entries))
for ehash in new_entries:
for storagenode in storagenodes:
@@ -63,9 +64,8 @@ def merge_fetch(args, config, localconfig):
added_entries = 0
for storagenode in storagenodes:
- print >>sys.stderr, "getting %d entries from %s:" % \
- (len(entries_to_fetch[storagenode["name"]]), storagenode["name"]),
- sys.stderr.flush()
+ nentries = len(entries_to_fetch[storagenode["name"]])
+ logging.info("getting %d entries from %s", nentries, storagenode["name"])
for chunk in chunks(entries_to_fetch[storagenode["name"]], 100):
entries = get_entries(storagenode["name"],
"https://%s/" % storagenode["address"],
@@ -78,21 +78,17 @@ def merge_fetch(args, config, localconfig):
logorder.append(ehash)
certsinlog.add(ehash)
added_entries += 1
- print >>sys.stderr, added_entries,
- sys.stderr.flush()
- print >>sys.stderr
- sys.stderr.flush()
+ s.status("INFO: getting %d entries from %s: %d" %
+ (nentries, storagenode["name"], added_entries))
chainsdb.commit()
fsync_logorder(logorderfile)
timing_point(timing, "add entries")
- print >>sys.stderr, "added", added_entries, "entries"
- sys.stderr.flush()
+ logging.info("added %d entries", added_entries)
verifycert.communicate(struct.pack("I", 0))
if args.timing:
- print >>sys.stderr, "timing: merge_fetch:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_fetch: %s", timing["deltatimes"])
tree_size = len(logorder)
if tree_size == 0:
@@ -102,28 +98,43 @@ def merge_fetch(args, config, localconfig):
def main():
"""
- Fetch new entries from all storage nodes.
+ Fetch new entries from all storage nodes, in sequence.
- Indicate current position by writing the index in the logorder file
- (0-based) to the 'fetched' file.
+ Indicate the current position by writing the hash and its 'logorder'
+ index, 0-based, to 'fetched'.
- Sleep some and start over.
+ Sleep some and start over, or exit if there's no `--mergeinterval'.
"""
args, config, localconfig = parse_args()
paths = localconfig["paths"]
mergedb = paths["mergedb"]
currentsizefile = mergedb + "/fetched"
+ lockfile = mergedb + "/.merge_fetch.lock"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_fetch.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
+
create_ssl_context(cafile=paths["https_cacertfile"])
while True:
logsize, last_hash = merge_fetch(args, config, localconfig)
currentsize = {"index": logsize - 1, "hash": hexencode(last_hash)}
- #print >>sys.stderr, "DEBUG: writing to", currentsizefile, ":", currentsize
+ logging.debug("writing to %s: %s", currentsizefile, currentsize)
write_file(currentsizefile, currentsize)
- if args.interval is None:
+ if args.mergeinterval is None:
break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- sleep(args.interval)
+ logging.debug("sleeping %d seconds", args.mergeinterval / 10)
+ sleep(args.mergeinterval / 10)
+
+ return 0
if __name__ == '__main__':
sys.exit(main())
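
merge_fetch, like the other three programs, picks its logging destination
from the run mode: the console for a one-shot run, a per-program file under
--logdir when running continuously. A sketch of that shared setup, assuming
an argparse namespace shaped like the one parse_args() returns:

    import logging

    def setup_logging(args, progname):
        loglevel = getattr(logging, args.loglevel.upper())
        if args.mergeinterval is None:
            logging.basicConfig(level=loglevel)  # one-shot: stderr
        else:
            logging.basicConfig(filename="%s/%s.log" % (args.logdir, progname),
                                level=loglevel)  # continuous: logfile
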
diff --git a/tools/merge_sth.py b/tools/merge_sth.py
index f4aec53..97f6e24 100755
--- a/tools/merge_sth.py
+++ b/tools/merge_sth.py
@@ -9,12 +9,13 @@
#
import sys
import json
-import urllib2
import time
import requests
+import logging
from base64 import b64encode
+from datetime import datetime, timedelta
from mergetools import parse_args, get_nfetched, hexencode, hexdecode, \
- get_logorder, get_sth
+ get_logorder, get_sth, flock_ex_or_fail
from certtools import create_ssl_context, get_public_key_from_file, \
timing_point, create_sth_signature, write_file, check_sth_signature, \
build_merkle_tree
@@ -39,6 +40,7 @@ def merge_sth(args, config, localconfig):
trees = [{'tree_size': get_nfetched(currentsizefile, logorderfile),
'sha256_root_hash': ''}]
+ logging.debug("starting point, trees: %s", trees)
for mergenode in mergenodes:
if mergenode["name"] == config["primarymergenode"]:
continue
@@ -49,28 +51,29 @@ def merge_sth(args, config, localconfig):
tree = {'tree_size': 0, "sha256_root_hash": ''}
trees.append(tree)
trees.sort(key=lambda e: e['tree_size'], reverse=True)
- #print >>sys.stderr, "DEBUG: trees:", trees
+ logging.debug("trees: %s", trees)
if backupquorum > len(trees) - 1:
- print >>sys.stderr, "backup quorum > number of secondaries:", \
- backupquorum, ">", len(trees) - 1
- return
+ logging.error("backup quorum > number of secondaries: %d > %d",
+ backupquorum, len(trees) - 1)
+ return -1
tree_size = trees[backupquorum]['tree_size']
root_hash = hexdecode(trees[backupquorum]['sha256_root_hash'])
- #print >>sys.stderr, "DEBUG: tree size candidate at backupquorum", backupquorum, ":", tree_size
+ logging.debug("tree size candidate at backupquorum %d: %d", backupquorum,
+ tree_size)
cur_sth = get_sth(sthfile)
if tree_size < cur_sth['tree_size']:
- print >>sys.stderr, "candidate tree < current tree:", \
- tree_size, "<", cur_sth['tree_size']
- return
+ logging.info("candidate tree < current tree: %d < %d",
+ tree_size, cur_sth['tree_size'])
+ return 0
assert tree_size >= 0 # Don't read logorder without limit.
logorder = get_logorder(logorderfile, tree_size)
timing_point(timing, "get logorder")
if tree_size == -1:
tree_size = len(logorder)
- print >>sys.stderr, "new tree size will be", tree_size
+ logging.info("new tree size will be %d", tree_size)
root_hash_calc = build_merkle_tree(logorder)[-1][0]
assert root_hash == '' or root_hash == root_hash_calc
@@ -87,11 +90,10 @@ def merge_sth(args, config, localconfig):
key=own_key)
break
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, e.response
- sys.stderr.flush()
+ logging.warning("create_sth_signature error: %s", e.response)
if tree_head_signature == None:
- print >>sys.stderr, "Could not contact any signing nodes"
- sys.exit(1)
+ logging.error("Could not contact any signing nodes")
+ return 0
sth = {"tree_size": tree_size, "timestamp": timestamp,
"sha256_root_hash": b64encode(root_hash),
@@ -100,34 +102,59 @@ def merge_sth(args, config, localconfig):
check_sth_signature(ctbaseurl, sth, publickey=logpublickey)
timing_point(timing, "build sth")
- print hexencode(root_hash), timestamp, tree_size
- sys.stdout.flush()
+ logging.info("new root: %s %d %d", hexencode(root_hash), timestamp, tree_size)
write_file(sthfile, sth)
if args.timing:
- print >>sys.stderr, "timing: merge_sth:", timing["deltatimes"]
- sys.stderr.flush()
+ logging.debug("timing: merge_sth: %s", timing["deltatimes"])
+
+ return 0
def main():
"""
- Read file 'sth' to get current tree size, assuming zero if file not
+ Read 'sth' to get the current tree size, assuming zero if file not
found.
Read tree sizes from the backup.<secondary> files, put them in a
- list and sort it. Let new tree size equal list[backup-quorum]. Barf
- on a new tree size smaller than the currently published tree size.
+ list and sort the list. Let new tree size be list[backup-quorum]. If
+ the new tree size is smaller than the currently published tree size,
+ stop here.
+
+ Decide on a timestamp, build an STH and write it to 'sth'.
- Decide on a timestamp, build an STH and write it to file 'sth'.
+ Sleep some and start over, or exit if there's no `--mergeinterval'.
"""
args, config, localconfig = parse_args()
+ paths = localconfig["paths"]
+ mergedb = paths["mergedb"]
+ lockfile = mergedb + "/.merge_sth.lock"
+
+ loglevel = getattr(logging, args.loglevel.upper())
+ if args.mergeinterval is None:
+ logging.basicConfig(level=loglevel)
+ else:
+ logging.basicConfig(filename=args.logdir + "/merge_sth.log",
+ level=loglevel)
+
+ if not flock_ex_or_fail(lockfile):
+ logging.critical("unable to take lock %s", lockfile)
+ return 1
while True:
- merge_sth(args, config, localconfig)
- if args.interval is None:
+ merge_start_time = datetime.now()
+ ret = merge_sth(args, config, localconfig)
+ if ret < 0:
+ return 1
+ if args.mergeinterval is None:
break
- print >>sys.stderr, "sleeping", args.interval, "seconds"
- time.sleep(args.interval)
+ sleep = (merge_start_time + timedelta(seconds=args.mergeinterval) -
+ datetime.now()).seconds
+ if sleep > 0:
+ logging.debug("sleeping %d seconds", sleep)
+ time.sleep(sleep)
+
+ return 0
if __name__ == '__main__':
sys.exit(main())
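
merge_sth sleeps only for whatever remains of the interval after the merge
work itself. Note that timedelta.seconds is always non-negative (a negative
delta normalizes to days=-1 plus a large seconds field), so a sketch using
total_seconds(), available from Python 2.7, expresses the same idea without
that edge case:

    import time
    from datetime import datetime, timedelta

    def sleep_until_next_round(start, interval_seconds):
        remaining = (start + timedelta(seconds=interval_seconds)
                     - datetime.now()).total_seconds()
        if remaining > 0:
            time.sleep(remaining)
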
diff --git a/tools/mergetools.py b/tools/mergetools.py
index 80fbf0b..5471fe4 100644
--- a/tools/mergetools.py
+++ b/tools/mergetools.py
@@ -10,11 +10,15 @@ import json
import yaml
import argparse
import requests
+import time
+import fcntl
+import errno
+import logging
try:
import permdb
except ImportError:
pass
-from certtools import get_leaf_hash, http_request, get_leaf_hash
+from certtools import get_leaf_hash, http_request
def parselogrow(row):
return base64.b16decode(row, casefold=True)
@@ -33,7 +37,7 @@ def get_nfetched(currentsizefile, logorderfile):
try:
limit = json.loads(open(currentsizefile).read())
except (IOError, ValueError):
- return -1
+ return 0
if limit['index'] >= 0:
with open(logorderfile, 'r') as f:
f.seek(limit['index']*65)
@@ -207,12 +211,10 @@ def get_curpos(node, baseurl, own_key, paths):
publickeydir=paths["publickeys"])
parsed_result = json.loads(result)
if parsed_result.get(u"result") == u"ok":
- return parsed_result[u"position"]
- print >>sys.stderr, "ERROR: currentposition", parsed_result
- sys.exit(1)
+ return True, parsed_result[u"position"]
+ return False, parsed_result
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: currentposition", e.response
- sys.exit(1)
+ return False, e.response
def get_verifiedsize(node, baseurl, own_key, paths):
try:
@@ -234,19 +236,16 @@ def sendlog(node, baseurl, own_key, paths, submission):
result = http_request(baseurl + "plop/v1/frontend/sendlog",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendlog", e.response
- sys.stderr.flush()
- return None
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(submission)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def backup_sendlog(node, baseurl, own_key, paths, submission):
try:
@@ -274,18 +273,16 @@ def sendentry(node, baseurl, own_key, paths, entry, ehash):
json.dumps({"entry":base64.b64encode(entry),
"treeleafhash":base64.b64encode(ehash)}),
key=own_key, verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendentry", e.reponse
- sys.exit(1)
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, ehash
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(ehash)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def sendentry_merge(node, baseurl, own_key, paths, entry, ehash):
return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)])
@@ -304,7 +301,7 @@ def sendentries_merge(node, baseurl, own_key, paths, entries, session=None):
sys.exit(1)
except ValueError, e:
print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, ehash
+ print >>sys.stderr, hash
print >>sys.stderr, "======= RESPONSE ======="
print >>sys.stderr, result
print >>sys.stderr, "========================"
@@ -316,18 +313,16 @@ def sendsth(node, baseurl, own_key, paths, submission):
result = http_request(baseurl + "plop/v1/frontend/sendsth",
json.dumps(submission), key=own_key,
verifynode=node, publickeydir=paths["publickeys"])
- return json.loads(result)
+ return True, json.loads(result)
except requests.exceptions.HTTPError, e:
- print >>sys.stderr, "ERROR: sendsth", e.response
- sys.exit(1)
+ return False, e.response
except ValueError, e:
- print >>sys.stderr, "==== FAILED REQUEST ===="
- print >>sys.stderr, submission
- print >>sys.stderr, "======= RESPONSE ======="
- print >>sys.stderr, result
- print >>sys.stderr, "========================"
- sys.stderr.flush()
- raise e
+ logging.error("==== FAILED REQUEST ====")
+ logging.error(submission)
+ logging.error("======= RESPONSE =======")
+ logging.error(result)
+ logging.error("========================")
+ return False, e
def verifyroot(node, baseurl, own_key, paths, treesize):
try:
@@ -403,10 +398,17 @@ def parse_args():
required=True)
parser.add_argument('--localconfig', help="Local configuration",
required=True)
- parser.add_argument('--interval', type=int, metavar="n",
- help="Repeate every N seconds")
+ # FIXME: verify that 0 < N < 1d
+ parser.add_argument('--mergeinterval', type=int, metavar="n",
+ help="Merge every N seconds")
parser.add_argument("--timing", action='store_true',
help="Print timing information")
+ parser.add_argument("--pidfile", type=str, metavar="file",
+ help="Store PID in FILE")
+ parser.add_argument("--logdir", type=str, default=".", metavar="dir",
+ help="Write logfiles in DIR [default: .]")
+ parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level",
+ help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]")
args = parser.parse_args()
config = yaml.load(open(args.config))
@@ -421,6 +423,32 @@ def perm(dbtype, path):
return PermDB(path)
assert False
+def waitforfile(path):
+ statinfo = None
+ while statinfo is None:
+ try:
+ statinfo = os.stat(path)
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ time.sleep(1)
+ return statinfo
+
+def flock_ex_or_fail(path):
+ try:
+ fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB)
+ except IOError, e:
+ if e.errno != errno.EWOULDBLOCK:
+ raise
+ return False
+ return True
+
+class Status:
+ def __init__(self, path):
+ self.path = path
+ def status(self, s):
+ open(self.path, 'w').write(s)
+
class FileDB:
def __init__(self, path):
self.path = path
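
flock_ex_or_fail() never closes the descriptor it opens: a BSD flock lives
as long as its file descriptor, so the lock is held until the process
exits. A self-contained sketch of the same non-blocking exclusive lock:

    import errno
    import fcntl
    import os

    def try_lock(path):
        fd = os.open(path, os.O_CREAT)  # deliberately left open on success
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError) as e:
            if e.errno != errno.EWOULDBLOCK:
                raise
            os.close(fd)
            return False
        return True
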
diff --git a/tools/testcase1.py b/tools/testcase1.py
index 81d589a..98b9179 100755
--- a/tools/testcase1.py
+++ b/tools/testcase1.py
@@ -13,6 +13,7 @@ import struct
import hashlib
import itertools
import os.path
+from time import sleep
from certtools import *
baseurls = [sys.argv[1]]
@@ -20,6 +21,9 @@ logpublickeyfile = sys.argv[2]
cacertfile = sys.argv[3]
toolsdir = os.path.dirname(sys.argv[0])
testdir = sys.argv[4]
+do_merge = True
+if len(sys.argv) > 5 and sys.argv[5] == '--nomerge':
+ do_merge = False
certfiles = [toolsdir + ("/testcerts/cert%d.txt" % e) for e in range(1, 6)]
@@ -121,7 +125,7 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl):
assert_equal(len(entries), 1, "get_entries", quiet=True)
fetched_entry = entries["entries"][0]
merkle_tree_leaf = pack_mtl(timestamp, chain[0])
- leaf_input = base64.decodestring(fetched_entry["leaf_input"])
+ leaf_input = base64.decodestring(fetched_entry["leaf_input"])
assert_equal(leaf_input, merkle_tree_leaf, "entry", nodata=True, quiet=True)
extra_data = base64.decodestring(fetched_entry["extra_data"])
certchain = decode_certificate_chain(extra_data)
@@ -148,8 +152,14 @@ def get_and_check_entry(timestamp, chain, leaf_index, baseurl):
len(submittedcertchain))
def merge():
- return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg",
- "--localconfig", testdir + "/catlfish-test-local-merge.cfg"])
+ if do_merge:
+ return subprocess.call([toolsdir + "/merge", "--config", testdir + "/catlfish-test.cfg",
+ "--localconfig", testdir + "/catlfish-test-local-merge.cfg"])
+ else:
+ n = 60
+ print "testcase1.py: sleeping", n, "seconds waiting for merge"
+ sleep(n)
+ return 0
mergeresult = merge()
assert_equal(mergeresult, 0, "merge", quiet=True, fatal=True)
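
With the new optional fifth argument, the test can be run against a merge
pipeline that is already running continuously instead of invoking
tools/merge itself. A hypothetical invocation, with all paths purely
illustrative:

    import subprocess
    subprocess.call(["tools/testcase1.py",
                     "https://localhost:8080/",  # base URL of the log
                     "test/keys/logkey.pem",     # log public key
                     "test/httpsca/cacert.pem",  # HTTPS CA certificate
                     "test",                     # test directory
                     "--nomerge"])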