diff options
Diffstat (limited to 'lib/aclcheck.py')
-rwxr-xr-x | lib/aclcheck.py | 302 |
1 files changed, 302 insertions, 0 deletions
diff --git a/lib/aclcheck.py b/lib/aclcheck.py new file mode 100755 index 0000000..3e36a99 --- /dev/null +++ b/lib/aclcheck.py @@ -0,0 +1,302 @@ +#!/usr/bin/python +# +# Copyright 2011 Google Inc. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Check where hosts, ports and protocols are matched in a capirca policy.""" + +__author__ = 'watson@google.com (Tony Watson)' + +import logging +import sys +import nacaddr +import policy +import port + + +class Error(Exception): + """Base error class.""" + + +class AddressError(Error): + """Incorrect IP address or format.""" + + +class BadPolicy(Error): + """Item is not a valid policy object.""" + + +class NoTargetError(Error): + """Specified target platform not available in specified policy.""" + + +class AclCheck(object): + """Check where hosts, ports and protocols match in a NAC policy. + + Args: + pol: + policy.Policy object + src: + string, the source address + dst: + string: the destination address. + sport: + string, the source port. + dport: + string, the destination port. + proto: + string, the protocol. + + Returns: + An AclCheck Object + + Raises: + port.BarPortValue: An invalid source port is used + port.BadPortRange: A port is outside of the acceptable range 0-65535 + AddressError: Incorrect ip address or format + + """ + + def __init__(self, + pol, + src='any', + dst='any', + sport='any', + dport='any', + proto='any', + ): + + self.pol_obj = pol + self.proto = proto + + # validate source port + if sport == 'any': + self.sport = sport + else: + self.sport = port.Port(sport) + + # validate destination port + if dport == 'any': + self.dport = dport + else: + self.dport = port.Port(dport) + + # validate source address + if src == 'any': + self.src = src + else: + try: + self.src = nacaddr.IP(src) + except ValueError: + raise AddressError('bad source address: %s\n' % src) + + # validate destination address + if dst == 'any': + self.dst = dst + else: + try: + self.dst = nacaddr.IP(dst) + except ValueError: + raise AddressError('bad destination address: %s\n' % dst) + + if type(self.pol_obj) is not policy.Policy: + raise BadPolicy('Policy object is not valid.') + + self.matches = [] + self.exact_matches = [] + for header, terms in self.pol_obj.filters: + filtername = header.target[0].options[0] + for term in terms: + possible = [] + logging.debug('checking term: %s', term.name) + if not self._AddrInside(self.src, term.source_address): + logging.debug('srcaddr does not match') + continue + logging.debug('srcaddr matches: %s', self.src) + if not self._AddrInside(self.dst, term.destination_address): + logging.debug('dstaddr does not match') + continue + logging.debug('dstaddr matches: %s', self.dst) + if (self.sport != 'any' and term.source_port and not + self._PortInside(self.sport, term.source_port)): + logging.debug('sport does not match') + continue + logging.debug('sport matches: %s', self.sport) + if (self.dport != 'any' and term.destination_port and not + self._PortInside(self.dport, term.destination_port)): + logging.debug('dport does not match') + continue + logging.debug('dport matches: %s', self.dport) + if (self.proto != 'any' and term.protocol and + self.proto not in term.protocol): + logging.debug('proto does not match') + continue + logging.debug('proto matches: %s', self.proto) + if term.protocol_except and self.proto in term.protocol_except: + logging.debug('protocol excepted by term, no match.') + continue + logging.debug('proto not excepted: %s', self.proto) + if not term.action: # avoid any verbatim + logging.debug('term had no action (verbatim?), no match.') + continue + logging.debug('term has an action') + possible = self._PossibleMatch(term) + self.matches.append(Match(filtername, term.name, possible, term.action, + term.qos)) + if possible: + logging.debug('term has options: %s, not treating as exact match', + possible) + continue + + # if we get here then we have a match, and if the action isn't next and + # there are no possibles, then this is a "definite" match and we needn't + # look for any further matches (i.e. later terms may match, but since + # we'll never get there we shouldn't report them) + if 'next' not in term.action: + self.exact_matches.append(Match(filtername, term.name, [], + term.action, term.qos)) + break + + def Matches(self): + """Return list of matched terms.""" + return self.matches + + def ExactMatches(self): + """Return matched terms, but not terms with possibles or action next.""" + return self.exact_matches + + def ActionMatch(self, action='any'): + """Return list of matched terms with specified actions.""" + match_list = [] + for next in self.matches: + if next.action: + if not next.possibles: + if action is 'any' or action in next.action: + match_list.append(next) + return match_list + + def DescribeMatches(self): + """Provide sentence descriptions of matches. + + Returns: + ret_str: text sentences describing matches + """ + ret_str = [] + for next in self.matches: + text = str(next) + ret_str.append(text) + return '\n'.join(ret_str) + + def __str__(self): + text = [] + last_filter = '' + for next in self.matches: + if next.filter != last_filter: + last_filter = next.filter + text.append(' filter: ' + next.filter) + if next.possibles: + text.append(' ' * 10 + 'term: ' + next.term + ' (possible match)') + else: + text.append(' ' * 10 + 'term: ' + next.term) + if next.possibles: + text.append(' ' * 16 + next.action + ' if ' + str(next.possibles)) + else: + text.append(' ' * 16 + next.action) + return '\n'.join(text) + + def _PossibleMatch(self, term): + """Ignore some options and keywords that are edge cases. + + Args: + term: term object to examine for edge-cases + + Returns: + ret_str: a list of reasons this term may possible match + """ + ret_str = [] + if 'first-fragment' in term.option: + ret_str.append('first-frag') + if term.fragment_offset: + ret_str.append('frag-offset') + if term.packet_length: + ret_str.append('packet-length') + if 'established' in term.option: + ret_str.append('est') + if 'tcp-established' in term.option and 'tcp' in term.protocol: + ret_str.append('tcp-est') + return ret_str + + def _AddrInside(self, addr, addresses): + """Check if address is matched in another address or group of addresses. + + Args: + addr: An ipaddr network or host address or text 'any' + addresses: A list of ipaddr network or host addresses + + Returns: + bool: True of false + """ + if addr is 'any': return True # always true if we match for any addr + if not addresses: return True # always true if term has nothing to match + for next in addresses: + # ipaddr can incorrectly report ipv4 as contained with ipv6 addrs + if type(addr) is type(next): + if addr in next: + return True + return False + + def _PortInside(self, myport, port_list): + """Check if port matches in a port or group of ports. + + Args: + myport: port number + port_list: list of ports + + Returns: + bool: True of false + """ + if myport == 'any': return True + if [x for x in port_list if x[0] <= myport <= x[1]]: + return True + return False + + +class Match(object): + """A matching term and its associate values.""" + + def __init__(self, filtername, term, possibles, action, qos=None): + self.filter = filtername + self.term = term + self.possibles = possibles + self.action = action[0] + self.qos = qos + + def __str__(self): + text = '' + if self.possibles: + text += 'possible ' + self.action + else: + text += self.action + text += ' in term ' + self.term + ' of filter ' + self.filter + if self.possibles: + text += ' with factors: ' + str(', '.join(self.possibles)) + return text + + +def main(): + pass + +if __name__ == '__main__': + main() |