# -*- coding: utf-8 -*-
"""Compare SOA records served by a reference name server and by NSD masters.

The script reads an NSD configuration file, collects every zone that has both
a ``name:`` and a ``request-xfr:`` option, and checks that the server listed
on the ``request-xfr:`` line returns the same SOA record as ``REFERENCE_NS``.

All ``print`` calls take a single pre-formatted string so that the script
behaves identically under Python 2 and Python 3.
"""
__author__ = 'lundberg'

import sys
import argparse
import logging
from socket import gethostbyname, gethostbyaddr, gaierror, herror

try:
    import dns.exception
    import dns.resolver
except ImportError:
    # dnspython is only needed for the actual lookups.  Deferring the failure
    # to get_resolver() keeps parse_file() importable without it.
    dns = None

logger = logging.getLogger('dnscheck_nsd')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

VERBOSE = False
REFERENCE_NS = 'sunic.sunet.se'

# Addresses that are reported as "localhost" without a reverse lookup.
_LOOPBACK_ADDRESSES = ('127.0.0.1', '::1')
# Optional transport keywords that may precede the address on request-xfr.
_XFR_KEYWORDS = ('AXFR', 'UDP')


def _ns_label(resolver):
    """Return a printable name for the first name server of *resolver*.

    Loopback addresses are shown as ``localhost``.  Other addresses are
    reverse-resolved; when that fails the raw address is returned so that
    reporting an error can never raise a second one.
    """
    address = resolver.nameservers[0]
    if address in _LOOPBACK_ADDRESSES:
        return 'localhost'
    try:
        return gethostbyaddr(address)[0]
    except (herror, gaierror):
        return address


def get_resolver(nameserver=None):
    """Return a ``dns.resolver.Resolver``.

    :param nameserver: address of the only server the resolver should ask.
        When empty or ``None`` the system default configuration is used.
    :return: the configured resolver.

    Logs an error and exits with status 1 when dnspython is not installed or
    when *nameserver* is rejected.
    """
    if dns is None:
        logger.error('dnspython is required for DNS lookups.')
        sys.exit(1)
    resolver = dns.resolver.Resolver()
    if nameserver:
        try:
            resolver.nameservers = [nameserver]
        except (gaierror, ValueError):
            # Newer dnspython versions validate the address on assignment.
            logger.error('Could not find nameserver: %s' % nameserver)
            sys.exit(1)
    return resolver


def compare_soa(zone, resolvers):
    """Query the SOA record of *zone* through every resolver and compare.

    :param zone: zone name to look up.
    :param resolvers: iterable of resolvers, each queried once, in order.
    :return: ``'match'`` when all answers are equal, ``'timeout'`` as soon as
        one query times out, otherwise ``'no match'`` (differing answers,
        NXDOMAIN, or no usable SOA answer).
    """
    answers = []
    for resolver in resolvers:
        try:
            answer = resolver.query(zone, 'SOA')[0]
        except dns.exception.Timeout:
            logger.error('%s timed out.' % _ns_label(resolver))
            return 'timeout'
        except dns.resolver.NXDOMAIN:
            logger.error('NS %s responded NXDOMAIN for %s.'
                         % (_ns_label(resolver), zone))
            return 'no match'
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers):
            # The server answered, but not with an SOA record we can compare.
            logger.error('NS %s returned no SOA record for %s.'
                         % (_ns_label(resolver), zone))
            return 'no match'
        if VERBOSE:
            logger.info('NS %s: %s' % (_ns_label(resolver), answer))
        if answer:
            answers.append(answer)
    if len(set(answers)) == 1:
        return 'match'
    return 'no match'


def print_soa(zone, resolvers):
    """Print the SOA record of *zone* as returned by every resolver.

    Failures are reported per resolver and do not stop the remaining lookups.
    """
    for resolver in resolvers:
        try:
            answer = resolver.query(zone, 'SOA')[0]
        except dns.exception.Timeout:
            print('%s timed out. Print SOA for %s failed.'
                  % (_ns_label(resolver), zone))
        except dns.resolver.NXDOMAIN:
            logger.error('NS %s responded NXDOMAIN for %s.'
                         % (_ns_label(resolver), zone))
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers):
            logger.error('NS %s returned no SOA record for %s.'
                         % (_ns_label(resolver), zone))
        else:
            print('NS %s: %s' % (_ns_label(resolver), answer))


def parse_file(f):
    """Extract zone names and transfer sources from an NSD configuration.

    :param f: iterable of configuration lines (an open file, for example).
        Its ``name`` attribute, when present, is used in error messages.
    :return: list of ``{'domain': ..., 'ns_address': ...}`` dicts, one per
        ``zone:`` section that has both a ``name:`` and a ``request-xfr:``
        option.  Only the first ``request-xfr:`` of a zone is used.

    A blank line ends the current section; a section that ends with only one
    of the two options set is logged as misconfigured and dropped.
    """
    source = getattr(f, 'name', '<unknown>')
    result = []
    in_zone, domain, ns_address = False, '', ''
    for raw_line in f:
        line = raw_line.strip()
        if line.startswith('#'):
            continue
        if not line:
            # Blank line: anything still pending belongs to a broken zone.
            if domain:
                logger.error('Misconfigured zone: %s in %s.'
                             % (domain, source))
            if ns_address:
                logger.error('Misconfigured zone with NS address: %s in %s.'
                             % (ns_address, source))
            in_zone, domain, ns_address = False, '', ''
            continue
        # Match on the option key itself, not on a substring of the line, so
        # that values such as a key called "keyname" cannot be mistaken for
        # a "name:" option.  Only the first ':' separates key from value,
        # which keeps IPv6 addresses intact.
        key, _, value = line.partition(':')
        key = key.strip()
        values = value.split()
        if key == 'zone':
            in_zone = True
        elif in_zone and values:
            if key == 'name':
                domain = values[0]
            elif key == 'request-xfr':
                if values[0].upper() in _XFR_KEYWORDS:
                    values = values[1:]
                if values:
                    ns_address = values[0]
        if in_zone and domain and ns_address:
            result.append({
                'domain': domain.strip('"'),
                'ns_address': ns_address
            })
            in_zone, domain, ns_address = False, '', ''
    return result


def main():
    """Check every zone of the given NSD configuration against REFERENCE_NS.

    :return: 0; problems are reported on stdout and through the logger.
    """
    global VERBOSE
    # User friendly usage output
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='store_true', default=False)
    parser.add_argument(
        'file',
        nargs='?',
        type=argparse.FileType('r'),
        default=sys.stdin,
        help="NSD configuration file"
    )
    args = parser.parse_args()
    if args.verbose:
        VERBOSE = True
    try:
        reference_address = gethostbyname(REFERENCE_NS)
    except gaierror:
        logger.error('Could not find nameserver: %s' % REFERENCE_NS)
        sys.exit(1)
    ref_resolver = get_resolver(nameserver=reference_address)
    for item in parse_file(args.file):
        resolver = get_resolver(nameserver=item['ns_address'])
        result = compare_soa(item['domain'], [ref_resolver, resolver])
        if result == 'timeout':
            print('Check for zone %s timed out.' % item['domain'])
        elif result == 'no match':
            print('SOA did not match:')
            print_soa(item['domain'], [ref_resolver, resolver])
        elif result == 'match' and VERBOSE:
            print('Check complete for zone %s.\n' % item['domain'])
    return 0


if __name__ == '__main__':
    sys.exit(main())