# dnscheck_nsd: compare each NSD zone's SOA on its transfer source with the SOA on a reference name server.
# -*- coding: utf-8 -*-
__author__ = 'lundberg'
import sys
import argparse
import logging
from socket import gethostbyname, gethostbyaddr, gaierror
import dns.resolver
# Module-wide logger. Both the logger and its single stream handler are set to
# DEBUG, so every record logged through `logger` is emitted.
logger = logging.getLogger('dnscheck_nsd')
logger.setLevel(logging.DEBUG)
# StreamHandler() is created without an explicit stream argument.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# Records are rendered as "<logger name> - <level> - <message>".
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
# Switched to True by main() when --verbose/-v is given; read by compare_soa()
# and main() to decide whether to report successful checks.
VERBOSE = False
# Host name of the name server whose SOA answers are used as the reference;
# main() resolves it with gethostbyname() before building the resolver.
REFERENCE_NS = 'sunic.sunet.se'
def get_resolver(nameserver=None):
    """Create a ``dns.resolver.Resolver``, optionally pinned to one server.

    :param nameserver: address to use as the resolver's only name server.
        When falsy, the resolver is returned exactly as
        ``dns.resolver.Resolver()`` built it.
    :return: the resolver instance.

    If assigning the name server raises ``gaierror`` the error is logged and
    the process exits with status 1.
    """
    pinned = dns.resolver.Resolver()
    if nameserver:
        try:
            pinned.nameservers = [nameserver]
        except gaierror:
            logger.error('Could not find nameserver: %s' % nameserver)
            sys.exit(1)
    return pinned
def _nameserver_label(resolver):
    """Return a printable name for the first name server of *resolver*.

    Loopback addresses ('127.0.0.1' and '::1') are shown as ``localhost``.
    Any other address is reverse-resolved with ``gethostbyaddr``; when that
    lookup fails (for example because the address has no PTR record) the raw
    address is returned instead of letting the error abort the check.
    """
    # Local import: socket.error is the common base of herror and gaierror,
    # and the file-level import block does not bring it into scope.
    from socket import error as socket_error

    address = resolver.nameservers[0]
    if address in ('127.0.0.1', '::1'):
        return 'localhost'
    try:
        return gethostbyaddr(address)[0]
    except socket_error:
        return address


def compare_soa(zone, resolvers):
    """Query every resolver for the SOA of *zone* and compare the answers.

    :param zone: zone name to query.
    :param resolvers: iterable of resolver objects exposing ``query()`` and
        ``nameservers``.
    :return: ``'match'`` when all collected answers are equal,
        ``'timeout'`` as soon as one resolver times out, and ``'no match'``
        otherwise (differing answers, NXDOMAIN, no usable answer, or no
        answers collected at all).

    With ``VERBOSE`` enabled each successful answer is logged at INFO level.
    """
    answers = []
    for resolver in resolvers:
        try:
            answer = resolver.query(zone, 'SOA')[0]
        except dns.exception.Timeout:
            logger.error('%s timed out.' % _nameserver_label(resolver))
            return 'timeout'
        except dns.resolver.NXDOMAIN:
            logger.error('NS %s responded NXDOMAIN for %s.'
                         % (_nameserver_label(resolver), zone))
            return 'no match'
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers):
            # A server that refuses or returns an empty answer should fail
            # this zone only, not abort the whole run with a traceback.
            logger.error('NS %s gave no usable SOA answer for %s.'
                         % (_nameserver_label(resolver), zone))
            return 'no match'
        if VERBOSE:
            logger.info('NS %s: %s' % (_nameserver_label(resolver), answer))
        if answer:
            answers.append(answer)
    # Exactly one distinct answer means every resolver agreed.
    if len(set(answers)) == 1:
        return 'match'
    return 'no match'


def print_soa(zone, resolvers):
    """Print the SOA record each resolver returns for *zone*.

    :param zone: zone name to query.
    :param resolvers: iterable of resolver objects exposing ``query()`` and
        ``nameservers``.

    Timeouts are reported on standard output; NXDOMAIN and missing answers
    are reported through the module logger. Nothing is returned.
    """
    for resolver in resolvers:
        label = _nameserver_label(resolver)
        try:
            answer = resolver.query(zone, 'SOA')[0]
        except dns.exception.Timeout:
            print('%s timed out. Print SOA for %s failed.' % (label, zone))
        except dns.resolver.NXDOMAIN:
            logger.error('NS %s responded NXDOMAIN for %s.' % (label, zone))
        except (dns.resolver.NoAnswer, dns.resolver.NoNameservers):
            logger.error('NS %s gave no usable SOA answer for %s.'
                         % (label, zone))
        else:
            print('NS %s: %s' % (label, answer))
