summaryrefslogtreecommitdiff
path: root/lib/aclcheck.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/aclcheck.py')
-rwxr-xr-xlib/aclcheck.py302
1 files changed, 302 insertions, 0 deletions
diff --git a/lib/aclcheck.py b/lib/aclcheck.py
new file mode 100755
index 0000000..3e36a99
--- /dev/null
+++ b/lib/aclcheck.py
@@ -0,0 +1,302 @@
+#!/usr/bin/python
+#
+# Copyright 2011 Google Inc. All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Check where hosts, ports and protocols are matched in a capirca policy."""
+
+__author__ = 'watson@google.com (Tony Watson)'
+
+import logging
+import sys
+import nacaddr
+import policy
+import port
+
+
+class Error(Exception):
+ """Base error class."""
+
+
+class AddressError(Error):
+ """Incorrect IP address or format."""
+
+
+class BadPolicy(Error):
+ """Item is not a valid policy object."""
+
+
+class NoTargetError(Error):
+ """Specified target platform not available in specified policy."""
+
+
+class AclCheck(object):
+ """Check where hosts, ports and protocols match in a NAC policy.
+
+ Args:
+ pol:
+ policy.Policy object
+ src:
+ string, the source address
+ dst:
+ string: the destination address.
+ sport:
+ string, the source port.
+ dport:
+ string, the destination port.
+ proto:
+ string, the protocol.
+
+ Returns:
+ An AclCheck Object
+
+ Raises:
+ port.BarPortValue: An invalid source port is used
+ port.BadPortRange: A port is outside of the acceptable range 0-65535
+ AddressError: Incorrect ip address or format
+
+ """
+
+ def __init__(self,
+ pol,
+ src='any',
+ dst='any',
+ sport='any',
+ dport='any',
+ proto='any',
+ ):
+
+ self.pol_obj = pol
+ self.proto = proto
+
+ # validate source port
+ if sport == 'any':
+ self.sport = sport
+ else:
+ self.sport = port.Port(sport)
+
+ # validate destination port
+ if dport == 'any':
+ self.dport = dport
+ else:
+ self.dport = port.Port(dport)
+
+ # validate source address
+ if src == 'any':
+ self.src = src
+ else:
+ try:
+ self.src = nacaddr.IP(src)
+ except ValueError:
+ raise AddressError('bad source address: %s\n' % src)
+
+ # validate destination address
+ if dst == 'any':
+ self.dst = dst
+ else:
+ try:
+ self.dst = nacaddr.IP(dst)
+ except ValueError:
+ raise AddressError('bad destination address: %s\n' % dst)
+
+ if type(self.pol_obj) is not policy.Policy:
+ raise BadPolicy('Policy object is not valid.')
+
+ self.matches = []
+ self.exact_matches = []
+ for header, terms in self.pol_obj.filters:
+ filtername = header.target[0].options[0]
+ for term in terms:
+ possible = []
+ logging.debug('checking term: %s', term.name)
+ if not self._AddrInside(self.src, term.source_address):
+ logging.debug('srcaddr does not match')
+ continue
+ logging.debug('srcaddr matches: %s', self.src)
+ if not self._AddrInside(self.dst, term.destination_address):
+ logging.debug('dstaddr does not match')
+ continue
+ logging.debug('dstaddr matches: %s', self.dst)
+ if (self.sport != 'any' and term.source_port and not
+ self._PortInside(self.sport, term.source_port)):
+ logging.debug('sport does not match')
+ continue
+ logging.debug('sport matches: %s', self.sport)
+ if (self.dport != 'any' and term.destination_port and not
+ self._PortInside(self.dport, term.destination_port)):
+ logging.debug('dport does not match')
+ continue
+ logging.debug('dport matches: %s', self.dport)
+ if (self.proto != 'any' and term.protocol and
+ self.proto not in term.protocol):
+ logging.debug('proto does not match')
+ continue
+ logging.debug('proto matches: %s', self.proto)
+ if term.protocol_except and self.proto in term.protocol_except:
+ logging.debug('protocol excepted by term, no match.')
+ continue
+ logging.debug('proto not excepted: %s', self.proto)
+ if not term.action: # avoid any verbatim
+ logging.debug('term had no action (verbatim?), no match.')
+ continue
+ logging.debug('term has an action')
+ possible = self._PossibleMatch(term)
+ self.matches.append(Match(filtername, term.name, possible, term.action,
+ term.qos))
+ if possible:
+ logging.debug('term has options: %s, not treating as exact match',
+ possible)
+ continue
+
+ # if we get here then we have a match, and if the action isn't next and
+ # there are no possibles, then this is a "definite" match and we needn't
+ # look for any further matches (i.e. later terms may match, but since
+ # we'll never get there we shouldn't report them)
+ if 'next' not in term.action:
+ self.exact_matches.append(Match(filtername, term.name, [],
+ term.action, term.qos))
+ break
+
+ def Matches(self):
+ """Return list of matched terms."""
+ return self.matches
+
+ def ExactMatches(self):
+ """Return matched terms, but not terms with possibles or action next."""
+ return self.exact_matches
+
+ def ActionMatch(self, action='any'):
+ """Return list of matched terms with specified actions."""
+ match_list = []
+ for next in self.matches:
+ if next.action:
+ if not next.possibles:
+ if action is 'any' or action in next.action:
+ match_list.append(next)
+ return match_list
+
+ def DescribeMatches(self):
+ """Provide sentence descriptions of matches.
+
+ Returns:
+ ret_str: text sentences describing matches
+ """
+ ret_str = []
+ for next in self.matches:
+ text = str(next)
+ ret_str.append(text)
+ return '\n'.join(ret_str)
+
+ def __str__(self):
+ text = []
+ last_filter = ''
+ for next in self.matches:
+ if next.filter != last_filter:
+ last_filter = next.filter
+ text.append(' filter: ' + next.filter)
+ if next.possibles:
+ text.append(' ' * 10 + 'term: ' + next.term + ' (possible match)')
+ else:
+ text.append(' ' * 10 + 'term: ' + next.term)
+ if next.possibles:
+ text.append(' ' * 16 + next.action + ' if ' + str(next.possibles))
+ else:
+ text.append(' ' * 16 + next.action)
+ return '\n'.join(text)
+
+ def _PossibleMatch(self, term):
+ """Ignore some options and keywords that are edge cases.
+
+ Args:
+ term: term object to examine for edge-cases
+
+ Returns:
+ ret_str: a list of reasons this term may possible match
+ """
+ ret_str = []
+ if 'first-fragment' in term.option:
+ ret_str.append('first-frag')
+ if term.fragment_offset:
+ ret_str.append('frag-offset')
+ if term.packet_length:
+ ret_str.append('packet-length')
+ if 'established' in term.option:
+ ret_str.append('est')
+ if 'tcp-established' in term.option and 'tcp' in term.protocol:
+ ret_str.append('tcp-est')
+ return ret_str
+
+ def _AddrInside(self, addr, addresses):
+ """Check if address is matched in another address or group of addresses.
+
+ Args:
+ addr: An ipaddr network or host address or text 'any'
+ addresses: A list of ipaddr network or host addresses
+
+ Returns:
+ bool: True of false
+ """
+ if addr is 'any': return True # always true if we match for any addr
+ if not addresses: return True # always true if term has nothing to match
+ for next in addresses:
+ # ipaddr can incorrectly report ipv4 as contained with ipv6 addrs
+ if type(addr) is type(next):
+ if addr in next:
+ return True
+ return False
+
+ def _PortInside(self, myport, port_list):
+ """Check if port matches in a port or group of ports.
+
+ Args:
+ myport: port number
+ port_list: list of ports
+
+ Returns:
+ bool: True of false
+ """
+ if myport == 'any': return True
+ if [x for x in port_list if x[0] <= myport <= x[1]]:
+ return True
+ return False
+
+
+class Match(object):
+ """A matching term and its associate values."""
+
+ def __init__(self, filtername, term, possibles, action, qos=None):
+ self.filter = filtername
+ self.term = term
+ self.possibles = possibles
+ self.action = action[0]
+ self.qos = qos
+
+ def __str__(self):
+ text = ''
+ if self.possibles:
+ text += 'possible ' + self.action
+ else:
+ text += self.action
+ text += ' in term ' + self.term + ' of filter ' + self.filter
+ if self.possibles:
+ text += ' with factors: ' + str(', '.join(self.possibles))
+ return text
+
+
+def main():
+ pass
+
+if __name__ == '__main__':
+ main()