diff options
author | Johan Lundberg <lundberg@nordu.net> | 2015-04-02 10:43:33 +0200 |
---|---|---|
committer | Johan Lundberg <lundberg@nordu.net> | 2015-04-02 10:43:33 +0200 |
commit | bd611ac59f7c4db885a2f8631ef0bcdcd1901ca0 (patch) | |
tree | e60f5333a7699cd021b33c7f5292af55b774001b /lib/ciscoasa.py |
Diffstat (limited to 'lib/ciscoasa.py')
-rw-r--r-- | lib/ciscoasa.py | 454 |
1 files changed, 454 insertions, 0 deletions
diff --git a/lib/ciscoasa.py b/lib/ciscoasa.py new file mode 100644 index 0000000..f3f92b5 --- /dev/null +++ b/lib/ciscoasa.py @@ -0,0 +1,454 @@ +#!/usr/bin/python + + + +"""Cisco ASA renderer.""" + +__author__ = 'antony@slac.stanford.edu (Antonio Ceseracciu)' + +import datetime +import socket +import logging +import re + +from third_party import ipaddr +import aclgenerator +import nacaddr + + +_ACTION_TABLE = { + 'accept': 'permit', + 'deny': 'deny', + 'reject': 'deny', + 'next': '! next', + 'reject-with-tcp-rst': 'deny', # tcp rst not supported + } + + +# generic error class +class Error(Exception): + """Generic error class.""" + pass + + +class UnsupportedCiscoAccessListError(Error): + """Raised when we're give a non named access list.""" + pass + + +class StandardAclTermError(Error): + """Raised when there is a problem in a standard access list.""" + pass + + +class NoCiscoPolicyError(Error): + """Raised when a policy is errantly passed to this module for rendering.""" + pass + + +class Term(aclgenerator.Term): + """A single ACL Term.""" + + + def __init__(self, term, filter_name, af=4): + self.term = term + self.filter_name = filter_name + self.options = [] + assert af in (4, 6) + self.af = af + + def __str__(self): + # Verify platform specific terms. Skip whole term if platform does not + # match. + if self.term.platform: + if 'ciscoasa' not in self.term.platform: + return '' + if self.term.platform_exclude: + if 'ciscoasa' in self.term.platform_exclude: + return '' + + ret_str = ['\n'] + + # Don't render icmpv6 protocol terms under inet, or icmp under inet6 + if ((self.af == 6 and 'icmp' in self.term.protocol) or + (self.af == 4 and 'icmpv6' in self.term.protocol)): + ret_str.append('remark Term %s' % self.term.name) + ret_str.append('remark not rendered due to protocol/AF mismatch.') + return '\n'.join(ret_str) + + ret_str.append('access-list %s remark %s' % (self.filter_name, + self.term.name)) + if self.term.owner: + self.term.comment.append('Owner: %s' % self.term.owner) + for comment in self.term.comment: + for line in comment.split('\n'): + ret_str.append('access-list %s remark %s' % (self.filter_name, + str(line)[:100])) + + # Term verbatim output - this will skip over normal term creation + # code by returning early. Warnings provided in policy.py. + if self.term.verbatim: + for next in self.term.verbatim: + if next.value[0] == 'ciscoasa': + ret_str.append(str(next.value[1])) + return '\n'.join(ret_str) + + # protocol + if not self.term.protocol: + protocol = ['ip'] + else: + # fix the protocol + protocol = self.term.protocol + + # source address + if self.term.source_address: + source_address = self.term.GetAddressOfVersion('source_address', self.af) + source_address_exclude = self.term.GetAddressOfVersion( + 'source_address_exclude', self.af) + if source_address_exclude: + source_address = nacaddr.ExcludeAddrs( + source_address, + source_address_exclude) + else: + # source address not set + source_address = ['any'] + + # destination address + if self.term.destination_address: + destination_address = self.term.GetAddressOfVersion( + 'destination_address', self.af) + destination_address_exclude = self.term.GetAddressOfVersion( + 'destination_address_exclude', self.af) + if destination_address_exclude: + destination_address = nacaddr.ExcludeAddrs( + destination_address, + destination_address_exclude) + else: + # destination address not set + destination_address = ['any'] + + # options + extra_options = [] + for opt in [str(x) for x in self.term.option]: + if opt.find('tcp-established') == 0 and 6 in protocol: + extra_options.append('established') + elif opt.find('established') == 0 and 6 in protocol: + # only needed for TCP, for other protocols policy.py handles high-ports + extra_options.append('established') + self.options.extend(extra_options) + + # ports + source_port = [()] + destination_port = [()] + if self.term.source_port: + source_port = self.term.source_port + if self.term.destination_port: + destination_port = self.term.destination_port + + # logging + if self.term.logging: + self.options.append('log') + if 'disable' in [x.value for x in self.term.logging]: + self.options.append('disable') + + # icmp-types + icmp_types = [''] + if self.term.icmp_type: + icmp_types = self.NormalizeIcmpTypes(self.term.icmp_type, + self.term.protocol, self.af) + + for saddr in source_address: + for daddr in destination_address: + for sport in source_port: + for dport in destination_port: + for proto in protocol: + for icmp_type in icmp_types: + # only output address family appropriate IP addresses + do_output = False + if self.af == 4: + if (((type(saddr) is nacaddr.IPv4) or (saddr == 'any')) and + ((type(daddr) is nacaddr.IPv4) or (daddr == 'any'))): + do_output = True + if self.af == 6: + if (((type(saddr) is nacaddr.IPv6) or (saddr == 'any')) and + ((type(daddr) is nacaddr.IPv6) or (daddr == 'any'))): + do_output = True + if do_output: + ret_str.extend(self._TermletToStr( + self.filter_name, + _ACTION_TABLE.get(str(self.term.action[0])), + proto, + saddr, + sport, + daddr, + dport, + icmp_type, + self.options)) + + return '\n'.join(ret_str) + + def _TermPortToProtocol (self,portNumber,proto): + + _ASA_PORTS_TCP = { +5190: "aol", +179: "bgp", +19: "chargen", +1494: "citrix-ica", +514: "cmd", +2748: "ctiqbe", +13: "daytime", +9: "discard", +53: "domain", +7: "echo", +512: "exec", +79: "finger", +21: "ftp", +20: "ftp-data", +70: "gopher", +443: "https", +1720: "h323", +101: "hostname", +113: "ident", +143: "imap4", +194: "irc", +750: "kerberos", +543: "klogin", +544: "kshell", +389: "ldap", +636: "ldaps", +515: "lpd", +513: "login", +1352: "lotusnotes", +139: "netbios-ssn", +119: "nntp", +5631: "pcanywhere-data", +496: "pim-auto-rp", +109: "pop2", +110: "pop3", +1723: "pptp", +25: "smtp", +1521: "sqlnet", +22: "ssh", +111: "sunrpc", +49: "tacacs", +517: "talk", +23: "telnet", +540: "uucp", +43: "whois", +80: "www", +2049: "nfs" + } + _ASA_PORTS_UDP = { +512: "biff", +68: "bootpc", +67: "bootps", +9: "discard", +53: "domain", +195: "dnsix", +7: "echo", +500: "isakmp", +750: "kerberos", +434: "mobile-ip", +42: "nameserver", +137: "netbios-ns", +138: "netbios-dgm", +123: "ntp", +5632: "pcanywhere-status", +496: "pim-auto-rp", +1645: "radius", +1646: "radius-acct", +520: "rip", +5510: "secureid-udp", +161: "snmp", +162: "snmptrap", +111: "sunrpc", +514: "syslog", +49: "tacacs", +517: "talk", +69: "tftp", +37: "time", +513: "who", +177: "xdmcp", +2049: "nfs" + } + + _ASA_TYPES_ICMP = { +6: "alternate-address", +31: "conversion-error", +8: "echo", +0: "echo-reply", +16: "information-reply", +15: "information-request", +18: "mask-reply", +17: "mask-request", +32: "mobile-redirect", +12: "parameter-problem", +5: "redirect", +9: "router-advertisement", +10: "router-solicitation", +4: "source-quench", +11: "time-exceeded", +14: "timestamp-reply", +13: "timestamp-request", +30: "traceroute", +3: "unreachable" + } + + + if proto == "tcp": + if portNumber in _ASA_PORTS_TCP: + return _ASA_PORTS_TCP[portNumber] + elif proto == "udp": + if portNumber in _ASA_PORTS_UDP: + return _ASA_PORTS_UDP[portNumber] + elif proto == "icmp": + if portNumber in _ASA_TYPES_ICMP: + return _ASA_TYPES_ICMP[portNumber] + return portNumber + + def _TermletToStr(self, filter_name, action, proto, saddr, sport, daddr, dport, + icmp_type, option): + """Take the various compenents and turn them into a cisco acl line. + + Args: + action: str, action + proto: str, protocl + saddr: str or ipaddr, source address + sport: str list or none, the source port + daddr: str or ipaddr, the destination address + dport: str list or none, the destination port + icmp_type: icmp-type numeric specification (if any) + option: list or none, optional, eg. 'logging' tokens. + + Returns: + string of the cisco acl line, suitable for printing. + """ + + + # inet4 + if type(saddr) is nacaddr.IPv4 or type(saddr) is ipaddr.IPv4Network: + if saddr.numhosts > 1: + saddr = '%s %s' % (saddr.ip, saddr.netmask) + else: + saddr = 'host %s' % (saddr.ip) + if type(daddr) is nacaddr.IPv4 or type(daddr) is ipaddr.IPv4Network: + if daddr.numhosts > 1: + daddr = '%s %s' % (daddr.ip, daddr.netmask) + else: + daddr = 'host %s' % (daddr.ip) + # inet6 + if type(saddr) is nacaddr.IPv6 or type(saddr) is ipaddr.IPv6Network: + if saddr.numhosts > 1: + saddr = '%s/%s' % (saddr.ip, saddr.prefixlen) + else: + saddr = 'host %s' % (saddr.ip) + if type(daddr) is nacaddr.IPv6 or type(daddr) is ipaddr.IPv6Network: + if daddr.numhosts > 1: + daddr = '%s/%s' % (daddr.ip, daddr.prefixlen) + else: + daddr = 'host %s' % (daddr.ip) + + # fix ports + if not sport: + sport = '' + elif sport[0] != sport[1]: + sport = ' range %s %s' % (self._TermPortToProtocol(sport[0],proto), self._TermPortToProtocol(sport[1],proto)) + else: + sport = ' eq %s' % (self._TermPortToProtocol(sport[0],proto)) + + if not dport: + dport = '' + elif dport[0] != dport[1]: + dport = ' range %s %s' % (self._TermPortToProtocol(dport[0],proto), self._TermPortToProtocol(dport[1],proto)) + else: + dport = ' eq %s' % (self._TermPortToProtocol(dport[0],proto)) + + if not option: + option = [''] + + # Prevent UDP from appending 'established' to ACL line + sane_options = list(option) + if proto == 'udp' and 'established' in sane_options: + sane_options.remove('established') + + ret_lines = [] + + # str(icmp_type) is needed to ensure 0 maps to '0' instead of FALSE + icmp_type = str(self._TermPortToProtocol(icmp_type,"icmp")) + + ret_lines.append('access-list %s extended %s %s %s %s %s %s %s %s' % + (filter_name, action, proto, saddr, + sport, daddr, dport, + icmp_type, + ' '.join(sane_options) + )) + + # remove any trailing spaces and replace multiple spaces with singles + stripped_ret_lines = [re.sub('\s+', ' ', x).rstrip() for x in ret_lines] + return stripped_ret_lines + +# return 'access-list %s extended %s %s %s%s %s%s %s' % ( +# filter_name, action, proto, saddr, sport, daddr, dport, ' '.join(option)) + + +class CiscoASA(aclgenerator.ACLGenerator): + """A cisco ASA policy object.""" + + _PLATFORM = 'ciscoasa' + _DEFAULT_PROTOCOL = 'ip' + _SUFFIX = '.asa' + + _OPTIONAL_SUPPORTED_KEYWORDS = set(['expiration', + 'logging', + 'owner', + ]) + + def _TranslatePolicy(self, pol, exp_info): + self.ciscoasa_policies = [] + current_date = datetime.date.today() + exp_info_date = current_date + datetime.timedelta(weeks=exp_info) + + for header, terms in self.policy.filters: + filter_options = header.FilterOptions('ciscoasa') + filter_name = header.FilterName('ciscoasa') + + new_terms = [] + # now add the terms + for term in terms: + if term.expiration: + if term.expiration <= exp_info_date: + logging.info('INFO: Term %s in policy %s expires ' + 'in less than two weeks.', term.name, filter_name) + if term.expiration <= current_date: + logging.warn('WARNING: Term %s in policy %s is expired and ' + 'will not be rendered.', term.name, filter_name) + continue + + new_terms.append(str(Term(term,filter_name))) + + self.ciscoasa_policies.append((header, filter_name, new_terms)) + + def __str__(self): + target_header = [] + target = [] + + for (header, filter_name, terms) in self.ciscoasa_policies: + + target.append('clear configure access-list %s' % filter_name) + + # add the p4 tags + target.extend(aclgenerator.AddRepositoryTags('access-list %s remark ' + % filter_name)) + + # add a header comment if one exists + for comment in header.comment: + for line in comment.split('\n'): + target.append('access-list %s remark %s' % (filter_name,line)) + + # now add the terms + for term in terms: + target.append(str(term)) + + # end for header, filter_name, filter_type... + return '\n'.join(target) + |