From faeb5d277b630e9d43a32cd55b6dce4ca583f620 Mon Sep 17 00:00:00 2001 From: Johan Lundberg Date: Mon, 10 Dec 2012 18:19:44 +0100 Subject: Added authoritative check. --- dnscheck_nsd.py | 79 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 20 deletions(-) diff --git a/dnscheck_nsd.py b/dnscheck_nsd.py index 17b4362..15760ee 100644 --- a/dnscheck_nsd.py +++ b/dnscheck_nsd.py @@ -18,20 +18,18 @@ logger.addHandler(ch) VERBOSE = False def get_resolver(nameserver=None, lifetime=30): - if not nameserver: - resolver = dns.resolver.Resolver() - else: - resolver = dns.resolver.Resolver() + resolver = dns.resolver.Resolver() + resolver.lifetime = lifetime + if nameserver: try: resolver.nameservers=[gethostbyname(nameserver)] except gaierror: try: - resolver.nameservers=[nameserver] # Maybe it is an IPv6 address? + resolver.nameservers=[nameserver] # It is an IP address except gaierror: logger.error('Could not find nameserver: %s' % nameserver) sys.exit(1) logger.debug('Resolver instance with nameserver %s.' % resolver.nameservers[0]) - resolver.lifetime = lifetime return resolver def compare_soa(zone, resolvers): @@ -79,6 +77,37 @@ def print_soa(zone, resolvers): except dns.resolver.NoNameservers: logger.error('No non-broken nameservers are available to answer the query for %s.' % zone) +def check_auth(zone, resolver): + try: + nameserver = gethostbyaddr(resolver.nameservers[0])[0] + answer = resolver.query(zone, 'NS') + if VERBOSE: + logger.info('Checking if NS %s authoritative for %s...' % (nameserver, zone)) + ns_match = '%s.' % nameserver # hostname. + for auth in answer: + if ns_match == auth.to_text(): + if VERBOSE: + logger.info('NS %s is authoritative for %s...' % (nameserver, zone)) + return True + except dns.exception.Timeout: + logger.error('%s timed out. NS request for %s failed.' % (gethostbyaddr(resolver.nameservers[0])[0], zone)) + return None + except dns.resolver.NoAnswer: + logger.error('%s returned no answer for %s.' % (gethostbyaddr(resolver.nameservers[0])[0], zone)) + return None + except dns.resolver.NXDOMAIN: + logger.error('NS %s responded domain not found (NXDOMAIN) for %s.' % (gethostbyaddr(resolver.nameservers[0])[0], zone)) + return None + except dns.resolver.NoNameservers: + logger.error('No non-broken nameservers are available to answer the query for %s.' % zone) + return None + except gaierror: + logger.error('Could not find nameserver: %s' % resolver.nameservers[0]) + return None + if VERBOSE: + logger.info('NS %s is not authoritative for %s...' % (nameserver, zone)) + return False + def parse_file(f): result = [] in_zone,domain,ns_address = False,'','' @@ -107,8 +136,10 @@ def parse_file(f): def main(): # User friendly usage output parser = argparse.ArgumentParser() - parser.add_argument('--nameserver', '-ns', default=None, help="IP address or hostname, default localhost") + parser.add_argument('--nameserver', '-ns', default=None, help="IP address or hostname of reference NS, default localhosts resolver") parser.add_argument('--timeout', '-t', type=float, default=5, help="timeout in seconds, default 5") + parser.add_argument('--nochecksoa', action='store_true', default=False) + parser.add_argument('--nocheckauth', action='store_true', default=False) parser.add_argument('--verbose', '-v', action='store_true', default=False) parser.add_argument('--debug', action='store_true', default=False) parser.add_argument( @@ -126,19 +157,27 @@ def main(): logger.setLevel(logging.DEBUG) ref_resolver = get_resolver(nameserver=args.nameserver, lifetime=args.timeout) for item in parse_file(args.file): - resolver = get_resolver(nameserver=item['ns_address'], lifetime=args.timeout) - result = compare_soa(item['domain'], [ref_resolver, resolver]) - if not result: - print 'Check for zone %s failed.\n' % item['domain'] - if result == 'timeout': - print 'Check for zone %s timed out.\n' % item['domain'] - if result == 'no match': - print 'SOA did not match:' - print_soa(item['domain'], [ref_resolver, resolver]) - print '' - if result == 'match' and VERBOSE: - print 'Check complete for zone %s.\n' % item['domain'] - + if not args.nochecksoa: + resolver = get_resolver(nameserver=item['ns_address'], lifetime=args.timeout) + soa_result = compare_soa(item['domain'], [ref_resolver, resolver]) + if soa_result == 'match' and VERBOSE: + print 'SOA check complete for zone %s.\n' % item['domain'] + elif not soa_result: + print 'SOA check for zone %s failed.\n' % item['domain'] + elif soa_result == 'timeout': + print 'SOA check for zone %s timed out.\n' % item['domain'] + elif soa_result == 'no match': + print 'SOA did not match:' + print_soa(item['domain'], [ref_resolver, resolver]) + print '' + if not args.nocheckauth: + auth_result = check_auth(item['domain'], ref_resolver) + if auth_result and VERBOSE: + print 'Authority check complete for %s.\n' % item['domain'] + elif auth_result is None: + print 'Authoritative check failed for %s.\n' % item['domain'] + elif not auth_result: + print 'Reference NS is not authoritative for %s.\n' % item['domain'] return 0 if __name__ == '__main__': -- cgit v1.1