# Copyright 2013 Cisco Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ This module is responsible for event handling of the events exposed by UCSM server. """ from __future__ import print_function from six.moves import queue from threading import Condition, Lock, Thread import datetime import logging import time from . import ucsmo from . import ucscoreutils from . import ucsxmlcodec as xc from .ucsexception import UcsWarning from .ucsexception import UcsValidationException log = logging.getLogger('ucs') class MoChangeEvent(object): """ This class provides structure to save an event generated for any change, its associated managed object and property change list. This functionality is used during add_event_handler. """ def __init__(self, event_id=None, mo=None, change_list=None): self.event_id = event_id self.mo = mo self.change_list = change_list class WatchBlock(object): """ This class handles the functionality about the Event Handling/Watch block. enqueue/dequeue fuctionality of events is handled by this class. This functionality is used during add_event_handler. """ def __init__(self, params, fmce, capacity, callback): self.fmce = fmce self.callback = callback self.capacity = capacity self.params = params self.overflow = False self.error_code = 0 self.event_q = queue.Queue() # infinite size Queue def dequeue(self, miliseconds_timeout): """Internal method to dequeue the events.""" while True: if self.error_code != 0: log.debug("queue error:" + str(self.error_code)) return None if not self.event_q.empty(): mo_chg_event = self.event_q.get_nowait() return mo_chg_event else: return None def enqueue(self, cmce): """Internal method to enqueue the events.""" if self.event_q.maxsize < self.capacity: self.event_q.put(cmce) else: self.overflow = True def queue_size(self): return self.event_q.qsize() def dequeue_default_callback(self, mce): """Default callback method.""" tab_size = 8 print("\n") print('EventId'.ljust(tab_size * 2) + ':' + str(mce.event_id)) print('ChangeList'.ljust(tab_size * 2) + ':' + str(mce.change_list)) print('ClassId'.ljust(tab_size * 2) + ':' + str(mce.mo.get_class_id())) print('MoDn'.ljust(tab_size * 2) + ':' + str(mce.mo.dn)) class UcsEventHandle(object): """This class provides api to add and remove event handler.""" def __init__(self, handle): self._handle = handle self._lock_object = None self._wbs = [] self._wbs_lock = Lock() self._enqueue_thread = None self._condition = Condition() self._event_chan_resp = None self._dequeue_thread = None self._lowest_timeout = None self._wb_to_remove = [] def _process_method_vessel(self, root, elems): for in_stimuli in root: for cmce in in_stimuli: for in_config in cmce: for mo_elem in in_config: elems.append( (mo_elem, cmce.attrib.get('inEid'))) return elems def _process_mo_change_event(self, root, elems): for in_config in root: for mo_elem in in_config: elems.append(mo_elem) return elems def _get_mo_elem(self, xml_str): """ Internal method to extract mo elements from xml string """ root = xc.extract_root_elem(xml_str) mo_elems = [] if root.tag == "methodVessel": mo_elems = self._process_method_vessel(root, mo_elems) elif root.tag == "configMoChangeEvent": mo_elems = self._process_mo_change_event(root, mo_elems) return mo_elems def _can_enqueue(self): return self._event_chan_resp and len( self._wbs) and ( self._handle.cookie is not None) def _notify_to_dequeue(self): with self._condition: self._condition.notify() def _process_event_channel_resp(self, resp): enqueued = False for mo_elem in self._get_mo_elem(resp): gmo = ucsmo.generic_mo_from_xml_elem(mo_elem[0]) mce = MoChangeEvent(event_id=mo_elem[1], mo=gmo.to_mo(), change_list=gmo.properties.keys()) for watch_block in self._wbs: if watch_block.fmce(mce): watch_block.enqueue(mce) enqueued = True if enqueued: self._notify_to_dequeue() def _enqueue_function(self): """ Internal method used by add_event_handler. Provides functionality of enqueue/dequeue of the events and triggering callbacks. """ try: xml_query = '' % self._handle.cookie self._event_chan_resp = self._handle.post_xml( xml_str=xml_query.encode(), read=False) except Exception: raise try: while self._can_enqueue(): resp = self._event_chan_resp.readline() resp = self._event_chan_resp.read(int(resp)) self._process_event_channel_resp(resp) if len(self._wbs) == 0: self._condition.acquire() self._condition.notify() self._condition.release() except: raise def _thread_enqueue_start(self): """ Internal method to start the enqueue thread which adds the events in an internal queue. """ self._enqueue_thread = Thread(name="enqueue_thread", target=self._enqueue_function) self._enqueue_thread.daemon = True self._enqueue_thread.start() def _time_left(self, watch_block): timeout_sec = watch_block.params["timeout_sec"] start_time = watch_block.params["start_time"] time_diff = datetime.datetime.now() - start_time if time_diff.seconds < timeout_sec: return timeout_sec - time_diff.seconds else: return 0 # return 2147483647 def _dequeue_mce(self, time_left, watch_block): if time_left and time_left > 0: if self._lowest_timeout is None or \ self._lowest_timeout > time_left: self._lowest_timeout = time_left mce = watch_block.dequeue(time_left) else: mce = watch_block.dequeue(2147483647) return mce def _get_ucs_prop_name(self, mo, python_prop): if isinstance(mo, ucsmo.GenericMo): return python_prop elif python_prop in mo.prop_meta: # return the Ucs equivalent of this property # so `usr_lbl` will be returned as `usrLbl` return mo.prop_meta[python_prop].xml_attribute else: # we do not know about this property # return the same back - no other option return python_prop def _get_prop_value(self, mo, python_prop): if isinstance(mo, ucsmo.GenericMo): return mo.properties[python_prop] elif python_prop in mo.prop_meta: return getattr(mo, python_prop) elif python_prop in vars(mo): # we do not know about this property # but the property seems to be available # in the mo. so return mo[prop] return getattr(mo, python_prop) else: # Not sure what we can do!! return None def _should_skip_prop_match(self, prop, change_list): # if prop is not a part of change_list # skip processing the property return change_list and prop not in change_list def _is_property_in_success_values(self, value, success_values): return len(success_values) > 0 and (value in success_values) def _prop_val_match(self, mo, prop, success_values, change_list=None): ucs_prop = self._get_ucs_prop_name(mo, prop) prop_val = self._get_prop_value(mo, prop) if self._should_skip_prop_match(ucs_prop, change_list): return False return self._is_property_in_success_values(prop_val, success_values) def _invoke_callback_and_set_done(self, wb, mce): if wb.callback: ctxt = wb.params['context'] if ctxt: ctxt["done"] = True wb.callback(mce) def _dequeue_mo_prop_poll(self, mo, prop, poll_sec, watch_block, timeout_sec=None, time_left=None): success_value = watch_block.params["success_value"] if not success_value or len(success_value) < 1: raise ValueError("success_value is missing.") pmo = self._handle.query_dn(mo.dn) if pmo is None: UcsWarning('Mo ' + pmo.dn + ' not found.') return if timeout_sec is not None and time_left is not None and time_left > 0: if time_left < poll_sec: poll_sec = timeout_sec - time_left if self._lowest_timeout is None or self._lowest_timeout > poll_sec: self._lowest_timeout = poll_sec if self._prop_val_match(pmo, prop, success_value): mce = MoChangeEvent(mo=pmo) self._invoke_callback_and_set_done(watch_block, mce) self._wb_to_remove.append(watch_block) def _dequeue_mo_prop_event(self, prop, watch_block, time_left=None): success_value = watch_block.params["success_value"] if not success_value or len(success_value) < 1: raise ValueError("success_value is missing.") # dequeue mce mce = self._dequeue_mce(time_left, watch_block) if mce is None: return # checks if prop value exist in success value(s) attributes = mce.change_list if self._prop_val_match(mce.mo, prop, success_value, attributes): self._invoke_callback_and_set_done(watch_block, mce) self._wb_to_remove.append(watch_block) def _dequeue_mo_until_removed(self, watch_block, time_left=None): # dequeue mce mce = self._dequeue_mce(time_left, watch_block) if mce is None: return if watch_block.callback is not None: watch_block.callback(mce) # watch mo until gets deleted if mce.mo.status == "deleted": self._wb_to_remove.append(watch_block) def _dequeue_all_class_id(self, watch_block, time_left=None): # dequeue mce mce = self._dequeue_mce(time_left, watch_block) if mce is not None and watch_block.callback is not None: watch_block.callback(mce) def _dequeue_function(self): """ Internal method to dequeue to events. """ while len(self._wbs): self._lowest_timeout = None self._wb_to_remove = [] try: for watch_block in self._wbs: mo = watch_block.params["managed_object"] prop = watch_block.params["prop"] poll_sec = watch_block.params["poll_sec"] timeout_sec = watch_block.params["timeout_sec"] # checks if watch_block is not timed out, else remove time_left = None if timeout_sec is not None: time_left = self._time_left(watch_block) if time_left <= 0: self._wb_to_remove.append(watch_block) continue if self._lowest_timeout is None: self._lowest_timeout = time_left elif self._lowest_timeout < time_left: self._lowest_timeout = time_left # Dequeue any change events for a specified watch_block # poll for mo. Not to monitor event. if poll_sec is not None and mo is not None: self._dequeue_mo_prop_poll(mo, prop, poll_sec, watch_block, timeout_sec, time_left) elif mo is not None: while watch_block.queue_size() > 0: # watch mo until prop_val changed to desired value if prop is not None: self._dequeue_mo_prop_event(prop, watch_block, time_left) # watch mo until it is removed else: self._dequeue_mo_until_removed(watch_block, time_left) elif mo is None: while watch_block.queue_size() > 0: # watch all event or specific to class_id self._dequeue_all_class_id(watch_block, time_left) except Exception as e: log.info(str(e)) self._wb_to_remove.append(watch_block) # remove any watch blocks in to_remove list self._process_wb_remove_list() # wait for more events only if watch_block exists if len(self._wbs): with self._condition: self._condition.wait(self._lowest_timeout) return def _process_wb_remove_list(self): if len(self._wb_to_remove) == 0: return self._wbs_lock.acquire() for wb in self._wb_to_remove: if "context" in wb.params: ctxt = wb.params['context'] if ctxt: ctxt["done"] = True self.watch_block_remove(wb) self._wb_to_remove = [] self._wbs_lock.release() def _thread_dequeue_start(self): """ Internal method to start dequeue thread. """ self._dequeue_thread = Thread(name="dequeue_thread", target=self._dequeue_function) self._dequeue_thread.daemon = True self._dequeue_thread.start() def watch_block_add(self, params, filter_callback, capacity=500, callback=None): """ Internal method to add a watch block for starting event monitoring. """ if self._handle.cookie is None: return None self._wbs_lock.acquire() watch_block = WatchBlock(params, filter_callback, capacity, callback) # Add a List of Watchers if watch_block is not None and watch_block.callback is None: watch_block.callback = watch_block.dequeue_default_callback self._wbs.append(watch_block) self._wbs_lock.release() return watch_block def watch_block_remove(self, watch_block): """ Internal method to remove a watch block for stopping event monitoring. """ if watch_block in self._wbs: self._wbs.remove(watch_block) def _add_class_id_watch(self, class_id): if ucscoreutils.find_class_id_in_mo_meta_ignore_case(class_id) is None: raise UcsValidationException( "Invalid ClassId %s specified." % class_id) def watch__type_filter(mce): """ Callback method to work on events with a specific class_id. """ if mce.mo.get_class_id().lower() == class_id.lower(): return True return False return watch__type_filter def _add_mo_watch(self, managed_object, prop=None, success_value=[], poll_sec=None): if ucscoreutils.find_class_id_in_mo_meta_ignore_case( managed_object.get_class_id()) is None: raise UcsValidationException( "Unknown ClassId %s provided." % managed_object.get_class_id()) if prop is not None: mo_property_meta = ucscoreutils.get_mo_property_meta( managed_object.get_class_id(), prop) if mo_property_meta is None: raise UcsValidationException( "Unknown Property %s provided." % prop) if not success_value: raise UcsValidationException( "success_value parameter is not provided.") if poll_sec is None: def watch_mo_filter(mce): """ Callback method to work on events specific to respective managed object. """ if mce.mo.dn == managed_object.dn: return True return False return watch_mo_filter else: def watch_none_filter(mce): """ Callback method to ignore all events. """ return False return watch_none_filter def add(self, class_id=None, managed_object=None, prop=None, success_value=[], poll_sec=None, timeout_sec=None, call_back=None, context=None): """ Adds an event handler. An event handler can be added using this method where an user can subscribe for the event channel from UCS and can monitor those events for any specific success value for a managed object. Args: class_id (str): managed object class id managed_object (ManagedObject) prop (str) - property of the managed object to monitor success_value (list) - success values of a prop poll_sec - specifies the time in seconds for polling event. timeout_sec - time after which method should stop monitoring. call_back - call back method """ if class_id is not None and managed_object is not None: raise UcsValidationException( "Specify either class_id or managedObject, not both") if class_id is not None: filter_callback = self._add_class_id_watch(class_id) elif managed_object is not None: filter_callback = self._add_mo_watch(managed_object, prop, success_value, poll_sec) else: def watch_all_filter(mce): """ Callback method to work on all events. """ return True filter_callback = watch_all_filter param_dict = {'class_id': class_id, 'managed_object': managed_object, 'prop': prop, 'success_value': success_value, 'poll_sec': poll_sec, 'timeout_sec': timeout_sec, 'call_back': call_back, 'start_time': datetime.datetime.now(), 'context': context} if filter_callback is None: raise UcsValidationException("Error adding WatchBlock...") watch_block = self.watch_block_add(params=param_dict, filter_callback=filter_callback, callback=call_back) if watch_block is not None and len(self._wbs) == 1: if poll_sec is None: self._thread_enqueue_start() self._thread_dequeue_start() return watch_block def remove(self, watch_block): """ Removes an event handler. """ self._wbs_lock.acquire() if watch_block in self._wbs: self.watch_block_remove(watch_block) else: UcsWarning("Event handler not found") self._wbs_lock.release() def clean(self): """ Removes all the watch blocks from the event handler """ self._wbs_lock.acquire() for each in self._wbs: self.watch_block_remove(each) self._wbs_lock.release() def get(self): """ Returns the list of event handlers. """ return self._wbs def wait(handle, mo, prop, value, cb, timeout_sec=None, poll_sec=None): """ Waits for `mo.prop == value` Args: handle(UcsHandle): connection handle to the server mo (Managed Object): managed object to watch prop (str): property to watch value (str): property value to wait for cb(function): callback on success timeout_sec (int): timeout poll_sec (int): polling interval in seconds Returns: None Example: This method is called from UcsHandle class, wait_for_event method """ if mo is None: return # create a new event handler ueh = UcsEventHandle(handle) context = {} context["done"] = False if isinstance(value, list): success_value = value else: success_value = [value] # create a watch block ueh.add( managed_object=mo, prop=prop, success_value=success_value, call_back=cb, timeout_sec=timeout_sec, poll_sec=poll_sec, context=context) # wait for the event to occur while not context["done"]: time.sleep(1)