# Copyright 2015 Cisco Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time import json import logging import threading from threading import Timer from .ucsexception import UcsException, UcsLoginError from .ucsdriver import UcsDriver from .ucsgenutils import Progress log = logging.getLogger('ucs') tx_lock = threading.Lock() class UcsSession(object): """ UcsSession class is session interface for any Ucs related communication. Parent class of UcsHandle, used internally by UcsHandle class. """ def __init__(self, ip, username, password, port=None, secure=None, proxy=None, timeout=None): self.__ip = ip self.__username = username self.__password = password self.__proxy = proxy self.__uri = self.__create_uri(port, secure) self.__ucs = ip self.__name = None self.__cookie = None self.__session_id = None self.__version = None self.__refresh_period = None self.__priv = None self.__domains = None self.__channel = None self.__evt_channel = None self.__last_update_time = None self.__refresh_timer = None self.__force = False self.__auto_refresh = False self.__timeout = timeout self.__dump_xml = False self.__redirect = False self.__threaded = False self.__driver = UcsDriver(proxy=self.__proxy) @property def ip(self): return self.__ip @property def username(self): return self.__username @property def proxy(self): return self.__proxy @property def uri(self): return self.__uri @property def ucs(self): return self.__ucs @property def name(self): return self.__name @property def cookie(self): return self.__cookie @property def session_id(self): return self.__session_id @property def version(self): from .ucscoremeta import UcsVersion return UcsVersion(self.__version) @property def refresh_period(self): return self.__refresh_period @property def priv(self): return self.__priv @property def domains(self): return self.__domains @property def channel(self): return self.__channel @property def evt_channel(self): return self.__evt_channel @property def last_update_time(self): return self.__last_update_time @property def threaded(self): return self.__threaded @property def timeout(self): return self.__timeout def _freeze(self): save = { "ip": self.__ip, "username": self.__username, "password": self.__password, "proxy": self.__proxy, "uri": self.__uri, "ucs": self.__ucs, "name": self.__name, "cookie": self.__cookie, "session_id": self.__session_id, "version": self.__version, "refresh_period": self.__refresh_period, "priv": self.__priv, "domains": self.__domains, "channel": self.__channel, "evt_channel": self.__evt_channel, "last_update_time": self.__last_update_time, "force": self.__force, "auto_refresh": self.__auto_refresh, "dump_xml": self.__dump_xml, "redirect": self.__redirect, "threaded": self.__threaded, "timeout": self.__timeout } return json.dumps(save) def _unfreeze(self, params_json): params_dict = json.loads(params_json) for param in params_dict: setattr(self, '_UcsSession__' + param, params_dict[param]) # update the drvier if a proxy configuration exists self.__driver = UcsDriver(proxy=self.__proxy) # cookie might be stale, if so relogin if self.__auto_refresh and not self.__validate_connection(): self._refresh(auto_relogin=True) self.__auto_refresh = True def __create_uri(self, port, secure): """ Generates UCSM URI used for connection Args: port (int or None): The port number to be used during connection secure (bool or None): True for secure connection otherwise False Returns: uri (str) Example: uri = __create_uri(port=443, secure=True) """ port = _get_port(port, secure) protocol = _get_proto(port, secure) uri = "%s://%s%s%s" % (protocol, self.__ip, ":", str(port)) return uri def __clear(self): """ Internal method to clear the session variables """ self.__name = None self.__cookie = None self.__session_id = None self.__version = None self.__refresh_period = None self.__priv = None self.__domains = None self.__channel = None self.__evt_channel = None self.__last_update_time = str(time.asctime()) def __update(self, response): """ Internal method to update the session variables """ self.__name = response.out_name self.__cookie = response.out_cookie self.__session_id = response.out_session_id self.__version = response.out_version self.__refresh_period = int(response.out_refresh_period) self.__priv = response.out_priv self.__domains = response.out_domains self.__channel = response.out_channel self.__evt_channel = response.out_evt_channel self.__last_update_time = str(time.asctime()) def post(self, uri, data=None, read=True, timeout=None): """ sends the request and receives the response from ucsm server Args: uri (str): URI of the the UCS Server data (str): request data to send via post request timeout (int): if set, this will be used as timeout in secs for urllib2 Returns: response xml string Example: response = post("http://192.168.1.1:80", data=xml_str) """ timeout = self.__timeout if timeout is None else timeout response = self.__driver.post(uri=uri, data=data, read=read, timeout=timeout) return response def post_xml(self, xml_str, read=True, timeout=None): """ sends the xml request and receives the response from ucsm server Args: xml_str (str): xml string read (bool): if True, returns response.read() else returns object. timeout (int): if set, this will be used as timeout in secs for urllib2 Returns: response xml string Example: response = post_xml('') """ ucsm_uri = self.__uri + "/nuova" response_str = self.post(uri=ucsm_uri, data=xml_str, read=read, timeout=timeout) if self.__driver.redirect_uri: self.__uri = self.__driver.redirect_uri return response_str def dump_xml_request(self, elem): from . import ucsxmlcodec as xc if not self.__dump_xml: return if elem.tag == "aaaLogin": elem.attrib['inPassword'] = "*********" xml_str = xc.to_xml_str(elem) log.debug('%s ====> %s' % (self.__uri, xml_str)) elem.attrib['inPassword'] = self.__password xml_str = xc.to_xml_str(elem) else: xml_str = xc.to_xml_str(elem) log.debug('%s ====> %s' % (self.__uri, xml_str)) def dump_xml_response(self, resp): if self.__dump_xml: log.debug('%s <==== %s' % (self.__uri, resp)) def post_elem(self, elem, timeout=None): """ sends the request and receives the response from ucsm server using xml element Args: elem (xml element) timeout (int): if set, it is used as timeout in secs for urllib2 Returns: response xml string Example: response = post_elem(elem=xml_element) """ from . import ucsxmlcodec as xc self._tx_lock_acquire_conditional(elem) if self._is_stale_cookie(elem): elem.attrib['cookie'] = self.cookie self.dump_xml_request(elem) xml_str = xc.to_xml_str(elem) try: response_str = self.post_xml(xml_str, timeout=timeout) self.dump_xml_response(response_str) if response_str: response = xc.from_xml_str(response_str, self) # Cookie update should happen with-in the lock # this ensures that the next packet goes out # with the new cookie if elem.tag == "aaaRefresh": self._update_cookie(response) self._tx_lock_release_conditional(elem) return response except: self._tx_lock_release_conditional(elem) raise self._tx_lock_release_conditional(elem) return None def _tx_lock_acquire_conditional(self, elem): """ tx_lock is used to maintain the order of messages Let aaaLogout always pass, and not be stuck for locks. """ if elem.tag != "aaaLogout": tx_lock.acquire() def _tx_lock_release_conditional(self, elem): """ Release the global tx_lock. We do not acquire lock for aaaLogout """ if elem.tag != "aaaLogout": tx_lock.release() def file_download( self, url_suffix, file_dir, file_name, progress=Progress()): """ Downloads the file from ucsm server Args: url_suffix (str): suffix url to be appended to http\https://host:port/ to locate the file on the server file_dir (str): The directory to download to file_name (str): The destination file name for the download progress (ucsgenutils.Progress): Class that has method to display progress Returns: None Example: file_download(url_suffix='backupfile/config_backup.xml', dest_dir='/home/user/backup', file_name='my_config_backup.xml') """ from .ucsgenutils import download_file file_url = "%s/%s" % (self.__uri, url_suffix) self.__driver.add_header('Cookie', 'ucsm-cookie=%s' % self.__cookie) download_file(driver=self.__driver, file_url=file_url, file_dir=file_dir, file_name=file_name, progress=progress) self.__driver.remove_header('Cookie') def file_upload( self, url_suffix, file_dir, file_name, progress=Progress()): """ Uploads the file on UCSM server. Args: url_suffix (str): suffix url to be appended to http\https://host:port/ to locate the file on the server source_dir (str): The directory to upload from file_name (str): The destination file name for the download progress (ucsgenutils.Progress): Class that has method to display progress Returns: None Example: source_dir = "/home/user/backup"\n file_name = "config_backup.xml"\n uri_suffix = "operations/file-%s/importconfig.txt" % file_name\n file_upload(url_suffix=uri_suffix, source_dir=source_dir, file_name=file_name) """ from .ucsgenutils import upload_file file_url = "%s/%s" % (self.__uri, url_suffix) self.__driver.add_header('Cookie', 'ucsm-cookie=%s' % self.__cookie) upload_file(self.__driver, uri=file_url, file_dir=file_dir, file_name=file_name, progress=progress) self.__driver.remove_header('Cookie') def __start_refresh_timer(self): """ Internal method to support auto-refresh functionality. """ if self.__refresh_period > 60: interval = int(self.__refresh_period) - 60 else: interval = 60 self.__refresh_timer = Timer(interval, self._refresh) self.__refresh_timer.setDaemon(True) self.__refresh_timer.start() def __stop_refresh_timer(self): """ Internal method to support auto-refresh functionality. """ if self.__refresh_timer is not None: self.__refresh_timer.cancel() self.__refresh_timer = None def _update_cookie(self, response): if response.error_code != 0: return self.__cookie = response.out_cookie def _is_stale_cookie(self, elem): return 'cookie' in elem.attrib and elem.attrib[ 'cookie'] != "" and elem.attrib['cookie'] != self.cookie def _refresh(self, auto_relogin=False): """ Sends the aaaRefresh query to the UCS to refresh the connection (to prevent session expiration). """ from .ucsmethodfactory import aaa_refresh self.__stop_refresh_timer() elem = aaa_refresh(self.__cookie, self.__username, self.__password) response = self.post_elem(elem) if response.error_code != 0: self.__cookie = None if auto_relogin: return self._login() return False self.__cookie = response.out_cookie self.__refresh_period = int(response.out_refresh_period) self.__priv = response.out_priv.split(',') self.__domains = response.out_domains self.__last_update_time = str(time.asctime()) # re-enable the timer self.__start_refresh_timer() return True def __is_ucsm(self, timeout=None): """ Internal method to validate if connecting server is UCS. """ is_ucs = False from .ucsmethodfactory import config_resolve_class nw_elem = config_resolve_class(cookie=self.__cookie, in_filter=None, class_id="networkElement") try: nw_elem_response = self.post_elem(nw_elem, timeout=timeout) if nw_elem_response.error_code != 0: self._logout() else: is_ucs = True except: self._logout() return is_ucs def __validate_connection(self, timeout=None): """ Internal method to validate if needs to reconnect or if exist use the existing connection. """ from .mometa.top.TopSystem import TopSystem from .ucsmethodfactory import config_resolve_dn if self.__cookie is not None and self.__cookie != "": if not self.__force: top_system = TopSystem() elem = config_resolve_dn(cookie=self.__cookie, dn=top_system.dn) response = self.post_elem(elem, timeout=timeout) if response.error_code != 0: return False return True else: self._logout() return False def _update_version(self, response=None): from .ucscoremeta import UcsVersion from .ucsmethodfactory import config_resolve_dn from .mometa.top.TopSystem import TopSystem from .mometa.firmware.FirmwareRunning import FirmwareRunning, \ FirmwareRunningConsts # If the aaaLogin response has the version populated, we do not # need to query for it # There are cases where version is missing from aaaLogin response # In such cases the later part of this method populates it if response.out_version is not None and response.out_version != "": return top_system = TopSystem() firmware = FirmwareRunning(top_system, FirmwareRunningConsts.DEPLOYMENT_SYSTEM) elem = config_resolve_dn(cookie=self.__cookie, dn=firmware.dn) response = self.post_elem(elem) if response.error_code != 0: raise UcsException(response.error_code, response.error_descr) firmware = response.out_config.child[0] self.__version = firmware.version def _update_domain_name_and_ip(self): from .ucsmethodfactory import config_resolve_dn from .mometa.top.TopSystem import TopSystem top_system = TopSystem() elem = config_resolve_dn(cookie=self.__cookie, dn=top_system.dn) response = self.post_elem(elem) if response.error_code != 0: raise UcsException(response.error_code, response.error_descr) top_system = response.out_config.child[0] self.__ucs = top_system.name self.__virtual_ipv4_address = top_system.address def _login(self, auto_refresh=False, force=False, timeout=None): """ Internal method responsible to do a login on UCSM server. Args: auto_refresh (bool): if set to True, it refresh the cookie continuously force (bool): if set to True it reconnects even if cookie exists and is valid for respective connection. timeout (int): if set, this will be used as timeout in secs for urllib2 Returns: True on successful connect """ from .ucsmethodfactory import aaa_login self.__auto_refresh = auto_refresh self.__force = force if self.__validate_connection(timeout=timeout): return True elem = aaa_login(in_name=self.__username, in_password=self.__password) response = self.post_elem(elem, timeout=timeout) if response.error_code != 0: self.__clear() raise UcsException(response.error_code, response.error_descr) self.__update(response) # Verify not to connect to IMC if not self.__is_ucsm(timeout=timeout): raise UcsLoginError("Not a supported server.") self._update_version(response) self._update_domain_name_and_ip() if auto_refresh: self.__start_refresh_timer() return True def _logout(self, timeout=None): """ Internal method to disconnect from ucsm server. Args: None timeout (int): if set, this will be used as timeout in secs for urllib2 Returns: True on successful disconnect """ from .ucsmethodfactory import aaa_logout if self.__cookie is None: return True if self.__refresh_timer: self.__refresh_timer.cancel() elem = aaa_logout(self.__cookie, 301) response = self.post_elem(elem, timeout=timeout) if response.error_code == "555": return True if response.error_code != 0: raise UcsException(response.error_code, response.error_descr) self.__clear() return True def _set_dump_xml(self): """ Internal method to set dump_xml to True """ self.__dump_xml = True def _unset_dump_xml(self): """ Internal method to set dump_xml to False """ self.__dump_xml = False def _set_timeout(self, timeout): """ Internal method to set timeout for the request """ timeout = None if type(timeout) != int else timeout self.__timeout = timeout def _unset_timeout(self): """ Internal method to unset timeout for the request """ self.__timeout = None def _set_mode_threading(self, enable=False): self.__threaded = enable def _get_port(port, secure): if port is not None: return int(port) if secure is False: return 80 return 443 def _get_proto(port, secure): if secure is None: if port == 80: return "http" elif secure is False: return "http" return "https"