summaryrefslogtreecommitdiff
path: root/lib/junipersrx.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/junipersrx.py')
-rw-r--r--lib/junipersrx.py448
1 files changed, 448 insertions, 0 deletions
diff --git a/lib/junipersrx.py b/lib/junipersrx.py
new file mode 100644
index 0000000..c2e0676
--- /dev/null
+++ b/lib/junipersrx.py
@@ -0,0 +1,448 @@
+#!/usr/bin/python
+#
+# Copyright 2011 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""SRX generator."""
+# pylint: disable-msg=W0231
+
+__author__ = 'robankeny@google.com (Robert Ankeny)'
+
+import collections
+import datetime
+import logging
+
+import aclgenerator
+import nacaddr
+
+
+class Error(Exception):
+ """generic error class."""
+
+
+class UnsupportedFilterError(Error):
+ pass
+
+
+class UnsupportedHeader(Error):
+ pass
+
+
+class SRXDuplicateTermError(Error):
+ pass
+
+
+class SRXVerbatimError(Error):
+ pass
+
+
+class SRXOptionError(Error):
+ pass
+
+
+class Term(aclgenerator.Term):
+ """Representation of an individual SRX term.
+
+ This is mostly useful for the __str__() method.
+
+ Args:
+ obj: a policy.Term object
+ term_type: type of filter to generate, e.g. inet or inet6
+ filter_options: list of remaining target options (zones)
+ """
+
+ _ACTIONS = {'accept': 'permit',
+ 'deny': 'deny',
+ 'reject': 'reject',
+ 'count': 'count',
+ 'log': 'log'}
+
+ def __init__(self, term, term_type, zones):
+ self.term = term
+ self.term_type = term_type
+ self.from_zone = zones[1]
+ self.to_zone = zones[3]
+ self.extra_actions = []
+
+ def __str__(self):
+ """Render config output from this term object."""
+ # Verify platform specific terms. Skip whole term if platform does not
+ # match.
+ if self.term.platform:
+ if 'srx' not in self.term.platform:
+ return ''
+ if self.term.platform_exclude:
+ if 'srx' in self.term.platform_exclude:
+ return ''
+ ret_str = []
+
+ # COMMENTS
+ comment_max_width = 68
+ if self.term.owner:
+ self.term.comment.append('Owner: %s' % self.term.owner)
+ comments = aclgenerator.WrapWords(self.term.comment, comment_max_width)
+ if comments and comments[0]:
+ ret_str.append(JuniperSRX.INDENT * 3 + '/*')
+ for line in comments:
+ ret_str.append(JuniperSRX.INDENT * 3 + line)
+ ret_str.append(JuniperSRX.INDENT * 3 + '*/')
+
+ ret_str.append(JuniperSRX.INDENT * 3 + 'policy ' + self.term.name + ' {')
+ ret_str.append(JuniperSRX.INDENT * 4 + 'match {')
+
+ # SOURCE-ADDRESS
+ if self.term.source_address:
+ saddr_check = set()
+ for saddr in self.term.source_address:
+ saddr_check.add(saddr.parent_token)
+ saddr_check = sorted(saddr_check)
+ source_address_string = ''
+ for addr in saddr_check:
+ source_address_string += addr + ' '
+ ret_str.append(JuniperSRX.INDENT * 5 + 'source-address [ ' +
+ source_address_string + '];')
+ else:
+ ret_str.append(JuniperSRX.INDENT * 5 + 'source-address any;')
+
+ # DESTINATION-ADDRESS
+ if self.term.destination_address:
+ daddr_check = []
+ for daddr in self.term.destination_address:
+ daddr_check.append(daddr.parent_token)
+ daddr_check = set(daddr_check)
+ daddr_check = list(daddr_check)
+ daddr_check.sort()
+ destination_address_string = ''
+ for addr in daddr_check:
+ destination_address_string += addr + ' '
+ ret_str.append(JuniperSRX.INDENT * 5 + 'destination-address [ ' +
+ destination_address_string + '];')
+ else:
+ ret_str.append(JuniperSRX.INDENT * 5 + 'destination-address any;')
+
+ # APPLICATION
+ if (not self.term.source_port and not self.term.destination_port and not
+ self.term.icmp_type and not self.term.protocol):
+ ret_str.append(JuniperSRX.INDENT * 5 + 'application any;')
+ else:
+ ret_str.append(JuniperSRX.INDENT * 5 + 'application ' + self.term.name +
+ '-app;')
+
+ ret_str.append(JuniperSRX.INDENT * 4 + '}')
+
+ # ACTIONS
+ for action in self.term.action:
+ ret_str.append(JuniperSRX.INDENT * 4 + 'then {')
+ ret_str.append(JuniperSRX.INDENT * 5 + self._ACTIONS.get(
+ str(action)) + ';')
+
+ # LOGGING
+ if self.term.logging:
+ ret_str.append(JuniperSRX.INDENT * 5 + 'log {')
+ ret_str.append(JuniperSRX.INDENT * 6 + 'session-init;')
+ ret_str.append(JuniperSRX.INDENT * 5 + '}')
+ ret_str.append(JuniperSRX.INDENT * 4 + '}')
+
+ ret_str.append(JuniperSRX.INDENT * 3 + '}')
+
+ # OPTIONS
+ if self.term.option:
+ raise SRXOptionError('Options are not implemented yet, please remove ' +
+ 'from term %s' % self.term.name)
+
+ # VERBATIM
+ if self.term.verbatim:
+ raise SRXVerbatimError('Verbatim is not implemented, please remove ' +
+ 'the offending term %s.' % self.term.name)
+ return '\n'.join(ret_str)
+
+ def _Group(self, group):
+ """If 1 item return it, else return [ item1 item2 ].
+
+ Args:
+ group: a list. could be a list of strings (protocols) or a list of
+ tuples (ports)
+
+ Returns:
+ rval: a string surrounded by '[' and '];' if len(group) > 1
+ or with just ';' appended if len(group) == 1
+ """
+
+ def _FormattedGroup(el):
+ """Return the actual formatting of an individual element.
+
+ Args:
+ el: either a string (protocol) or a tuple (ports)
+
+ Returns:
+ string: either the lower()'ed string or the ports, hyphenated
+ if they're a range, or by itself if it's not.
+ """
+ if isinstance(el, str):
+ return el.lower()
+ elif isinstance(el, int):
+ return str(el)
+ # type is a tuple below here
+ elif el[0] == el[1]:
+ return '%d' % el[0]
+ else:
+ return '%d-%d' % (el[0], el[1])
+
+ if len(group) > 1:
+ rval = '[ ' + ' '.join([_FormattedGroup(x) for x in group]) + ' ];'
+ else:
+ rval = _FormattedGroup(group[0]) + ';'
+ return rval
+
+
+class JuniperSRX(aclgenerator.ACLGenerator):
+ """SRX rendering class.
+
+ This class takes a policy object and renders the output into a syntax
+ which is understood by SRX firewalls.
+
+ Args:
+ pol: policy.Policy object
+ """
+
+ _PLATFORM = 'srx'
+ _SUFFIX = '.srx'
+ _SUPPORTED_AF = set(('inet',))
+ _OPTIONAL_SUPPORTED_KEYWORDS = set(['expiration',
+ 'logging',
+ 'owner',
+ 'routing_instance', # safe to skip
+ 'timeout'
+ ])
+ INDENT = ' '
+
+ def _TranslatePolicy(self, pol, exp_info):
+ """Transform a policy object into a JuniperSRX object.
+
+ Args:
+ pol: policy.Policy object
+ exp_info: print a info message when a term is set to expire
+ in that many weeks
+
+ Raises:
+ UnsupportedFilterError: An unsupported filter was specified
+ UnsupportedHeader: A header option exists that is not understood/usable
+ SRXDuplicateTermError: Two terms were found with same name in same filter
+ """
+ self.srx_policies = []
+ self.addressbook = collections.OrderedDict()
+ self.applications = []
+ self.ports = []
+ self.from_zone = ''
+ self.to_zone = ''
+
+ current_date = datetime.date.today()
+ exp_info_date = current_date + datetime.timedelta(weeks=exp_info)
+
+ for header, terms in pol.filters:
+ if self._PLATFORM not in header.platforms:
+ continue
+
+ filter_options = header.FilterOptions(self._PLATFORM)
+
+ if (len(filter_options) < 4 or filter_options[0] != 'from-zone' or
+ filter_options[2] != 'to-zone'):
+ raise UnsupportedFilterError(
+ 'SRX filter arguments must specify from-zone and to-zone.')
+ self.from_zone = filter_options[1]
+ self.to_zone = filter_options[3]
+
+ if len(filter_options) > 4:
+ filter_type = filter_options[4]
+ else:
+ filter_type = 'inet'
+ if filter_type not in self._SUPPORTED_AF:
+ raise UnsupportedHeader(
+ 'SRX Generator currently does not support %s as a header option' %
+ (filter_type))
+
+ term_dup_check = set()
+ new_terms = []
+ for term in terms:
+ term.name = self.FixTermLength(term.name)
+ if term.name in term_dup_check:
+ raise SRXDuplicateTermError('You have a duplicate term: %s'
+ % term.name)
+ term_dup_check.add(term.name)
+
+ if term.expiration:
+ if term.expiration <= exp_info_date:
+ logging.info('INFO: Term %s in policy %s>%s expires '
+ 'in less than two weeks.', term.name, self.from_zone,
+ self.to_zone)
+ if term.expiration <= current_date:
+ logging.warn('WARNING: Term %s in policy %s>%s is expired.',
+ term.name, self.from_zone, self.to_zone)
+
+ for i in term.source_address_exclude:
+ term.source_address = nacaddr.RemoveAddressFromList(
+ term.source_address, i)
+ for i in term.destination_address_exclude:
+ term.destination_address = nacaddr.RemoveAddressFromList(
+ term.destination_address, i)
+
+ for addr in term.source_address:
+ self._BuildAddressBook(self.from_zone, addr)
+ for addr in term.destination_address:
+ self._BuildAddressBook(self.to_zone, addr)
+
+ new_term = Term(term, filter_type, filter_options)
+ new_terms.append(new_term)
+ tmp_icmptype = new_term.NormalizeIcmpTypes(
+ term.icmp_type, term.protocol, filter_type)
+ # NormalizeIcmpTypes returns [''] for empty, convert to [] for eval
+ normalized_icmptype = tmp_icmptype if tmp_icmptype != [''] else []
+ # rewrites the protocol icmpv6 to icmp6
+ if 'icmpv6' in term.protocol:
+ protocol = list(term.protocol)
+ protocol[protocol.index('icmpv6')] = 'icmp6'
+ else:
+ protocol = term.protocol
+ self.applications.append({'sport': self._BuildPort(term.source_port),
+ 'dport': self._BuildPort(
+ term.destination_port),
+ 'name': term.name,
+ 'protocol': protocol,
+ 'icmp-type': normalized_icmptype,
+ 'timeout': term.timeout})
+ self.srx_policies.append((header, new_terms, filter_options))
+
+ def _BuildAddressBook(self, zone, address):
+ """Create the address book configuration entries.
+
+ Args:
+ zone: the zone these objects will reside in
+ address: a naming library address object
+ """
+ if zone not in self.addressbook:
+ self.addressbook[zone] = collections.OrderedDict()
+ if address.parent_token not in self.addressbook[zone]:
+ self.addressbook[zone][address.parent_token] = []
+ name = address.parent_token
+ for ip in self.addressbook[zone][name]:
+ if str(address) == str(ip[0]):
+ return
+ counter = len(self.addressbook[zone][address.parent_token])
+ name = '%s_%s' % (name, str(counter))
+ self.addressbook[zone][address.parent_token].append((address, name))
+
+ def _BuildPort(self, ports):
+ """Transform specified ports into list and ranges.
+
+ Args:
+ ports: a policy terms list of ports
+
+ Returns:
+ port_list: list of ports and port ranges
+ """
+ port_list = []
+ for i in ports:
+ if i[0] == i[1]:
+ port_list.append(str(i[0]))
+ else:
+ port_list.append('%s-%s' % (str(i[0]), str(i[1])))
+ return port_list
+
+ def __str__(self):
+ """Render the output of the JuniperSRX policy into config."""
+ target = []
+ target.append('security {')
+ target.append(self.INDENT + 'zones {')
+ for zone in self.addressbook:
+ target.append(self.INDENT * 2 + 'security-zone ' + zone + ' {')
+ target.append(self.INDENT * 3 + 'replace: address-book {')
+ for group in self.addressbook[zone]:
+ for address, name in self.addressbook[zone][group]:
+ target.append(self.INDENT * 4 + 'address ' + name + ' ' +
+ str(address) + ';')
+ for group in self.addressbook[zone]:
+ target.append(self.INDENT * 4 + 'address-set ' + group + ' {')
+ for address, name in self.addressbook[zone][group]:
+ target.append(self.INDENT * 5 + 'address ' + name + ';')
+
+ target.append(self.INDENT * 4 + '}')
+ target.append(self.INDENT * 3 + '}')
+ target.append(self.INDENT * 2 + '}')
+ target.append(self.INDENT + '}')
+
+ target.append(self.INDENT + 'replace: policies {')
+
+ target.append(self.INDENT * 2 + '/*')
+ target.extend(aclgenerator.AddRepositoryTags(self.INDENT * 2))
+ target.append(self.INDENT * 2 + '*/')
+
+ for (_, terms, filter_options) in self.srx_policies:
+ target.append(self.INDENT * 2 + 'from-zone ' + filter_options[1] +
+ ' to-zone ' + filter_options[3] + ' {')
+ for term in terms:
+ target.append(str(term))
+ target.append(self.INDENT * 2 +'}')
+ target.append(self.INDENT + '}')
+ target.append('}')
+
+ # APPLICATIONS
+ target.append('replace: applications {')
+ done_apps = []
+ for app in self.applications:
+ app_list = []
+ if app in done_apps:
+ continue
+ if app['protocol'] or app['sport'] or app['dport'] or app['icmp-type']:
+ if app['icmp-type']:
+ target.append(self.INDENT + 'application ' + app['name'] + '-app {')
+ if app['timeout']:
+ timeout = app['timeout']
+ else:
+ timeout = 60
+ for i, code in enumerate(app['icmp-type']):
+ target.append(
+ self.INDENT * 2 +
+ 'term t%d protocol icmp icmp-type %s inactivity-timeout %d;' %
+ (i+1, str(code), int(timeout)))
+ else:
+ i = 1
+ target.append(self.INDENT +
+ 'application-set ' + app['name'] + '-app {')
+
+ for proto in (app['protocol'] or ['']):
+ for sport in (app['sport'] or ['']):
+ for dport in (app['dport'] or ['']):
+ chunks = []
+ if proto: chunks.append(' protocol %s' % proto)
+ if sport: chunks.append(' source-port %s' % sport)
+ if dport: chunks.append(' destination-port %s' % dport)
+ if app['timeout']:
+ chunks.append(' inactivity-timeout %d' % int(app['timeout']))
+ if chunks:
+ target.append(self.INDENT * 2 +
+ 'application ' + app['name'] + '-app%d;' % i)
+ app_list.append(self.INDENT + 'application ' + app['name'] +
+ '-app%d {' % i)
+ app_list.append(self.INDENT * 2 + 'term t%d' % i +
+ ''.join(chunks) + ';')
+ app_list.append(self.INDENT + '}')
+ i += 1
+ target.append(self.INDENT + '}')
+ done_apps.append(app)
+ if app_list:
+ target.extend(app_list)
+
+ target.append('}')
+ return '\n'.join(target)