summaryrefslogtreecommitdiff
path: root/lib/nacaddr.py
blob: fc06f176005b79e1c4e5f83d4af8c3da2abb3c74 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
#!/usr/bin/python
#
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A subclass of the ipaddr library that includes comments for ipaddr objects."""

__author__ = 'watson@google.com (Tony Watson)'

from third_party import ipaddr

def IP(ipaddress, comment='', token=''):
  """Take an ip string and return an object of the correct type.

  Args:
    ip_string: the ip address.
    comment:: option comment field
    token:: option token name where this address was extracted from

  Returns:
    ipaddr.IPv4 or ipaddr.IPv6 object or raises ValueError.

  Raises:
    ValueError: if the string passed isn't either a v4 or a v6 address.

  Notes:
    this is sort of a poor-mans factory method.
  """
  a = ipaddr.IPNetwork(ipaddress)
  if a.version == 4:
    return IPv4(ipaddress, comment, token)
  elif a.version == 6:
    return IPv6(ipaddress, comment, token)

class IPv4(ipaddr.IPv4Network):
  """This subclass allows us to keep text comments related to each object."""

  def __init__(self, ip_string, comment='', token=''):
    ipaddr.IPv4Network.__init__(self, ip_string)
    self.text = comment
    self.token = token
    self.parent_token = token

  def AddComment(self, comment=''):
    """Append comment to self.text, comma seperated.

    Don't add the comment if it's the same as self.text.

    Args: comment
    """
    if self.text:
      if comment and comment not in self.text:
        self.text += ', ' + comment
    else:
      self.text = comment

  def supernet(self, prefixlen_diff=1):
    """Override ipaddr.IPv4 supernet so we can maintain comments.

    See ipaddr.IPv4.Supernet for complete documentation.
    """
    if self.prefixlen == 0:
      return self
    if self.prefixlen - prefixlen_diff < 0:
      raise PrefixlenDiffInvalidError(
          'current prefixlen is %d, cannot have a prefixlen_diff of %d' % (
              self.prefixlen, prefixlen_diff))
    ret_addr = IPv4(ipaddr.IPv4Network.supernet(self, prefixlen_diff),
                    comment=self.text, token=self.token)
    return ret_addr

  # Backwards compatibility name from v1.
  Supernet = supernet


class IPv6(ipaddr.IPv6Network):
  """This subclass allows us to keep text comments related to each object."""

  def __init__(self, ip_string, comment='', token=''):
    ipaddr.IPv6Network.__init__(self, ip_string)
    self.text = comment
    self.token = token
    self.parent_token = token

  def supernet(self, prefixlen_diff=1):
    """Override ipaddr.IPv6Network supernet so we can maintain comments.

    See ipaddr.IPv6Network.Supernet for complete documentation.
    """
    if self.prefixlen == 0:
      return self
    if self.prefixlen - prefixlen_diff < 0:
      raise PrefixlenDiffInvalidError(
          'current prefixlen is %d, cannot have a prefixlen_diff of %d' % (
              self.prefixlen, prefixlen_diff))
    ret_addr = IPv6(ipaddr.IPv6Network.supernet(self, prefixlen_diff),
                    comment=self.text, token=self.token)
    return ret_addr

  # Backwards compatibility name from v1.
  Supernet = supernet

  def AddComment(self, comment=''):
    """Append comment to self.text, comma seperated.

    Don't add the comment if it's the same as self.text.

    Args: comment
    """
    if self.text:
      if comment and comment not in self.text:
        self.text += ', ' + comment
    else:
      self.text = comment


def CollapseAddrListRecursive(addresses):
  """Recursively loops through the addresses, collapsing concurent netblocks.

   Example:

   ip1 = ipaddr.IPv4Network('1.1.0.0/24')
   ip2 = ipaddr.IPv4Network('1.1.1.0/24')
   ip3 = ipaddr.IPv4Network('1.1.2.0/24')
   ip4 = ipaddr.IPv4Network('1.1.3.0/24')
   ip5 = ipaddr.IPv4Network('1.1.4.0/24')
   ip6 = ipaddr.IPv4Network('1.1.0.1/22')

   CollapseAddrRecursive([ip1, ip2, ip3, ip4, ip5, ip6]) ->
   [IPv4Network('1.1.0.0/22'), IPv4Network('1.1.4.0/24')]

   Note, this shouldn't be called directly, but is called via
   CollapseAddr([])

  Args:
    addresses: List of IPv4 or IPv6 objects

  Returns:
    List of IPv4 or IPv6 objects (depending on what we were passed)
  """
  ret_array = []
  optimized = False

  for cur_addr in addresses:
    if not ret_array:
      ret_array.append(cur_addr)
      continue
    if ret_array[-1].Contains(cur_addr):
      # save the comment from the subsumed address
      ret_array[-1].AddComment(cur_addr.text)
      optimized = True
    elif cur_addr == ret_array[-1].Supernet().Subnet()[1]:
      ret_array.append(ret_array.pop().Supernet())
      # save the text from the subsumed address
      ret_array[-1].AddComment(cur_addr.text)
      optimized = True
    else:
      ret_array.append(cur_addr)

  if optimized:
    return CollapseAddrListRecursive(ret_array)
  return ret_array


def CollapseAddrList(addresses):
  """Collapse an array of IP objects.

  Example:  CollapseAddr(
    [IPv4('1.1.0.0/24'), IPv4('1.1.1.0/24')]) -> [IPv4('1.1.0.0/23')]
    Note: this works just as well with IPv6 addresses too.

  Args:
     addresses: list of ipaddr.IPNetwork objects

  Returns:
    list of ipaddr.IPNetwork objects
  """
  return CollapseAddrListRecursive(
      sorted(addresses, key=ipaddr._BaseNet._get_networks_key))


def SortAddrList(addresses):
  """Return a sorted list of nacaddr objects."""
  return sorted(addresses, key=ipaddr._BaseNet._get_networks_key)


def RemoveAddressFromList(superset, exclude):
  """Remove a single address from a list of addresses.

  Args:
    superset: a List of nacaddr IPv4 or IPv6 addresses
    exclude: a single nacaddr IPv4 or IPv6 address

  Returns:
    a List of nacaddr IPv4 or IPv6 addresses
  """
  ret_array = []
  for addr in superset:
    if exclude == addr or addr in exclude:
      # this is a bug in ipaddr v1. IP('1.1.1.1').AddressExclude(IP('1.1.1.1'))
      # raises an error.  Not tested in v2 yet.
      pass
    elif exclude.version == addr.version and exclude in addr:
      ret_array.extend([IP(x) for x in addr.AddressExclude(exclude)])
    else:
      ret_array.append(addr)
  return ret_array


def AddressListExclude(superset, excludes):
  """Remove a list of addresses from another list of addresses.

  Args:
    superset: a List of nacaddr IPv4 or IPv6 addresses
    excludes: a List nacaddr IPv4 or IPv6 addresses

  Returns:
    a List of nacaddr IPv4 or IPv6 addresses
  """
  superset = CollapseAddrList(superset)
  excludes = CollapseAddrList(excludes)

  ret_array = []

  for ex in excludes:
    superset = RemoveAddressFromList(superset, ex)
  return CollapseAddrList(superset)


ExcludeAddrs = AddressListExclude


class PrefixlenDiffInvalidError(ipaddr.NetmaskValueError):
  """Holdover from ipaddr v1."""


if __name__ == '__main__':
  pass