summaryrefslogtreecommitdiff
path: root/lib/aclcheck.py
blob: 3e36a993d9818687efc38a1f9029dd21830ea1a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/python
#
# Copyright 2011 Google Inc. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Check where hosts, ports and protocols are matched in a capirca policy."""

__author__ = 'watson@google.com (Tony Watson)'

import logging
import sys
import nacaddr
import policy
import port


class Error(Exception):
  """Base error class."""


class AddressError(Error):
  """Incorrect IP address or format."""


class BadPolicy(Error):
  """Item is not a valid policy object."""


class NoTargetError(Error):
  """Specified target platform not available in specified policy."""


class AclCheck(object):
  """Check where hosts, ports and protocols match in a NAC policy.

  Args:
    pol:
      policy.Policy object
    src:
      string, the source address
    dst:
      string: the destination address.
    sport:
      string, the source port.
    dport:
      string, the destination port.
    proto:
      string, the protocol.

  Returns:
    An AclCheck Object

  Raises:
    port.BarPortValue: An invalid source port is used
    port.BadPortRange: A port is outside of the acceptable range 0-65535
    AddressError: Incorrect ip address or format

  """

  def __init__(self,
               pol,
               src='any',
               dst='any',
               sport='any',
               dport='any',
               proto='any',
              ):

    self.pol_obj = pol
    self.proto = proto

    # validate source port
    if sport == 'any':
      self.sport = sport
    else:
      self.sport = port.Port(sport)

    # validate destination port
    if dport == 'any':
      self.dport = dport
    else:
      self.dport = port.Port(dport)

    # validate source address
    if src == 'any':
      self.src = src
    else:
      try:
        self.src = nacaddr.IP(src)
      except ValueError:
        raise AddressError('bad source address: %s\n' % src)

    # validate destination address
    if dst == 'any':
      self.dst = dst
    else:
      try:
        self.dst = nacaddr.IP(dst)
      except ValueError:
        raise AddressError('bad destination address: %s\n' % dst)

    if type(self.pol_obj) is not policy.Policy:
      raise BadPolicy('Policy object is not valid.')

    self.matches = []
    self.exact_matches = []
    for header, terms in self.pol_obj.filters:
      filtername = header.target[0].options[0]
      for term in terms:
        possible = []
        logging.debug('checking term: %s', term.name)
        if not self._AddrInside(self.src, term.source_address):
          logging.debug('srcaddr does not match')
          continue
        logging.debug('srcaddr matches: %s', self.src)
        if not self._AddrInside(self.dst, term.destination_address):
          logging.debug('dstaddr does not match')
          continue
        logging.debug('dstaddr matches: %s', self.dst)
        if (self.sport != 'any' and term.source_port and not
            self._PortInside(self.sport, term.source_port)):
          logging.debug('sport does not match')
          continue
        logging.debug('sport matches: %s', self.sport)
        if (self.dport != 'any' and term.destination_port and not
            self._PortInside(self.dport, term.destination_port)):
          logging.debug('dport does not match')
          continue
        logging.debug('dport matches: %s', self.dport)
        if (self.proto != 'any' and term.protocol and
            self.proto not in term.protocol):
          logging.debug('proto does not match')
          continue
        logging.debug('proto matches: %s', self.proto)
        if term.protocol_except and self.proto in term.protocol_except:
          logging.debug('protocol excepted by term, no match.')
          continue
        logging.debug('proto not excepted: %s', self.proto)
        if not term.action:  # avoid any verbatim
          logging.debug('term had no action (verbatim?), no match.')
          continue
        logging.debug('term has an action')
        possible = self._PossibleMatch(term)
        self.matches.append(Match(filtername, term.name, possible, term.action,
                                  term.qos))
        if possible:
          logging.debug('term has options: %s, not treating as exact match',
                        possible)
          continue

        # if we get here then we have a match, and if the action isn't next and
        # there are no possibles, then this is a "definite" match and we needn't
        # look for any further matches (i.e. later terms may match, but since
        # we'll never get there we shouldn't report them)
        if 'next' not in term.action:
          self.exact_matches.append(Match(filtername, term.name, [],
                                          term.action, term.qos))
          break

  def Matches(self):
    """Return list of matched terms."""
    return self.matches

  def ExactMatches(self):
    """Return matched terms, but not terms with possibles or action next."""
    return self.exact_matches

  def ActionMatch(self, action='any'):
    """Return list of matched terms with specified actions."""
    match_list = []
    for next in self.matches:
      if next.action:
        if not next.possibles:
          if action is 'any' or action in next.action:
            match_list.append(next)
    return match_list

  def DescribeMatches(self):
    """Provide sentence descriptions of matches.

    Returns:
      ret_str: text sentences describing matches
    """
    ret_str = []
    for next in self.matches:
      text = str(next)
      ret_str.append(text)
    return '\n'.join(ret_str)

  def __str__(self):
    text = []
    last_filter = ''
    for next in self.matches:
      if next.filter != last_filter:
        last_filter = next.filter
        text.append('  filter: ' + next.filter)
      if next.possibles:
        text.append(' ' * 10 + 'term: ' + next.term + ' (possible match)')
      else:
        text.append(' ' * 10 + 'term: ' + next.term)
      if next.possibles:
        text.append(' ' * 16 + next.action + ' if ' + str(next.possibles))
      else:
        text.append(' ' * 16 + next.action)
    return '\n'.join(text)

  def _PossibleMatch(self, term):
    """Ignore some options and keywords that are edge cases.

    Args:
      term: term object to examine for edge-cases

    Returns:
      ret_str: a list of reasons this term may possible match
    """
    ret_str = []
    if 'first-fragment' in term.option:
      ret_str.append('first-frag')
    if term.fragment_offset:
      ret_str.append('frag-offset')
    if term.packet_length:
      ret_str.append('packet-length')
    if 'established' in term.option:
      ret_str.append('est')
    if 'tcp-established' in term.option and 'tcp' in term.protocol:
      ret_str.append('tcp-est')
    return ret_str

  def _AddrInside(self, addr, addresses):
    """Check if address is matched in another address or group of addresses.

    Args:
      addr: An ipaddr network or host address or text 'any'
      addresses: A list of ipaddr network or host addresses

    Returns:
      bool: True of false
    """
    if addr is 'any': return True   # always true if we match for any addr
    if not addresses: return True   # always true if term has nothing to match
    for next in addresses:
      # ipaddr can incorrectly report ipv4 as contained with ipv6 addrs
      if type(addr) is type(next):
        if addr in next:
          return True
    return False

  def _PortInside(self, myport, port_list):
    """Check if port matches in a port or group of ports.

    Args:
      myport: port number
      port_list: list of ports

    Returns:
      bool: True of false
    """
    if myport == 'any': return True
    if [x for x in port_list if x[0] <= myport <= x[1]]:
      return True
    return False


class Match(object):
  """A matching term and its associate values."""

  def __init__(self, filtername, term, possibles, action, qos=None):
    self.filter = filtername
    self.term = term
    self.possibles = possibles
    self.action = action[0]
    self.qos = qos

  def __str__(self):
    text = ''
    if self.possibles:
      text += 'possible ' + self.action
    else:
      text += self.action
    text += ' in term ' + self.term + ' of filter ' + self.filter
    if self.possibles:
      text += ' with factors: ' + str(', '.join(self.possibles))
    return text


def main():
  pass

if __name__ == '__main__':
  main()