def parse_file(f):
    """Collect zone name / transfer source pairs from an NSD configuration.

    The parser looks for a ``zone:`` line and, inside that zone, for a
    ``name:`` and a ``request-xfr:`` line. As soon as both values are known
    the pair is recorded and the parser waits for the next ``zone:`` line, so
    only the first ``request-xfr`` of a zone is used.

    :param f: iterable of text lines (typically an open file). Its ``name``
        attribute, when present, is used in error messages.
    :return: list of dicts with the keys ``'domain'`` (surrounding double
        quotes removed) and ``'ns_address'``.

    A blank line that ends a zone for which only one of the two values was
    seen is reported through the module logger and the partial zone is
    dropped.
    """
    # Plain iterables (lists, StringIO) have no .name; do not crash on them.
    source_name = getattr(f, 'name', '<input>')
    result = []
    in_zone, domain, ns_address = False, '', ''
    for line in f:
        stripped = line.strip()
        # Comments may be indented, so test the stripped line.
        if stripped.startswith('#'):
            continue
        if not stripped:  # Blank line ends the current zone
            if domain:
                logger.error('Misconfigured zone: %s in %s.' % (domain, source_name))
            if ns_address:
                logger.error('Misconfigured zone with NS address: %s in %s.' % (ns_address, source_name))
            in_zone, domain, ns_address = False, '', ''
            continue
        # Match on the exact option name instead of a substring, so that
        # e.g. a key called "keyname" on a request-xfr line is not mistaken
        # for a "name:" option. Only the first ':' separates key and value,
        # which keeps IPv6 addresses intact.
        key, _, value = stripped.partition(':')
        key = key.strip()
        tokens = value.split()
        if key == 'zone':
            in_zone = True
        elif in_zone and tokens:
            if key == 'name':
                domain = tokens[0]
            elif key == 'request-xfr':
                # request-xfr accepts an optional AXFR/UDP keyword before
                # the address; skip it when present.
                if tokens[0] in ('AXFR', 'UDP') and len(tokens) > 1:
                    tokens = tokens[1:]
                ns_address = tokens[0]
        if in_zone and domain and ns_address:
            result.append({
                'domain': domain.strip('"'),
                'ns_address': ns_address,
            })
            in_zone, domain, ns_address = False, '', ''
    return result
def main():
    """Check every zone of an NSD configuration against the reference server.

    Command line: an optional ``file`` argument (defaults to standard input)
    holding the NSD configuration, and ``--verbose``/``-v`` to also report
    successful checks.

    For each zone returned by ``parse_file`` the SOA served by the zone's
    transfer source is compared with the SOA served by ``REFERENCE_NS``;
    timeouts and mismatches are printed, and on a mismatch both SOA records
    are printed as well.

    :return: 0 once all zones have been processed. The process exits with
        status 1 if ``REFERENCE_NS`` cannot be resolved.
    """
    global VERBOSE

    # User friendly usage output
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='store_true', default=False)
    parser.add_argument(
        'file',
        nargs='?',
        type=argparse.FileType('r'),
        default=sys.stdin,
        help="NSD configuration file"
    )
    args = parser.parse_args()
    if args.verbose:
        VERBOSE = True

    # The host name lookup happens here, so this is where a failed lookup
    # has to be handled; report it instead of dying with a traceback.
    try:
        reference_address = gethostbyname(REFERENCE_NS)
    except gaierror:
        logger.error('Could not find nameserver: %s' % REFERENCE_NS)
        sys.exit(1)
    ref_resolver = get_resolver(nameserver=reference_address)

    for item in parse_file(args.file):
        zone = item['domain']
        resolvers = [ref_resolver, get_resolver(nameserver=item['ns_address'])]
        result = compare_soa(zone, resolvers)
        if result == 'timeout':
            print('Check for zone %s timed out.' % zone)
        elif result == 'no match':
            print('SOA did not match:')
            print_soa(zone, resolvers)
        elif result == 'match' and VERBOSE:
            print('Check complete for zone %s.\n' % zone)
    return 0
if __name__ == '__main__':
    # Hand main()'s return value to the shell as the exit status.
    sys.exit(main())