# Copyright (c) 2015, NORDUnet A/S. # See LICENSE for licensing information. import os import base64 import hashlib import sys import struct import json import yaml import argparse import requests import time import fcntl import errno import multiprocessing import logging try: import permdb except ImportError: pass from certtools import get_leaf_hash, http_request def parselogrow(row): return base64.b16decode(row, casefold=True) def get_logorder(filename, items=-1): logorder = [] n = 0 for row in open(filename, "r"): if n == items: break logorder.append(parselogrow(row.rstrip())) n += 1 return logorder def get_nfetched(currentsizefile, logorderfile): try: limit = json.loads(open(currentsizefile).read()) except (IOError, ValueError): return 0 if limit['index'] >= 0: with open(logorderfile, 'r') as f: f.seek(limit['index']*65) assert f.read(64).lower() == limit['hash'] return limit['index'] + 1 def get_sth(filename): try: sth = json.loads(open(filename, 'r').read()) except (IOError, ValueError): sth = {'tree_size': -1, 'timestamp': 0, 'sha256_root_hash': '', 'tree_head_signature': ''} return sth def read_chain_open(chainsdir, filename): path = chainsdir + "/" + \ filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] f = open(path + "/" + filename, "r") return f def read_chain(chainsdir, key): filename = base64.b16encode(key).upper() try: f = read_chain_open(chainsdir, filename) except IOError: f = read_chain_open(chainsdir, filename.lower()) value = f.read() f.close() return value def tlv_decode(data): (length,) = struct.unpack(">I", data[0:4]) dtype = data[4:8] value = data[8:length] rest = data[length:] return (dtype, value, rest) def tlv_encode(dtype, value): assert len(dtype) == 4 data = struct.pack(">I", len(value) + 8) + dtype + value return data def tlv_decodelist(data): l = [] while len(data): (dtype, value, rest) = tlv_decode(data) l.append((dtype, value)) data = rest return l def tlv_encodelist(l): data = "" for (dtype, value) in l: data += tlv_encode(dtype, value) return data def unwrap_entry(entry): ploplevel = tlv_decodelist(entry) assert len(ploplevel) == 2 (ploptype, plopdata) = ploplevel[0] (plopchecksumtype, plopchecksum) = ploplevel[1] assert ploptype == "PLOP" assert plopchecksumtype == "S256" computedchecksum = hashlib.sha256(plopdata).digest() assert computedchecksum == plopchecksum return plopdata def wrap_entry(entry): return tlv_encodelist([("PLOP", entry), ("S256", hashlib.sha256(entry).digest())]) def verify_entry(verifycert, entry, ehash): packed = unwrap_entry(entry) unpacked = tlv_decodelist(packed) (mtltype, mtl) = unpacked[0] assert ehash == get_leaf_hash(mtl) assert mtltype == "MTL1" s = struct.pack(">I", len(packed)) + packed try: verifycert.stdin.write(s) except IOError: sys.stderr.write("merge: unable to write to verifycert process: ") while 1: line = verifycert.stdout.readline() if line: sys.stderr.write(line) else: sys.exit(1) result_length_packed = verifycert.stdout.read(4) (result_length,) = struct.unpack(">I", result_length_packed) result = verifycert.stdout.read(result_length) assert len(result) == result_length (error_code,) = struct.unpack("B", result[0:1]) if error_code != 0: print >>sys.stderr, result[1:] sys.exit(1) def hexencode(key): return base64.b16encode(key).lower() def hexdecode(s): return base64.b16decode(s.upper()) def write_chain(key, value, chainsdir, hashed_dir=True): filename = hexencode(key) if hashed_dir: path = chainsdir + "/" \ + filename[0:2] + "/" + filename[2:4] + "/" + filename[4:6] try: os.makedirs(path) except Exception: pass else: path = chainsdir f = open(path + "/" + filename, "w") f.write(value) f.close() def add_to_logorder(logorderfile, key): f = open(logorderfile, "a") f.write(hexencode(key) + "\n") f.close() def fsync_logorder(logorderfile): f = open(logorderfile, "a") os.fsync(f.fileno()) f.close() def get_new_entries(node, baseurl, own_key, paths): try: result = http_request(baseurl + "plop/v1/storage/fetchnewentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return [base64.b64decode(entry) for \ entry in parsed_result[u"entries"]] print >>sys.stderr, "ERROR: fetchnewentries", parsed_result sys.exit(1) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: fetchnewentries", e.response sys.exit(1) def get_entries(node, baseurl, own_key, paths, hashes): try: params = {"hash":[base64.b64encode(ehash) for ehash in hashes]} result = http_request(baseurl + "plop/v1/storage/getentry", params=params, key=own_key, verifynode=node, publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": entries = dict([(base64.b64decode(entry["hash"]), base64.b64decode(entry["entry"])) for \ entry in parsed_result[u"entries"]]) assert len(entries) == len(hashes) assert set(entries.keys()) == set(hashes) return entries print >>sys.stderr, "ERROR: getentry", parsed_result sys.exit(1) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: getentry", e.request.url, e.response sys.exit(1) def get_curpos(node, baseurl, own_key, paths): try: result = http_request(baseurl + "plop/v1/frontend/currentposition", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return True, parsed_result[u"position"] return False, parsed_result except requests.exceptions.HTTPError, e: return False, e.response def get_verifiedsize(node, baseurl, own_key, paths): try: result = http_request(baseurl + "plop/v1/merge/verifiedsize", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return parsed_result[u"size"] print >>sys.stderr, "ERROR: verifiedsize", parsed_result sys.exit(1) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: verifiedsize", e.response sys.exit(1) def sendlog(node, baseurl, own_key, paths, submission): try: result = http_request(baseurl + "plop/v1/frontend/sendlog", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return True, json.loads(result) except requests.exceptions.HTTPError, e: return False, e.response except ValueError, e: logging.error("==== FAILED REQUEST ====") logging.error(submission) logging.error("======= RESPONSE =======") logging.error(result) logging.error("========================") return False, e def backup_sendlog(node, baseurl, own_key, paths, submission): try: result = http_request(baseurl + "plop/v1/merge/sendlog", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: backup_sendlog", e.response sys.stderr.flush() return None except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" print >>sys.stderr, submission print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" sys.stderr.flush() raise e def sendentry(node, baseurl, own_key, paths, entry, ehash): try: result = http_request( baseurl + "plop/v1/frontend/sendentry", json.dumps({"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return True, json.loads(result) except requests.exceptions.HTTPError, e: return False, e.response except ValueError, e: logging.error("==== FAILED REQUEST ====") logging.error(ehash) logging.error("======= RESPONSE =======") logging.error(result) logging.error("========================") return False, e def sendentry_merge(node, baseurl, own_key, paths, entry, ehash): return sendentries_merge(node, baseurl, own_key, paths, [(ehash, entry)]) def sendentries_merge(node, baseurl, own_key, paths, entries, session=None): try: json_entries = [{"entry":base64.b64encode(entry), "treeleafhash":base64.b64encode(ehash)} for ehash, entry in entries] result = http_request( baseurl + "plop/v1/merge/sendentry", json.dumps(json_entries), key=own_key, verifynode=node, publickeydir=paths["publickeys"], session=session) return json.loads(result) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: sendentry_merge", e.response sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" print >>sys.stderr, ehash print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" sys.stderr.flush() raise e def sendsth(node, baseurl, own_key, paths, submission): try: result = http_request(baseurl + "plop/v1/frontend/sendsth", json.dumps(submission), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return True, json.loads(result) except requests.exceptions.HTTPError, e: return False, e.response except ValueError, e: logging.error("==== FAILED REQUEST ====") logging.error(submission) logging.error("======= RESPONSE =======") logging.error(result) logging.error("========================") return False, e def verifyroot(node, baseurl, own_key, paths, treesize): try: result = http_request(baseurl + "plop/v1/merge/verifyroot", json.dumps({"tree_size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: verifyroot", e.response sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" print >>sys.stderr, treesize print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" sys.stderr.flush() raise e def setverifiedsize(node, baseurl, own_key, paths, treesize): try: result = http_request(baseurl + "plop/v1/merge/setverifiedsize", json.dumps({"size":treesize}), key=own_key, verifynode=node, publickeydir=paths["publickeys"]) return json.loads(result) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: setverifiedsize", e.response sys.exit(1) except ValueError, e: print >>sys.stderr, "==== FAILED REQUEST ====" print >>sys.stderr, treesize print >>sys.stderr, "======= RESPONSE =======" print >>sys.stderr, result print >>sys.stderr, "========================" sys.stderr.flush() raise e def get_missingentries(node, baseurl, own_key, paths): try: result = http_request(baseurl + "plop/v1/frontend/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return parsed_result[u"entries"] print >>sys.stderr, "ERROR: missingentries", parsed_result sys.exit(1) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: missingentries", e.response sys.exit(1) def get_missingentriesforbackup(node, baseurl, own_key, paths): try: result = http_request(baseurl + "plop/v1/merge/missingentries", key=own_key, verifynode=node, publickeydir=paths["publickeys"]) parsed_result = json.loads(result) if parsed_result.get(u"result") == u"ok": return parsed_result[u"entries"] print >>sys.stderr, "ERROR: missingentriesforbackup", parsed_result sys.exit(1) except requests.exceptions.HTTPError, e: print >>sys.stderr, "ERROR: missingentriesforbackup", e.response sys.exit(1) def chunks(l, n): return [l[i:i+n] for i in range(0, len(l), n)] def parse_args(): parser = argparse.ArgumentParser(description="") parser.add_argument('node', nargs='*', help="Node to operate on") parser.add_argument('--config', help="System configuration", required=True) parser.add_argument('--localconfig', help="Local configuration", required=True) # FIXME: verify that 0 < N < 1d parser.add_argument('--mergeinterval', type=int, metavar="n", help="Merge every N seconds") parser.add_argument("--timing", action='store_true', help="Print timing information") parser.add_argument("--pidfile", type=str, metavar="file", help="Store PID in FILE") parser.add_argument("--logdir", type=str, default=".", metavar="dir", help="Write logfiles in DIR [default: .]") parser.add_argument("--loglevel", type=str, default="DEBUG", metavar="level", help="Log level, one of DEBUG, INFO, WARNING, ERROR, CRITICAL [default: DEBUG]") args = parser.parse_args() config = yaml.load(open(args.config)) localconfig = yaml.load(open(args.localconfig)) return (args, config, localconfig) def perm(dbtype, path): if dbtype == "filedb": return FileDB(path) elif dbtype == "permdb": return PermDB(path) assert False def waitforfile(path): statinfo = None while statinfo is None: try: statinfo = os.stat(path) except OSError, e: if e.errno != errno.ENOENT: raise time.sleep(1) return statinfo def flock_ex_or_fail(path): """ To be used at most once per process. Will otherwise leak file descriptors. """ try: fcntl.flock(os.open(path, os.O_CREAT), fcntl.LOCK_EX + fcntl.LOCK_NB) except IOError, e: if e.errno != errno.EWOULDBLOCK: raise return False return True def flock_ex_wait(path): fd = os.open(path, os.O_CREAT) logging.debug("waiting for exclusive lock on %s (%s)", fd, path) fcntl.flock(fd, fcntl.LOCK_EX) logging.debug("taken exclusive lock on %s", fd) return fd def flock_sh_wait(path): fd = os.open(path, os.O_CREAT) logging.debug("waiting for shared lock on %s (%s)", fd, path) fcntl.flock(fd, fcntl.LOCK_SH) logging.debug("taken shared lock on %s", fd) return fd def flock_release(fd): logging.debug("releasing lock on %s", fd) fcntl.flock(fd, fcntl.LOCK_UN) os.close(fd) def terminate_child_procs(): for p in multiprocessing.active_children(): #print >>sys.stderr, "DEBUG: terminating pid", p.pid p.terminate() class Status: def __init__(self, path): self.path = path def status(self, s): open(self.path, 'w').write(s) class FileDB: def __init__(self, path): self.path = path self.lockfile = None def get(self, key): return read_chain(self.path, key) def add(self, key, value): return write_chain(key, value, self.path) def lock_sh(self): self.lockfile = flock_sh_wait(self.path + "/.lock") def lock_ex(self): self.lockfile = flock_ex_wait(self.path + "/.lock") def commit(self): flock_release(self.lockfile) class PermDB: def __init__(self, path): self.permdbobj = permdb.alloc(path) def get(self, key): return permdb.getvalue(self.permdbobj, key) def add(self, key, value): return permdb.addvalue(self.permdbobj, key, value) def lock_sh(self): assert False # NYI def lock_ex(self): assert False # NYI def commit(self): permdb.committree(self.permdbobj)