# Loading, signature verification and schema checking of YAML configuration
# files.  Every user-facing problem (bad signature, schema violation, missing
# key) is reported on stderr and terminates the process with exit status 1.

import base64
import hashlib
import io
import sys

import ecdsa
import yaml

# Python 2 has two text types (str/unicode) and two integer types (int/long);
# Python 3 has str/bytes and a single int.  Resolve the right tuples once so
# the rest of the module runs unchanged on both.
try:
    _STRING_TYPES = (basestring,)  # noqa: F821 -- Python 2 only
    _INTEGER_TYPES = (int, long)  # noqa: F821 -- Python 2 only
except NameError:
    _STRING_TYPES = (str, bytes)
    _INTEGER_TYPES = (int,)


def _fail(message):
    """Write *message* plus a newline to stderr and exit with status 1."""
    sys.stderr.write("%s\n" % (message,))
    sys.exit(1)


def _parse_yaml(rawconfig):
    """Parse the raw bytes of a configuration file with the safe YAML loader."""
    return yaml.load(io.BytesIO(rawconfig), yaml.SafeLoader)


def render_path(path):
    """Return a human-readable description of a location in a configuration.

    *path* is the sequence of keys (and "item N" markers for list entries)
    leading from the top of the configuration to the location.  An empty
    path is rendered as "the top level"; otherwise the elements are joined
    with ", " and wrapped in single quotes.
    """
    if not path:
        return "the top level"
    # "%s" formatting lets non-string keys (e.g. YAML integers) be rendered
    # instead of making str.join raise TypeError.
    return "'" + ", ".join("%s" % (part,) for part in path) + "'"


class ErrorHandlingDict(dict):
    """Dictionary that turns a missing key into a fatal configuration error.

    Subscripting with an absent key prints a message naming the key, its
    location and the configuration file to stderr and exits with status 1
    instead of raising KeyError.  Only ``d[key]`` is affected; ``get`` and
    ``in`` behave as for a plain dict.
    """

    def __init__(self, filename, path):
        # Initialise *this* dict (the original initialised a throwaway one).
        dict.__init__(self)
        self._filename = filename
        self._path = path

    def __missing__(self, key):
        _fail("error: could not find configuration key '%s' at %s in %s"
              % (key, render_path(self._path), self._filename))


def errorhandlify(term, filename, path=None):
    """Recursively wrap every dict inside *term* in an ErrorHandlingDict.

    Strings and integers are returned unchanged, lists are rebuilt with their
    elements converted, and dicts become ErrorHandlingDict instances that
    remember *filename* and their own location for error reporting.  Any
    other type is reported as "unknown type" on stderr and terminates the
    process with exit status 1.

    *path* is the location of *term* inside the configuration; callers
    normally omit it.
    """
    if path is None:
        path = []
    if isinstance(term, _STRING_TYPES) or isinstance(term, _INTEGER_TYPES):
        # NOTE: bool is a subclass of int, so YAML booleans pass through here.
        return term
    if isinstance(term, dict):
        result = ErrorHandlingDict(filename, path)
        for key, value in term.items():
            result[key] = errorhandlify(value, filename, path + [key])
        return result
    if isinstance(term, list):
        # List items are numbered from 1 in error messages.
        return [errorhandlify(element, filename, path + ["item %d" % index])
                for index, element in enumerate(term, start=1)]
    _fail("unknown type %s" % (type(term),))


def _verify_signature(rawconfig, signature, publickey_base64, filename):
    """Exit with an error unless *signature* is valid for *rawconfig*.

    *publickey_base64* is the base64 encoding of a DER public key and
    *signature* is a DER-encoded ECDSA signature over the SHA-256 digest of
    *rawconfig*.  *filename* is only used in the error message.
    """
    publickey = base64.b64decode(publickey_base64)
    # Key parsing stays outside the try block so that a broken public key is
    # not misreported as a bad signature on the configuration file.
    vk = ecdsa.VerifyingKey.from_der(publickey)
    try:
        vk.verify(signature, rawconfig, hashfunc=hashlib.sha256,
                  sigdecode=ecdsa.util.sigdecode_der)
    except (ecdsa.keys.BadSignatureError, ecdsa.der.UnexpectedDER):
        # A signature that is not even valid DER is treated like any other
        # incorrect signature rather than surfacing as a traceback.
        _fail("error: configuration file %s did not have a correct signature"
              % (filename,))


def verify_config(rawconfig, signature, publickey_base64, filename):
    """Verify the signature of *rawconfig* and return the parsed configuration.

    Exits with status 1 if the signature does not verify.  On success the
    YAML in *rawconfig* is parsed with the safe loader and returned wrapped
    by errorhandlify().
    """
    _verify_signature(rawconfig, signature, publickey_base64, filename)
    return errorhandlify(_parse_yaml(rawconfig), filename)


def verify_and_read_config(filename, publickey_base64, schema=None):
    """Read *filename*, verify it against ``filename + ".sig"`` and parse it.

    If *schema* is given and non-empty, the parsed configuration is checked
    against it with check_config_schema().  Returns the configuration wrapped
    by errorhandlify().  Signature and schema failures exit with status 1.
    """
    # Both files are read as bytes: the signature is binary DER and must be
    # checked against the exact bytes of the configuration file.
    with open(filename, "rb") as config_file:
        rawconfig = config_file.read()
    with open(filename + ".sig", "rb") as signature_file:
        signature = signature_file.read()
    _verify_signature(rawconfig, signature, publickey_base64, filename)
    config = _parse_yaml(rawconfig)
    if schema:
        check_config_schema(config, schema)
    return errorhandlify(config, filename)


