diff options
Diffstat (limited to 'lib/junipersrx.py')
-rw-r--r-- | lib/junipersrx.py | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/lib/junipersrx.py b/lib/junipersrx.py new file mode 100644 index 0000000..c2e0676 --- /dev/null +++ b/lib/junipersrx.py @@ -0,0 +1,448 @@ +#!/usr/bin/python +# +# Copyright 2011 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""SRX generator.""" +# pylint: disable-msg=W0231 + +__author__ = 'robankeny@google.com (Robert Ankeny)' + +import collections +import datetime +import logging + +import aclgenerator +import nacaddr + + +class Error(Exception): + """generic error class.""" + + +class UnsupportedFilterError(Error): + pass + + +class UnsupportedHeader(Error): + pass + + +class SRXDuplicateTermError(Error): + pass + + +class SRXVerbatimError(Error): + pass + + +class SRXOptionError(Error): + pass + + +class Term(aclgenerator.Term): + """Representation of an individual SRX term. + + This is mostly useful for the __str__() method. + + Args: + obj: a policy.Term object + term_type: type of filter to generate, e.g. inet or inet6 + filter_options: list of remaining target options (zones) + """ + + _ACTIONS = {'accept': 'permit', + 'deny': 'deny', + 'reject': 'reject', + 'count': 'count', + 'log': 'log'} + + def __init__(self, term, term_type, zones): + self.term = term + self.term_type = term_type + self.from_zone = zones[1] + self.to_zone = zones[3] + self.extra_actions = [] + + def __str__(self): + """Render config output from this term object.""" + # Verify platform specific terms. Skip whole term if platform does not + # match. + if self.term.platform: + if 'srx' not in self.term.platform: + return '' + if self.term.platform_exclude: + if 'srx' in self.term.platform_exclude: + return '' + ret_str = [] + + # COMMENTS + comment_max_width = 68 + if self.term.owner: + self.term.comment.append('Owner: %s' % self.term.owner) + comments = aclgenerator.WrapWords(self.term.comment, comment_max_width) + if comments and comments[0]: + ret_str.append(JuniperSRX.INDENT * 3 + '/*') + for line in comments: + ret_str.append(JuniperSRX.INDENT * 3 + line) + ret_str.append(JuniperSRX.INDENT * 3 + '*/') + + ret_str.append(JuniperSRX.INDENT * 3 + 'policy ' + self.term.name + ' {') + ret_str.append(JuniperSRX.INDENT * 4 + 'match {') + + # SOURCE-ADDRESS + if self.term.source_address: + saddr_check = set() + for saddr in self.term.source_address: + saddr_check.add(saddr.parent_token) + saddr_check = sorted(saddr_check) + source_address_string = '' + for addr in saddr_check: + source_address_string += addr + ' ' + ret_str.append(JuniperSRX.INDENT * 5 + 'source-address [ ' + + source_address_string + '];') + else: + ret_str.append(JuniperSRX.INDENT * 5 + 'source-address any;') + + # DESTINATION-ADDRESS + if self.term.destination_address: + daddr_check = [] + for daddr in self.term.destination_address: + daddr_check.append(daddr.parent_token) + daddr_check = set(daddr_check) + daddr_check = list(daddr_check) + daddr_check.sort() + destination_address_string = '' + for addr in daddr_check: + destination_address_string += addr + ' ' + ret_str.append(JuniperSRX.INDENT * 5 + 'destination-address [ ' + + destination_address_string + '];') + else: + ret_str.append(JuniperSRX.INDENT * 5 + 'destination-address any;') + + # APPLICATION + if (not self.term.source_port and not self.term.destination_port and not + self.term.icmp_type and not self.term.protocol): + ret_str.append(JuniperSRX.INDENT * 5 + 'application any;') + else: + ret_str.append(JuniperSRX.INDENT * 5 + 'application ' + self.term.name + + '-app;') + + ret_str.append(JuniperSRX.INDENT * 4 + '}') + + # ACTIONS + for action in self.term.action: + ret_str.append(JuniperSRX.INDENT * 4 + 'then {') + ret_str.append(JuniperSRX.INDENT * 5 + self._ACTIONS.get( + str(action)) + ';') + + # LOGGING + if self.term.logging: + ret_str.append(JuniperSRX.INDENT * 5 + 'log {') + ret_str.append(JuniperSRX.INDENT * 6 + 'session-init;') + ret_str.append(JuniperSRX.INDENT * 5 + '}') + ret_str.append(JuniperSRX.INDENT * 4 + '}') + + ret_str.append(JuniperSRX.INDENT * 3 + '}') + + # OPTIONS + if self.term.option: + raise SRXOptionError('Options are not implemented yet, please remove ' + + 'from term %s' % self.term.name) + + # VERBATIM + if self.term.verbatim: + raise SRXVerbatimError('Verbatim is not implemented, please remove ' + + 'the offending term %s.' % self.term.name) + return '\n'.join(ret_str) + + def _Group(self, group): + """If 1 item return it, else return [ item1 item2 ]. + + Args: + group: a list. could be a list of strings (protocols) or a list of + tuples (ports) + + Returns: + rval: a string surrounded by '[' and '];' if len(group) > 1 + or with just ';' appended if len(group) == 1 + """ + + def _FormattedGroup(el): + """Return the actual formatting of an individual element. + + Args: + el: either a string (protocol) or a tuple (ports) + + Returns: + string: either the lower()'ed string or the ports, hyphenated + if they're a range, or by itself if it's not. + """ + if isinstance(el, str): + return el.lower() + elif isinstance(el, int): + return str(el) + # type is a tuple below here + elif el[0] == el[1]: + return '%d' % el[0] + else: + return '%d-%d' % (el[0], el[1]) + + if len(group) > 1: + rval = '[ ' + ' '.join([_FormattedGroup(x) for x in group]) + ' ];' + else: + rval = _FormattedGroup(group[0]) + ';' + return rval + + +class JuniperSRX(aclgenerator.ACLGenerator): + """SRX rendering class. + + This class takes a policy object and renders the output into a syntax + which is understood by SRX firewalls. + + Args: + pol: policy.Policy object + """ + + _PLATFORM = 'srx' + _SUFFIX = '.srx' + _SUPPORTED_AF = set(('inet',)) + _OPTIONAL_SUPPORTED_KEYWORDS = set(['expiration', + 'logging', + 'owner', + 'routing_instance', # safe to skip + 'timeout' + ]) + INDENT = ' ' + + def _TranslatePolicy(self, pol, exp_info): + """Transform a policy object into a JuniperSRX object. + + Args: + pol: policy.Policy object + exp_info: print a info message when a term is set to expire + in that many weeks + + Raises: + UnsupportedFilterError: An unsupported filter was specified + UnsupportedHeader: A header option exists that is not understood/usable + SRXDuplicateTermError: Two terms were found with same name in same filter + """ + self.srx_policies = [] + self.addressbook = collections.OrderedDict() + self.applications = [] + self.ports = [] + self.from_zone = '' + self.to_zone = '' + + current_date = datetime.date.today() + exp_info_date = current_date + datetime.timedelta(weeks=exp_info) + + for header, terms in pol.filters: + if self._PLATFORM not in header.platforms: + continue + + filter_options = header.FilterOptions(self._PLATFORM) + + if (len(filter_options) < 4 or filter_options[0] != 'from-zone' or + filter_options[2] != 'to-zone'): + raise UnsupportedFilterError( + 'SRX filter arguments must specify from-zone and to-zone.') + self.from_zone = filter_options[1] + self.to_zone = filter_options[3] + + if len(filter_options) > 4: + filter_type = filter_options[4] + else: + filter_type = 'inet' + if filter_type not in self._SUPPORTED_AF: + raise UnsupportedHeader( + 'SRX Generator currently does not support %s as a header option' % + (filter_type)) + + term_dup_check = set() + new_terms = [] + for term in terms: + term.name = self.FixTermLength(term.name) + if term.name in term_dup_check: + raise SRXDuplicateTermError('You have a duplicate term: %s' + % term.name) + term_dup_check.add(term.name) + + if term.expiration: + if term.expiration <= exp_info_date: + logging.info('INFO: Term %s in policy %s>%s expires ' + 'in less than two weeks.', term.name, self.from_zone, + self.to_zone) + if term.expiration <= current_date: + logging.warn('WARNING: Term %s in policy %s>%s is expired.', + term.name, self.from_zone, self.to_zone) + + for i in term.source_address_exclude: + term.source_address = nacaddr.RemoveAddressFromList( + term.source_address, i) + for i in term.destination_address_exclude: + term.destination_address = nacaddr.RemoveAddressFromList( + term.destination_address, i) + + for addr in term.source_address: + self._BuildAddressBook(self.from_zone, addr) + for addr in term.destination_address: + self._BuildAddressBook(self.to_zone, addr) + + new_term = Term(term, filter_type, filter_options) + new_terms.append(new_term) + tmp_icmptype = new_term.NormalizeIcmpTypes( + term.icmp_type, term.protocol, filter_type) + # NormalizeIcmpTypes returns [''] for empty, convert to [] for eval + normalized_icmptype = tmp_icmptype if tmp_icmptype != [''] else [] + # rewrites the protocol icmpv6 to icmp6 + if 'icmpv6' in term.protocol: + protocol = list(term.protocol) + protocol[protocol.index('icmpv6')] = 'icmp6' + else: + protocol = term.protocol + self.applications.append({'sport': self._BuildPort(term.source_port), + 'dport': self._BuildPort( + term.destination_port), + 'name': term.name, + 'protocol': protocol, + 'icmp-type': normalized_icmptype, + 'timeout': term.timeout}) + self.srx_policies.append((header, new_terms, filter_options)) + + def _BuildAddressBook(self, zone, address): + """Create the address book configuration entries. + + Args: + zone: the zone these objects will reside in + address: a naming library address object + """ + if zone not in self.addressbook: + self.addressbook[zone] = collections.OrderedDict() + if address.parent_token not in self.addressbook[zone]: + self.addressbook[zone][address.parent_token] = [] + name = address.parent_token + for ip in self.addressbook[zone][name]: + if str(address) == str(ip[0]): + return + counter = len(self.addressbook[zone][address.parent_token]) + name = '%s_%s' % (name, str(counter)) + self.addressbook[zone][address.parent_token].append((address, name)) + + def _BuildPort(self, ports): + """Transform specified ports into list and ranges. + + Args: + ports: a policy terms list of ports + + Returns: + port_list: list of ports and port ranges + """ + port_list = [] + for i in ports: + if i[0] == i[1]: + port_list.append(str(i[0])) + else: + port_list.append('%s-%s' % (str(i[0]), str(i[1]))) + return port_list + + def __str__(self): + """Render the output of the JuniperSRX policy into config.""" + target = [] + target.append('security {') + target.append(self.INDENT + 'zones {') + for zone in self.addressbook: + target.append(self.INDENT * 2 + 'security-zone ' + zone + ' {') + target.append(self.INDENT * 3 + 'replace: address-book {') + for group in self.addressbook[zone]: + for address, name in self.addressbook[zone][group]: + target.append(self.INDENT * 4 + 'address ' + name + ' ' + + str(address) + ';') + for group in self.addressbook[zone]: + target.append(self.INDENT * 4 + 'address-set ' + group + ' {') + for address, name in self.addressbook[zone][group]: + target.append(self.INDENT * 5 + 'address ' + name + ';') + + target.append(self.INDENT * 4 + '}') + target.append(self.INDENT * 3 + '}') + target.append(self.INDENT * 2 + '}') + target.append(self.INDENT + '}') + + target.append(self.INDENT + 'replace: policies {') + + target.append(self.INDENT * 2 + '/*') + target.extend(aclgenerator.AddRepositoryTags(self.INDENT * 2)) + target.append(self.INDENT * 2 + '*/') + + for (_, terms, filter_options) in self.srx_policies: + target.append(self.INDENT * 2 + 'from-zone ' + filter_options[1] + + ' to-zone ' + filter_options[3] + ' {') + for term in terms: + target.append(str(term)) + target.append(self.INDENT * 2 +'}') + target.append(self.INDENT + '}') + target.append('}') + + # APPLICATIONS + target.append('replace: applications {') + done_apps = [] + for app in self.applications: + app_list = [] + if app in done_apps: + continue + if app['protocol'] or app['sport'] or app['dport'] or app['icmp-type']: + if app['icmp-type']: + target.append(self.INDENT + 'application ' + app['name'] + '-app {') + if app['timeout']: + timeout = app['timeout'] + else: + timeout = 60 + for i, code in enumerate(app['icmp-type']): + target.append( + self.INDENT * 2 + + 'term t%d protocol icmp icmp-type %s inactivity-timeout %d;' % + (i+1, str(code), int(timeout))) + else: + i = 1 + target.append(self.INDENT + + 'application-set ' + app['name'] + '-app {') + + for proto in (app['protocol'] or ['']): + for sport in (app['sport'] or ['']): + for dport in (app['dport'] or ['']): + chunks = [] + if proto: chunks.append(' protocol %s' % proto) + if sport: chunks.append(' source-port %s' % sport) + if dport: chunks.append(' destination-port %s' % dport) + if app['timeout']: + chunks.append(' inactivity-timeout %d' % int(app['timeout'])) + if chunks: + target.append(self.INDENT * 2 + + 'application ' + app['name'] + '-app%d;' % i) + app_list.append(self.INDENT + 'application ' + app['name'] + + '-app%d {' % i) + app_list.append(self.INDENT * 2 + 'term t%d' % i + + ''.join(chunks) + ';') + app_list.append(self.INDENT + '}') + i += 1 + target.append(self.INDENT + '}') + done_apps.append(app) + if app_list: + target.extend(app_list) + + target.append('}') + return '\n'.join(target) |