Source code for ucsmsdk.ucseventhandler

# Copyright 2013 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module is responsible for event handling of the events exposed by
UCSM server.
"""

from __future__ import print_function

try:
    from Queue import Queue
except:
    from queue import Queue

from threading import Condition, Lock, Thread
import datetime
import logging
import time

from . import ucsmo
from . import ucscoreutils
from . import ucsxmlcodec as xc
from .ucsexception import UcsWarning
from .ucsexception import UcsValidationException

log = logging.getLogger('ucs')


[docs]class MoChangeEvent(object): """ This class provides structure to save an event generated for any change, its associated managed object and property change list. This functionality is used during add_event_handler. """ def __init__(self, event_id=None, mo=None, change_list=None): self.event_id = event_id self.mo = mo self.change_list = change_list
[docs]class WatchBlock(object): """ This class handles the functionality about the Event Handling/Watch block. enqueue/dequeue fuctionality of events is handled by this class. This functionality is used during add_event_handler. """ def __init__(self, params, fmce, capacity, callback): self.fmce = fmce self.callback = callback self.capacity = capacity self.params = params self.overflow = False self.error_code = 0 # TODO:error_code to call notify as per PowerTool self.event_q = Queue() # infinite size Queue
[docs] def dequeue(self, miliseconds_timeout): """Internal method to dequeue the events.""" while True: if self.error_code != 0: log.debug("queue error:" + str(self.error_code)) return None if not self.event_q.empty(): mo_chg_event = self.event_q.get() return mo_chg_event else: return None
[docs] def enqueue(self, cmce): """Internal method to enqueue the events.""" if self.event_q.maxsize < self.capacity: self.event_q.put(cmce) else: self.overflow = True
[docs] def dequeue_default_callback(self, mce): """Default callback method.""" tab_size = 8 print("\n") print('EventId'.ljust(tab_size * 2) + ':' + str(mce.event_id)) print('ChangeList'.ljust(tab_size * 2) + ':' + str(mce.change_list)) print('ClassId'.ljust(tab_size * 2) + ':' + str(mce.mo.get_class_id())) print('MoDn'.ljust(tab_size * 2) + ':' + str(mce.mo.dn))
[docs]class UcsEventHandle(object): """This class provides api to add and remove event handler.""" def __init__(self, handle): self.__handle = handle self.__lock_object = None self.__wbs = [] self.__wbs_lock = Lock() self.__enqueue_thread = None self.__condition = Condition() self.__event_chan_resp = None self.__dequeue_thread = None self.__lowest_timeout = None self.__wb_to_remove = [] def __get_mo_elem(self, xml_str): """ Internal method to extract mo elements from xml string """ root = xc.extract_root_elem(xml_str) mo_elems = [] if root.tag == "methodVessel": for in_stimuli in root: for cmce in in_stimuli: for in_config in cmce: for mo_elem in in_config: mo_elems.append( (mo_elem, cmce.attrib.get('inEid'))) elif root.tag == "configMoChangeEvent": for in_config in root: for mo_elem in in_config: mo_elems.append(mo_elem) return mo_elems def __enqueue_function(self): """ Internal method used by add_event_handler. Provides functionality of enqueue/dequeue of the events and triggering callbacks. """ try: xml_query = '<eventSubscribe cookie="%s"/>' % self.__handle.cookie self.__event_chan_resp = self.__handle.post_xml( xml_str=xml_query.encode(), read=False) except Exception: raise try: while self.__event_chan_resp and len(self.__wbs): if self.__handle.cookie is None or \ self.__event_chan_resp is None: break resp = self.__event_chan_resp.readline() resp = self.__event_chan_resp.read(int(resp)) for mo_elem in self.__get_mo_elem(resp): gmo = ucsmo.generic_mo_from_xml_elem(mo_elem[0]) mce = MoChangeEvent(event_id=mo_elem[1], mo=gmo.to_mo(), change_list=gmo.properties.keys()) for watch_block in self.__wbs: if watch_block.fmce(mce): watch_block.enqueue(mce) with self.__condition: self.__condition.notify() if len(self.__wbs) == 0: self.__condition.acquire() self.__condition.notify() self.__condition.release() except: raise def __thread_enqueue_start(self): """ Internal method to start the enqueue thread which adds the events in an internal queue. """ self.__enqueue_thread = Thread(name="enqueue_thread", target=self.__enqueue_function) self.__enqueue_thread.daemon = True self.__enqueue_thread.start() def __time_left(self, watch_block): timeout_sec = watch_block.params["timeout_sec"] start_time = watch_block.params["start_time"] time_diff = datetime.datetime.now() - start_time if time_diff.seconds < timeout_sec: return timeout_sec - time_diff.seconds else: return 0 # return 2147483647 def __dequeue_mce(self, time_left, watch_block): if time_left and time_left > 0: if self.__lowest_timeout is None or \ self.__lowest_timeout > time_left: self.__lowest_timeout = time_left mce = watch_block.dequeue(time_left) else: mce = watch_block.dequeue(2147483647) return mce def __prop_val_exist(self, mo, prop, success_value, failure_value, transient_value, change_list=None): if isinstance(mo, ucsmo.GenericMo): n_prop = prop n_prop_val = mo.properties[n_prop] elif prop not in mo.prop_meta: n_prop = prop n_prop_val = getattr(mo, n_prop) else: n_prop = mo.prop_meta[prop].xml_attribute n_prop_val = getattr(mo, n_prop) if change_list and n_prop not in change_list: return False if (len(success_value) > 0 and n_prop_val in success_value) or \ (len(failure_value) > 0 and n_prop_val in failure_value) or \ (len(transient_value) > 0 and n_prop_val in transient_value): return True return False def __dequeue_mo_prop_poll(self, mo, prop, poll_sec, watch_block, timeout_sec=None, time_left=None): success_value = watch_block.params["success_value"] failure_value = watch_block.params["failure_value"] transient_value = watch_block.params["transient_value"] if not success_value or len(success_value) < 1: raise ValueError("success_value is missing.") pmo = self.__handle.query_dn(mo.dn) if pmo is None: UcsWarning('Mo ' + pmo.dn + ' not found.') return if timeout_sec is not None and time_left is not None and time_left > 0: if time_left < poll_sec: poll_sec = timeout_sec - time_left if self.__lowest_timeout is None or self.__lowest_timeout > poll_sec: self.__lowest_timeout = poll_sec if self.__prop_val_exist(pmo, prop, success_value, failure_value, transient_value): log.info("Successful") self.__wb_to_remove.append(watch_block) def __dequeue_mo_prop_event(self, prop, watch_block, time_left=None): success_value = watch_block.params["success_value"] failure_value = watch_block.params["failure_value"] transient_value = watch_block.params["transient_value"] if not success_value or len(success_value) < 1: raise ValueError("success_value is missing.") # dequeue mce mce = self.__dequeue_mce(time_left, watch_block) if mce is None: return # checks if prop value exist in success or failure or transient values attributes = mce.change_list if self.__prop_val_exist(mce.mo, prop, success_value, failure_value, transient_value, attributes): if watch_block.callback: ctxt = watch_block.params['context'] ctxt["done"] = True watch_block.callback(mce) self.__wb_to_remove.append(watch_block) def __dequeue_mo_until_removed(self, watch_block, time_left=None): # dequeue mce mce = self.__dequeue_mce(time_left, watch_block) if mce is None: return if watch_block.callback is not None: watch_block.callback(mce) # watch mo until gets deleted if mce.mo.status == "deleted": self.__wb_to_remove.append(watch_block) def __dequeue_all_class_id(self, watch_block, time_left=None): # dequeue mce mce = self.__dequeue_mce(time_left, watch_block) if mce is not None and watch_block.callback is not None: watch_block.callback(mce) def __dequeue_function(self): """ Internal method to dequeue to events. """ while len(self.__wbs): self.__lowest_timeout = None self.__wb_to_remove = [] try: for watch_block in self.__wbs: mo = watch_block.params["managed_object"] prop = watch_block.params["prop"] poll_sec = watch_block.params["poll_sec"] timeout_sec = watch_block.params["timeout_sec"] # checks if watch_block is not timed out, else remove time_left = None if timeout_sec is not None: time_left = self.__time_left(watch_block) if time_left <= 0: self.__wb_to_remove.append(watch_block) continue # poll for mo. Not to monitor event. if poll_sec is not None and mo is not None: self.__dequeue_mo_prop_poll(mo, prop, poll_sec, watch_block, timeout_sec, time_left) elif mo is not None: # watch mo until prop_val changed to desired value if prop is not None: self.__dequeue_mo_prop_event(prop, watch_block, time_left) # watch mo until it is removed else: self.__dequeue_mo_until_removed(watch_block, time_left) elif mo is None: # watch all event or specific to class_id self.__dequeue_all_class_id(watch_block, time_left) except Exception as e: log.info(str(e)) self.__wb_to_remove.append(watch_block) # removing watch_block if len(self.__wb_to_remove): self.__wbs_lock.acquire() for wb in self.__wb_to_remove: if "context" in wb.params: ctxt = wb.params['context'] ctxt["done"] = True self.watch_block_remove(wb) self.__wb_to_remove = [] self.__wbs_lock.release() # wait for more events only if watch_block exists if len(self.__wbs): with self.__condition: self.__condition.wait(self.__lowest_timeout) return def __thread_dequeue_start(self): """ Internal method to start dequeue thread. """ self.__dequeue_thread = Thread(name="dequeue_thread", target=self.__dequeue_function) self.__dequeue_thread.daemon = True self.__dequeue_thread.start()
[docs] def watch_block_add(self, params, filter_callback, capacity=500, callback=None): """ Internal method to add a watch block for starting event monitoring. """ if self.__handle.cookie is None: return None self.__wbs_lock.acquire() watch_block = WatchBlock(params, filter_callback, capacity, callback) # Add a List of Watchers if watch_block is not None and watch_block.callback is None: watch_block.callback = watch_block.dequeue_default_callback self.__wbs.append(watch_block) self.__wbs_lock.release() return watch_block
[docs] def watch_block_remove(self, watch_block): """ Internal method to remove a watch block for stopping event monitoring. """ if watch_block in self.__wbs: self.__wbs.remove(watch_block)
def _add_class_id_watch(self, class_id): if ucscoreutils.find_class_id_in_mo_meta_ignore_case(class_id) is None: raise UcsValidationException( "Invalid ClassId %s specified." % class_id) def watch__type_filter(mce): """ Callback method to work on events with a specific class_id. """ if mce.mo.get_class_id().lower() == class_id.lower(): return True return False return watch__type_filter def _add_mo_watch(self, managed_object, prop=None, success_value=[], poll_sec=None): if ucscoreutils.find_class_id_in_mo_meta_ignore_case( managed_object.get_class_id()) is None: raise UcsValidationException( "Unknown ClassId %s provided." % managed_object.get_class_id()) if prop is not None: mo_property_meta = ucscoreutils.get_mo_property_meta( managed_object.get_class_id(), prop) if mo_property_meta is None: raise UcsValidationException( "Unknown Property %s provided." % prop) if not success_value: raise UcsValidationException( "success_value parameter is not provided.") if poll_sec is None: def watch_mo_filter(mce): """ Callback method to work on events specific to respective managed object. """ if mce.mo.dn == managed_object.dn: return True return False return watch_mo_filter else: def watch_none_filter(mce): """ Callback method to ignore all events. """ return False return watch_none_filter
[docs] def add(self, class_id=None, managed_object=None, prop=None, success_value=[], failure_value=[], transient_value=[], poll_sec=None, timeout_sec=None, call_back=None, context=None): """ Adds an event handler. An event handler can be added using this method where an user can subscribe for the event channel from UCS and can monitor those events for any specific success value or failure value for a managed object. Args: class_id (str): managed object class id managed_object (ManagedObject) prop (str) - property of the managed object to monitor success_value (list) - success values of a prop failure_value (list) - failure values of a prop transient_value (list) - transient values of a prop poll_sec - specifies the time in seconds for polling event. timeout_sec - time after which method should stop monitoring. call_back - call back method """ if class_id is not None and managed_object is not None: raise UcsValidationException( "Specify either class_id or managedObject, not both") if class_id is not None: filter_callback = self._add_class_id_watch(class_id) elif managed_object is not None: filter_callback = self._add_mo_watch(managed_object, prop, success_value, poll_sec) else: def watch_all_filter(mce): """ Callback method to work on all events. """ return True filter_callback = watch_all_filter param_dict = {'class_id': class_id, 'managed_object': managed_object, 'prop': prop, 'success_value': success_value, 'failure_value': failure_value, 'transient_value': transient_value, 'poll_sec': poll_sec, 'timeout_sec': timeout_sec, 'call_back': call_back, 'start_time': datetime.datetime.now(), 'context': context} if filter_callback is None: raise UcsValidationException("Error adding WatchBlock...") watch_block = self.watch_block_add(params=param_dict, filter_callback=filter_callback, callback=call_back) if watch_block is not None and len(self.__wbs) == 1: if poll_sec is None: self.__thread_enqueue_start() self.__thread_dequeue_start() return watch_block
[docs] def remove(self, watch_block): """ Removes an event handler. """ self.__wbs_lock.acquire() if watch_block in self.__wbs: self.watch_block_remove(watch_block) else: UcsWarning("Event handler not found") self.__wbs_lock.release()
[docs] def clean(self): """ Removes all the watch blocks from the event handler """ self.__wbs_lock.acquire() for each in self.__wbs: self.watch_block_remove(each) self.__wbs_lock.release()
[docs] def get(self): """ Returns the list of event handlers. """ return self.__wbs
[docs]def wait(handle, mo, prop, value, cb, timeout_sec=None): """ Waits for `mo.prop == value` Args: handle(UcsHandle): connection handle to the server mo (Managed Object): managed object to watch prop (str): property to watch value (str): property value to wait for cb(function): callback on success timeout_sec (int): timeout Returns: None Example: This method is called from UcsHandle class, wait_for_event method """ # create a new event handler ueh = UcsEventHandle(handle) context = {} context["done"] = False if isinstance(value, list): success_value = value else: success_value = [value] # create a watch block ueh.add(managed_object=mo, prop=prop, success_value=success_value, call_back=cb, timeout_sec=timeout_sec, context=context) # wait for the event to occur while not context["done"]: time.sleep(1)