diff options
Diffstat (limited to 'lib/nacaddr.py')
-rw-r--r-- | lib/nacaddr.py | 250 |
1 files changed, 250 insertions, 0 deletions
diff --git a/lib/nacaddr.py b/lib/nacaddr.py new file mode 100644 index 0000000..fc06f17 --- /dev/null +++ b/lib/nacaddr.py @@ -0,0 +1,250 @@ +#!/usr/bin/python +# +# Copyright 2011 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A subclass of the ipaddr library that includes comments for ipaddr objects.""" + +__author__ = 'watson@google.com (Tony Watson)' + +from third_party import ipaddr + +def IP(ipaddress, comment='', token=''): + """Take an ip string and return an object of the correct type. + + Args: + ip_string: the ip address. + comment:: option comment field + token:: option token name where this address was extracted from + + Returns: + ipaddr.IPv4 or ipaddr.IPv6 object or raises ValueError. + + Raises: + ValueError: if the string passed isn't either a v4 or a v6 address. + + Notes: + this is sort of a poor-mans factory method. + """ + a = ipaddr.IPNetwork(ipaddress) + if a.version == 4: + return IPv4(ipaddress, comment, token) + elif a.version == 6: + return IPv6(ipaddress, comment, token) + +class IPv4(ipaddr.IPv4Network): + """This subclass allows us to keep text comments related to each object.""" + + def __init__(self, ip_string, comment='', token=''): + ipaddr.IPv4Network.__init__(self, ip_string) + self.text = comment + self.token = token + self.parent_token = token + + def AddComment(self, comment=''): + """Append comment to self.text, comma seperated. + + Don't add the comment if it's the same as self.text. + + Args: comment + """ + if self.text: + if comment and comment not in self.text: + self.text += ', ' + comment + else: + self.text = comment + + def supernet(self, prefixlen_diff=1): + """Override ipaddr.IPv4 supernet so we can maintain comments. + + See ipaddr.IPv4.Supernet for complete documentation. + """ + if self.prefixlen == 0: + return self + if self.prefixlen - prefixlen_diff < 0: + raise PrefixlenDiffInvalidError( + 'current prefixlen is %d, cannot have a prefixlen_diff of %d' % ( + self.prefixlen, prefixlen_diff)) + ret_addr = IPv4(ipaddr.IPv4Network.supernet(self, prefixlen_diff), + comment=self.text, token=self.token) + return ret_addr + + # Backwards compatibility name from v1. + Supernet = supernet + + +class IPv6(ipaddr.IPv6Network): + """This subclass allows us to keep text comments related to each object.""" + + def __init__(self, ip_string, comment='', token=''): + ipaddr.IPv6Network.__init__(self, ip_string) + self.text = comment + self.token = token + self.parent_token = token + + def supernet(self, prefixlen_diff=1): + """Override ipaddr.IPv6Network supernet so we can maintain comments. + + See ipaddr.IPv6Network.Supernet for complete documentation. + """ + if self.prefixlen == 0: + return self + if self.prefixlen - prefixlen_diff < 0: + raise PrefixlenDiffInvalidError( + 'current prefixlen is %d, cannot have a prefixlen_diff of %d' % ( + self.prefixlen, prefixlen_diff)) + ret_addr = IPv6(ipaddr.IPv6Network.supernet(self, prefixlen_diff), + comment=self.text, token=self.token) + return ret_addr + + # Backwards compatibility name from v1. + Supernet = supernet + + def AddComment(self, comment=''): + """Append comment to self.text, comma seperated. + + Don't add the comment if it's the same as self.text. + + Args: comment + """ + if self.text: + if comment and comment not in self.text: + self.text += ', ' + comment + else: + self.text = comment + + +def CollapseAddrListRecursive(addresses): + """Recursively loops through the addresses, collapsing concurent netblocks. + + Example: + + ip1 = ipaddr.IPv4Network('1.1.0.0/24') + ip2 = ipaddr.IPv4Network('1.1.1.0/24') + ip3 = ipaddr.IPv4Network('1.1.2.0/24') + ip4 = ipaddr.IPv4Network('1.1.3.0/24') + ip5 = ipaddr.IPv4Network('1.1.4.0/24') + ip6 = ipaddr.IPv4Network('1.1.0.1/22') + + CollapseAddrRecursive([ip1, ip2, ip3, ip4, ip5, ip6]) -> + [IPv4Network('1.1.0.0/22'), IPv4Network('1.1.4.0/24')] + + Note, this shouldn't be called directly, but is called via + CollapseAddr([]) + + Args: + addresses: List of IPv4 or IPv6 objects + + Returns: + List of IPv4 or IPv6 objects (depending on what we were passed) + """ + ret_array = [] + optimized = False + + for cur_addr in addresses: + if not ret_array: + ret_array.append(cur_addr) + continue + if ret_array[-1].Contains(cur_addr): + # save the comment from the subsumed address + ret_array[-1].AddComment(cur_addr.text) + optimized = True + elif cur_addr == ret_array[-1].Supernet().Subnet()[1]: + ret_array.append(ret_array.pop().Supernet()) + # save the text from the subsumed address + ret_array[-1].AddComment(cur_addr.text) + optimized = True + else: + ret_array.append(cur_addr) + + if optimized: + return CollapseAddrListRecursive(ret_array) + return ret_array + + +def CollapseAddrList(addresses): + """Collapse an array of IP objects. + + Example: CollapseAddr( + [IPv4('1.1.0.0/24'), IPv4('1.1.1.0/24')]) -> [IPv4('1.1.0.0/23')] + Note: this works just as well with IPv6 addresses too. + + Args: + addresses: list of ipaddr.IPNetwork objects + + Returns: + list of ipaddr.IPNetwork objects + """ + return CollapseAddrListRecursive( + sorted(addresses, key=ipaddr._BaseNet._get_networks_key)) + + +def SortAddrList(addresses): + """Return a sorted list of nacaddr objects.""" + return sorted(addresses, key=ipaddr._BaseNet._get_networks_key) + + +def RemoveAddressFromList(superset, exclude): + """Remove a single address from a list of addresses. + + Args: + superset: a List of nacaddr IPv4 or IPv6 addresses + exclude: a single nacaddr IPv4 or IPv6 address + + Returns: + a List of nacaddr IPv4 or IPv6 addresses + """ + ret_array = [] + for addr in superset: + if exclude == addr or addr in exclude: + # this is a bug in ipaddr v1. IP('1.1.1.1').AddressExclude(IP('1.1.1.1')) + # raises an error. Not tested in v2 yet. + pass + elif exclude.version == addr.version and exclude in addr: + ret_array.extend([IP(x) for x in addr.AddressExclude(exclude)]) + else: + ret_array.append(addr) + return ret_array + + +def AddressListExclude(superset, excludes): + """Remove a list of addresses from another list of addresses. + + Args: + superset: a List of nacaddr IPv4 or IPv6 addresses + excludes: a List nacaddr IPv4 or IPv6 addresses + + Returns: + a List of nacaddr IPv4 or IPv6 addresses + """ + superset = CollapseAddrList(superset) + excludes = CollapseAddrList(excludes) + + ret_array = [] + + for ex in excludes: + superset = RemoveAddressFromList(superset, ex) + return CollapseAddrList(superset) + + +ExcludeAddrs = AddressListExclude + + +class PrefixlenDiffInvalidError(ipaddr.NetmaskValueError): + """Holdover from ipaddr v1.""" + + +if __name__ == '__main__': + pass |