Source code for ucsmsdk.ucssession

# Copyright 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import logging
import threading
from threading import Timer

from .ucsexception import UcsException, UcsLoginError
from .ucsdriver import UcsDriver
from .ucsgenutils import Progress

log = logging.getLogger('ucs')
tx_lock = threading.Lock()


[docs]class UcsSession(object): """ UcsSession class is session interface for any Ucs related communication. Parent class of UcsHandle, used internally by UcsHandle class. """ def __init__(self, ip, username, password, port=None, secure=None, proxy=None): self.__ip = ip self.__username = username self.__password = password self.__proxy = proxy self.__uri = self.__create_uri(port, secure) self.__ucs = ip self.__name = None self.__cookie = None self.__session_id = None self.__version = None self.__refresh_period = None self.__priv = None self.__domains = None self.__channel = None self.__evt_channel = None self.__last_update_time = None self.__refresh_timer = None self.__force = False self.__dump_xml = False self.__redirect = False self.__driver = UcsDriver(proxy=self.__proxy) @property def ip(self): return self.__ip @property def username(self): return self.__username @property def proxy(self): return self.__proxy @property def uri(self): return self.__uri @property def ucs(self): return self.__ucs @property def name(self): return self.__name @property def cookie(self): return self.__cookie @property def session_id(self): return self.__session_id @property def version(self): return self.__version @property def refresh_period(self): return self.__refresh_period @property def priv(self): return self.__priv @property def domains(self): return self.__domains @property def channel(self): return self.__channel @property def evt_channel(self): return self.__evt_channel @property def last_update_time(self): return self.__last_update_time def __create_uri(self, port, secure): """ Generates UCSM URI used for connection Args: port (int or None): The port number to be used during connection secure (bool or None): True for secure connection otherwise False Returns: uri (str) Example: uri = __create_uri(port=443, secure=True) """ port = _get_port(port, secure) protocol = _get_proto(port, secure) uri = "%s://%s%s%s" % (protocol, self.__ip, ":", str(port)) return uri def __clear(self): """ Internal method to clear the session variables """ self.__name = None self.__cookie = None self.__session_id = None self.__version = None self.__refresh_period = None self.__priv = None self.__domains = None self.__channel = None self.__evt_channel = None self.__last_update_time = str(time.asctime()) def __update(self, response): """ Internal method to update the session variables """ from .ucscoremeta import UcsVersion self.__name = response.out_name self.__cookie = response.out_cookie self.__session_id = response.out_session_id self.__version = UcsVersion(response.out_version) self.__refresh_period = response.out_refresh_period self.__priv = response.out_priv self.__domains = response.out_domains self.__channel = response.out_channel self.__evt_channel = response.out_evt_channel self.__last_update_time = str(time.asctime())
[docs] def post(self, uri, data=None, read=True): """ sends the request and receives the response from ucsm server Args: uri (str): URI of the the UCS Server data (str): request data to send via post request Returns: response xml string Example: response = post("http://192.168.1.1:80", data=xml_str) """ response = self.__driver.post(uri=uri, data=data, read=read) return response
[docs] def post_xml(self, xml_str, read=True): """ sends the xml request and receives the response from ucsm server Args: xml_str (str): xml string Returns: response xml string Example: response = post_xml('<aaaLogin inName="user" inPassword="pass">') """ ucsm_uri = self.__uri + "/nuova" response_str = self.post(uri=ucsm_uri, data=xml_str, read=read) if self.__driver.redirect_uri: self.__uri = self.__driver.redirect_uri return response_str
[docs] def dump_xml_request(self, elem): from . import ucsxmlcodec as xc if not self.__dump_xml: return if elem.tag == "aaaLogin": elem.attrib['inPassword'] = "*********" xml_str = xc.to_xml_str(elem) log.debug('%s ====> %s' % (self.__uri, xml_str)) elem.attrib['inPassword'] = self.__password xml_str = xc.to_xml_str(elem) else: xml_str = xc.to_xml_str(elem) log.debug('%s ====> %s' % (self.__uri, xml_str))
[docs] def dump_xml_response(self, resp): if self.__dump_xml: log.debug('%s <==== %s' % (self.__uri, resp))
[docs] def post_elem(self, elem): """ sends the request and receives the response from ucsm server using xml element Args: elem (xml element) Returns: response xml string Example: response = post_elem(elem=xml_element) """ from . import ucsxmlcodec as xc tx_lock.acquire() if self._is_stale_cookie(elem): elem.attrib['cookie'] = self.cookie self.dump_xml_request(elem) xml_str = xc.to_xml_str(elem) response_str = self.post_xml(xml_str) self.dump_xml_response(response_str) if response_str: response = xc.from_xml_str(response_str, self) # Cookie update should happen with-in the lock # this ensures that the next packet goes out # with the new cookie if elem.tag == "aaaRefresh": self._update_cookie(response) tx_lock.release() return response tx_lock.release() return None
[docs] def file_download( self, url_suffix, file_dir, file_name, progress=Progress()): """ Downloads the file from ucsm server Args: url_suffix (str): suffix url to be appended to http\https://host:port/ to locate the file on the server file_dir (str): The directory to download to file_name (str): The destination file name for the download progress (ucsgenutils.Progress): Class that has method to display progress Returns: None Example: file_download(url_suffix='backupfile/config_backup.xml', dest_dir='/home/user/backup', file_name='my_config_backup.xml') """ from .ucsgenutils import download_file file_url = "%s/%s" % (self.__uri, url_suffix) self.__driver.add_header('Cookie', 'ucsm-cookie=%s' % self.__cookie) download_file(driver=self.__driver, file_url=file_url, file_dir=file_dir, file_name=file_name, progress=progress) self.__driver.remove_header('Cookie')
[docs] def file_upload( self, url_suffix, file_dir, file_name, progress=Progress()): """ Uploads the file on UCSM server. Args: url_suffix (str): suffix url to be appended to http\https://host:port/ to locate the file on the server source_dir (str): The directory to upload from file_name (str): The destination file name for the download progress (ucsgenutils.Progress): Class that has method to display progress Returns: None Example: source_dir = "/home/user/backup"\n file_name = "config_backup.xml"\n uri_suffix = "operations/file-%s/importconfig.txt" % file_name\n file_upload(url_suffix=uri_suffix, source_dir=source_dir, file_name=file_name) """ from .ucsgenutils import upload_file file_url = "%s/%s" % (self.__uri, url_suffix) self.__driver.add_header('Cookie', 'ucsm-cookie=%s' % self.__cookie) upload_file(self.__driver, uri=file_url, file_dir=file_dir, file_name=file_name, progress=progress) self.__driver.remove_header('Cookie')
def __start_refresh_timer(self): """ Internal method to support auto-refresh functionality. """ if self.__refresh_period > 60: interval = int(self.__refresh_period) - 60 else: interval = 60 self.__refresh_timer = Timer(interval, self._refresh) self.__refresh_timer.setDaemon(True) self.__refresh_timer.start() def __stop_refresh_timer(self): """ Internal method to support auto-refresh functionality. """ if self.__refresh_timer is not None: self.__refresh_timer.cancel() self.__refresh_timer = None def _update_cookie(self, response): if response.error_code != 0: return self.__cookie = response.out_cookie def _is_stale_cookie(self, elem): return 'cookie' in elem.attrib and elem.attrib[ 'cookie'] != "" and elem.attrib['cookie'] != self.cookie def _refresh(self, auto_relogin=False): """ Sends the aaaRefresh query to the UCS to refresh the connection (to prevent session expiration). """ from .ucsmethodfactory import aaa_refresh self.__stop_refresh_timer() elem = aaa_refresh(self.__cookie, self.__username, self.__password) response = self.post_elem(elem) if response.error_code != 0: self.__cookie = None if auto_relogin: return self._login() return False self.__cookie = response.out_cookie self.__refresh_period = int(response.out_refresh_period) self.__priv = response.out_priv.split(',') self.__domains = response.out_domains self.__last_update_time = str(time.asctime()) # re-enable the timer self.__start_refresh_timer() return True def __is_ucsm(self): """ Internal method to validate if connecting server is UCS. """ is_ucs = False from .ucsmethodfactory import config_resolve_class nw_elem = config_resolve_class(cookie=self.__cookie, in_filter=None, class_id="networkElement") try: nw_elem_response = self.post_elem(nw_elem) if nw_elem_response.error_code != 0: self._logout() else: is_ucs = True except: self._logout() return is_ucs def __validate_connection(self): """ Internal method to validate if needs to reconnect or if exist use the existing connection. """ from .mometa.top.TopSystem import TopSystem from .ucsmethodfactory import config_resolve_dn if self.__cookie is not None and self.__cookie != "": if not self.__force: top_system = TopSystem() elem = config_resolve_dn(cookie=self.__cookie, dn=top_system.dn) response = self.post_elem(elem) if response.error_code != 0: return False return True else: self._logout() return False def _update_version(self, response=None): from .ucscoremeta import UcsVersion from .ucsmethodfactory import config_resolve_dn from .mometa.top.TopSystem import TopSystem from .mometa.firmware.FirmwareRunning import FirmwareRunning, \ FirmwareRunningConsts # If the aaaLogin response has the version populated, we do not # need to query for it # There are cases where version is missing from aaaLogin response # In such cases the later part of this method populates it if response.out_version is not None and response.out_version != "": return top_system = TopSystem() firmware = FirmwareRunning(top_system, FirmwareRunningConsts.DEPLOYMENT_SYSTEM) elem = config_resolve_dn(cookie=self.__cookie, dn=firmware.dn) response = self.post_elem(elem) if response.error_code != 0: raise UcsException(response.error_code, response.error_descr) firmware = response.out_config.child[0] self.__version = UcsVersion(firmware.version) def _update_domain_name_and_ip(self): from .ucsmethodfactory import config_resolve_dn from .mometa.top.TopSystem import TopSystem top_system = TopSystem() elem = config_resolve_dn(cookie=self.__cookie, dn=top_system.dn) response = self.post_elem(elem) if response.error_code != 0: raise UcsException(response.error_code, response.error_descr) top_system = response.out_config.child[0] self.__ucs = top_system.name self.__virtual_ipv4_address = top_system.address def _login(self, auto_refresh=False, force=False): """ Internal method responsible to do a login on UCSM server. Args: auto_refresh (bool): if set to True, it refresh the cookie continuously force (bool): if set to True it reconnects even if cookie exists and is valid for respective connection. Returns: True on successful connect """ from .ucsmethodfactory import aaa_login self.__force = force if self.__validate_connection(): return True elem = aaa_login(in_name=self.__username, in_password=self.__password) response = self.post_elem(elem) if response.error_code != 0: self.__clear() raise UcsException(response.error_code, response.error_descr) self.__update(response) # Verify not to connect to IMC if not self.__is_ucsm(): raise UcsLoginError("Not a supported server.") self._update_version(response) self._update_domain_name_and_ip() if auto_refresh: self.__start_refresh_timer() return True def _logout(self): """ Internal method to disconnect from ucsm server. Args: None Returns: True on successful disconnect """ from .ucsmethodfactory import aaa_logout if self.__cookie is None: return True if self.__refresh_timer: self.__refresh_timer.cancel() elem = aaa_logout(self.__cookie, 301) response = self.post_elem(elem) if response.error_code == "555": return True if response.error_code != 0: raise UcsException(response.error_code, response.error_descr) self.__clear() return True def _set_dump_xml(self): """ Internal method to set dump_xml to True """ self.__dump_xml = True def _unset_dump_xml(self): """ Internal method to set dump_xml to False """ self.__dump_xml = False
def _get_port(port, secure): if port is not None: return int(port) if secure is False: return 80 return 443 def _get_proto(port, secure): if secure is None: if port == "80": return "http" elif secure is False: return "http" return "https"