def insert_schema_path(schema, path, datatype, highleveldatatype):
    """Store ``(datatype, highleveldatatype)`` in *schema* at *path*.

    *schema* is a nested dict that is modified in place; *path* is a non-empty
    sequence of keys.  Intermediate dicts are created as needed.

    Raises TypeError if an intermediate element of *path* is already defined
    as a leaf value, and IndexError if *path* is empty.
    """
    node = schema
    for key in path[:-1]:
        if key not in node:
            node[key] = {}
        node = node[key]
        if not isinstance(node, dict):
            raise TypeError(
                "schema path element '%s' is already defined as a value"
                % (key,))
    node[path[-1]] = (datatype, highleveldatatype)


def transform_schema(in_schema):
    """Convert a flat schema description into a nested schema dict.

    *in_schema* is an iterable of ``(rawpath, datatype, highleveldatatype)``
    triples where *rawpath* is a "/"-separated path.  The result maps each
    path element to a nested dict, with ``(datatype, highleveldatatype)``
    tuples at the leaves.
    """
    schema = {}
    for (rawpath, datatype, highleveldatatype) in in_schema:
        insert_schema_path(schema, rawpath.split("/"), datatype,
                           highleveldatatype)
    return schema


def check_config_schema(config, schema):
    """Check *config* against the flat *schema*; exit with status 1 on error.

    The first violation found is printed to stderr prefixed with "error:".
    Returns None when the configuration conforms.
    """
    error = check_config_schema_part(config, transform_schema(schema))
    if error:
        _fail("error: %s" % (error,))


def _schema_type_name(schema):
    """Return the low-level type name of a leaf schema entry for messages."""
    if isinstance(schema, tuple) and schema:
        return schema[0]
    return schema


def _check_scalar(schema, type_name, description, joined_path):
    """Check a scalar config value against *schema*.

    *type_name* is the low-level schema name of the value's type ("string" or
    "integer") and *description* its form for messages ("a string").  Returns
    an error message, or None if the schema expects this type.
    """
    if isinstance(schema, dict):
        # The schema expects a nested section or a list here, not a scalar.
        expected = "list" if "[]" in schema else "dict"
        return "expected %s at %s, not %s" % (expected, joined_path,
                                              description)
    (schema_lowlevel, _schema_highlevel) = schema
    if schema_lowlevel != type_name:
        return "expected %s at %s, not %s" % (schema_lowlevel, joined_path,
                                              description)
    return None


def check_config_schema_part(term, schema, path=None):
    """Check the configuration fragment *term* against the nested *schema*.

    *schema* is either a ``(lowlevel, highlevel)`` tuple describing a scalar
    or a dict describing a section.  In a section schema the key "[]" holds
    the schema for list items, and a single key starting with "*" acts as a
    wildcard matching any configuration key.

    Returns a message describing the first violation found, or None if
    *term* conforms.  A value of an unsupported type is reported on stderr
    and terminates the process with exit status 1.

    *path* is the location of *term* inside the configuration; callers
    normally omit it.
    """
    if path is None:
        path = []
    joined_path = render_path(path)

    if isinstance(term, _STRING_TYPES):
        return _check_scalar(schema, "string", "a string", joined_path)

    if isinstance(term, _INTEGER_TYPES):
        return _check_scalar(schema, "integer", "an integer", joined_path)

    if isinstance(term, dict):
        if not isinstance(schema, dict):
            return "expected %s at %s, not a key" % (
                _schema_type_name(schema), joined_path)
        for key, value in term.items():
            schema_part = schema.get(key)
            if schema_part is None and len(schema) == 1:
                # A lone "*..." key in the schema matches every config key.
                only_key = next(iter(schema))
                if only_key.startswith("*"):
                    schema_part = schema[only_key]
            if schema_part is None:
                return "configuration key '%s' at %s unknown" % (
                    key, joined_path)
            error = check_config_schema_part(value, schema_part, path + [key])
            if error:
                return error
        return None

    if isinstance(term, list):
        if not isinstance(schema, dict):
            return "expected %s at %s, not a list" % (
                _schema_type_name(schema), joined_path)
        schema_part = schema.get("[]")
        if schema_part is None:
            return "expected dict at %s, not a list" % (joined_path,)
        # List items are numbered from 1 in error messages.
        for index, element in enumerate(term, start=1):
            error = check_config_schema_part(element, schema_part,
                                             path + ["item %d" % index])
            if error:
                return error
        return None

    _fail("unknown type %s" % (type(term),))


def read_config(filename, schema=None):
    """Read and parse the unsigned YAML configuration file *filename*.

    If *schema* is given and non-empty, the configuration is checked against
    it with check_config_schema(), which exits with status 1 on a violation.
    Returns the configuration wrapped by errorhandlify().
    """
    # Binary mode lets the YAML parser do its own encoding detection and
    # matches how verify_and_read_config() reads the file.
    with open(filename, "rb") as config_file:
        config = yaml.load(config_file, yaml.SafeLoader)
    if schema:
        check_config_schema(config, schema)
    return errorhandlify(config, filename)