Source code for ucsmsdk.ucsfilter

# Copyright 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import re

import pyparsing as pp

from . import ucsgenutils
from . import ucscoreutils
from .ucsfiltertype import OrFilter, AndFilter, NotFilter
from .ucsbasetype import FilterFilter

types = {"eq": "EqFilter",
         "ne": "NeFilter",
         "ge": "GeFilter",
         "gt": "GtFilter",
         "le": "LeFilter",
         "lt": "LtFilter",
         "re": "WcardFilter"
         }


[docs]class ParseFilter(object): """ Supporting class to parse filter expression. """ def __init__(self, class_id, is_meta_classid): self.class_id = class_id self.is_meta_classid = is_meta_classid
[docs] def parse_filter_obj(self, toks): """ Supporting class to parse filter expression. """ # print toks[0] #logger prop_ = toks[0]["prop"] value_ = toks[0]["value"] type_ = "re" if "type_exp" in toks[0]: type_ = toks[0]["type_exp"]["types"] flag_ = "C" if "flag_exp" in toks[0]: flag_ = toks[0]["flag_exp"]["flags"] # print prop_, value_, type_, flag_ #logger if flag_ == "I": value_ = re.sub( r"[a-zA-Z]", lambda x: "[" + x.group().upper() + x.group().lower() + "]", value_) if self.is_meta_classid: class_obj = ucscoreutils.load_class(self.class_id) prop_mo_meta = class_obj.prop_meta[prop_] if prop_mo_meta: prop_ = prop_mo_meta.xml_attribute sub_filter = create_basic_filter(types[type_], class_=ucsgenutils.word_l( self.class_id), property=prop_, value=value_) return sub_filter
@staticmethod
[docs] def and_operator(toks): """ method to support logical 'and' operator expression """ # print str, loc, toks # print toks[0][0::2] and_filter = AndFilter() for op_filter in toks[0][0::2]: and_filter.child_add(op_filter) return and_filter
@staticmethod
[docs] def or_operator(toks): """ method to support logical 'or' operator expression """ # print str, loc, toks # print toks[0][0::2] or_filter = OrFilter() for op_filter in toks[0][0::2]: or_filter.child_add(op_filter) return or_filter
@staticmethod
[docs] def not_operator(toks): """ method to support logical 'and' operator expression """ not_filter = NotFilter() for op_filter in toks[0][1:]: not_filter.child_add(op_filter) return not_filter
[docs] def parse_filter_str(self, filter_str): """ method to parse filter string """ prop = pp.WordStart(pp.alphas) + pp.Word(pp.alphanums + "_").setResultsName("prop") value = (pp.QuotedString("'") | pp.QuotedString('"') | pp.Word( pp.printables, excludeChars=",")).setResultsName("value") types_ = pp.oneOf("re eq ne gt ge lt le").setResultsName("types") flags = pp.oneOf("C I").setResultsName("flags") comma = pp.Literal(',') quote = (pp.Literal("'") | pp.Literal('"')).setResultsName("quote") type_exp = pp.Group(pp.Literal("type") + pp.Literal( "=") + quote + types_ + quote).setResultsName("type_exp") flag_exp = pp.Group(pp.Literal("flag") + pp.Literal( "=") + quote + flags + quote).setResultsName("flag_exp") semi_expression = pp.Forward() semi_expression << pp.Group(pp.Literal("(") + prop + comma + value + pp.Optional(comma + type_exp) + pp.Optional(comma + flag_exp) + pp.Literal(")") ).setParseAction( self.parse_filter_obj).setResultsName("semi_expression") expr = pp.Forward() expr << pp.operatorPrecedence(semi_expression, [ ("not", 1, pp.opAssoc.RIGHT, self.not_operator), ("and", 2, pp.opAssoc.LEFT, self.and_operator), ("or", 2, pp.opAssoc.LEFT, self.or_operator) ]) result = expr.parseString(filter_str) return result
[docs]def generate_infilter(class_id, filter_str, is_meta_class_id): """ Create FilterFilter object Args: class_id (str): class_id filter_str (str): filter expression is_meta_class_id (bool) Returns: True on successful connect Example: generate_infilter("LsServer", '("usr_lbl, "mysp", type="eq", flag="I)', True) """ parse_filter = ParseFilter(class_id=class_id, is_meta_classid=is_meta_class_id) result = parse_filter.parse_filter_str(filter_str) in_filter = FilterFilter() in_filter.child_add(result[0]) return in_filter
[docs]def handle_filter_max_component_limit(handle, l_filter): """ Method checks the filter count and if the filter count exceeds the max_components(number of filters), then the given filter objects get distributed among small groups and then again binded together in complex filters(like and , or) so that the count of filters can be reduced. """ from .ucscore import AbstractFilter from .ucsfiltertype import AndFilter, OrFilter max_components = 10 if l_filter is None or l_filter.child_count() <= max_components: return l_filter if not isinstance(l_filter, AndFilter) and not isinstance(l_filter, OrFilter): return l_filter if isinstance(l_filter, AndFilter): parent_filter = AndFilter() child_filter = AndFilter() parent_filter.child_add(child_filter) for childf in l_filter.child: if isinstance(childf, AbstractFilter): if child_filter.child_count() == max_components: child_filter = AndFilter() parent_filter.child_add(child_filter) child_filter.child_add(childf) result_filter = parent_filter else: parent_filter = OrFilter() child_filter = OrFilter() parent_filter.child_add(child_filter) for childf in l_filter.child: if isinstance(childf, AbstractFilter): if child_filter.child_count() == max_components: child_filter = OrFilter() parent_filter.child_add(child_filter) child_filter.child_add(childf) result_filter = parent_filter return handle_filter_max_component_limit(handle, result_filter)
[docs]def create_basic_filter(filter_name, **kwargs): """ Loads filter class """ from . import ucsmeta fq_module_name = ucsmeta.OTHER_TYPE_CLASS_ID[filter_name] module_import = __import__(fq_module_name, globals(), locals(), [filter_name], level=1) filter_obj = getattr(module_import, filter_name)() filter_obj.create(**kwargs) return filter_obj