# Copyright 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the SDK general utilities.
"""
from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import platform
import re
import subprocess
import logging
log = logging.getLogger('ucs')
from .ucsexception import UcsWarning, UcsValidationException
AFFIRMATIVE_LIST = ['true', 'True', 'TRUE', True, 'yes', 'Yes', 'YES']
reserved_keywords = [
"and", "as", "assert", "break", "class", "continue", "def", "del", "elif",
"else", "except", "exec", "finally", "for", "from", "global", "if",
"import", "in", "is", "lambda", "not", "or", "pass", "print", "raise",
"return", "try", "while", "with", "yield"]
[docs]def is_python_reserved(word):
"""
Check if it is python reserved word.
"""
return word in reserved_keywords
[docs]def to_safe_prop(word):
"""
Check if it is python reserved word, if yes returns word after prefixing
it with 'r_'
"""
return 'r_' + word if is_python_reserved(word) else word
[docs]def from_safe_prop(word):
"""
removes 'r_' from word.
"""
return re.sub("^r_", "", word)
[docs]def to_python_propname(word):
"""
Converts any word to lowercase word separated by underscore
"""
return re.sub('_+', '_',
re.sub('^_', '',
re.sub('[/\-: +]', '_',
re.sub('([A-Z]+)([A-Z])([a-z0-9])', '\g<1>_\g<2>\g<3>',
re.sub('([a-z0-9])([A-Z])', '\g<1>_\g<2>',(word,
to_safe_prop(word))[is_python_reserved(word)])
)))).lower()
[docs]def convert_to_python_var_name(name):
"""converts a ucs server variable to python recommended format
Args:
name (str): string to be converted to python recommended format
"""
pattern = re.compile(r"(?<!^)(?=[A-Z])")
python_var = re.sub(pattern, '_', name).lower()
if python_var != "class":
return python_var
else:
return "class_"
[docs]def word_l(word):
""" Method makes the first letter of the given string as lower case. """
return word[0].lower() + word[1:]
[docs]def word_u(word):
""" Method makes the first letter of the given string as capital. """
return word[0].upper() + word[1:]
[docs]def make_dn(rn_array):
""" Method forms Dn out of array of rns. """
return '/'.join(rn_array)
[docs]class FileReadStream(object):
"""Internal class to show the progress while reading file."""
def __init__(self, path, progress_cb):
self._fhandle = open(path, 'rb')
# Set the seek positin to the end of the file
# and calcualte the total file size
self._fhandle.seek(0, os.SEEK_END)
self._tsize = self._fhandle.tell()
# Reset the position to the beginning of the file
self._fhandle.seek(0)
self._progress_cb = progress_cb
def __len__(self):
return self._tsize
[docs] def read(self, size):
data = self._fhandle.read(size)
self._progress_cb(self._tsize, size)
return data
[docs]class Progress(object):
""" Internal class to show the progress in chunks of custom percentage """
def __init__(self, interval=10):
self._seen = 0.0
self._interval = interval
self._percent = interval
[docs] def update(self, total, size, name=None):
from sys import stdout
self._seen += size
percent = self._seen * 100 / total
if percent < self._percent:
return
status = r"%10d [%3.2f%%]" % (self._seen, percent)
status = status + chr(8) * (len(status) + 1)
stdout.write("\r%s" % status)
stdout.flush()
self._percent += self._interval
[docs]def download_file(driver, file_url, file_dir, file_name, progress=Progress()):
"""
Downloads the file from web server
Args:
driver (UcsDriver)
file_url (str): url to download the file
file_dir (str): The directory to download to
file_name (str): The destination file name for the download
Returns:
None
Example:
driver = UcsDriver()\n
download_file(driver=UcsDriver(), file_url="http://fileurl", file_dir='/home/user/backup', file_name='my_config_backup.xml')
"""
import os
destination_file = os.path.join(file_dir, file_name)
response = driver.post(uri=file_url, read=False)
if sys.version_info > (3, 0):
# Python 3 code in this block
file_size = int(response.headers['Content-Length'])
else:
# Python 2 code in this block
file_size = int(response.info().getheaders("Content-Length")[0])
print("Downloading: %s Bytes: %s" % (file_name, file_size))
file_handle = open(destination_file, 'wb')
block_sz = 64
while True:
r_buffer = response.read(128 * block_sz)
if not r_buffer:
break
file_handle.write(r_buffer)
progress.update(file_size, len(r_buffer))
print('Downloading Finished.')
file_handle.close()
[docs]def upload_file(driver, uri, file_dir, file_name, progress=Progress()):
"""
Uploads the file on web server
Args:
driver (UcsDriver)
uri (str): url to upload the file
file_dir (str): The directory to download to
file_name (str): The destination file name for the download
Returns:
None
Example:
driver = UcsDriver()\n
upload_file(driver=UcsDriver(), uri="http://fileurl", file_dir='/home/user/backup', file_name='my_config_backup.xml')
"""
stream = FileReadStream(os.path.join(file_dir, file_name), progress.update)
response = driver.post(uri, data=stream)
if not response:
raise ValueError("File upload failed.")
[docs]def check_registry_key(java_key):
""" Method checks for the java in the registry entries. """
try:
from _winreg import ConnectRegistry, HKEY_LOCAL_MACHINE, OpenKey, \
QueryValueEx
except:
from winreg import ConnectRegistry, HKEY_LOCAL_MACHINE, OpenKey, \
QueryValueEx
path = None
try:
a_reg = ConnectRegistry(None, HKEY_LOCAL_MACHINE)
r_key = OpenKey(a_reg, java_key)
for i_cnt in range(1024):
current_version = QueryValueEx(r_key, "CurrentVersion")
if current_version is not None:
key = OpenKey(r_key, current_version[0])
if key is not None:
path = QueryValueEx(key, "JavaHome")
return path[0]
except Exception:
UcsWarning("Not able to access registry.")
return None
[docs]def is_binary_in_path(path, binary):
"""
Checks if the given binary is available in the specified path.
Returns:
True or False (Boolean)
"""
import os
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
path = path.strip('"')
exe_file = os.path.join(path, binary)
if is_exe(exe_file):
return True
return False
[docs]def get_binary_path(binary):
"""
Checks the environment PATH variable for the specified binary file.
If found, it returns the path in which it was found.
Example:
path = get_binary_path('javaws')
"""
import os
fpath, fname = os.path.split(binary)
if fpath:
raise UcsValidationException("Expects only binary name, not Path.")
for path in os.environ["PATH"].split(os.pathsep):
if is_binary_in_path(path, fname):
return path
return None
[docs]def get_java_installation_path():
"""
Method returns the java installation path in the windows or
Linux environment.
"""
if platform.system() in ["Linux", "Darwin"]:
path = os.environ.get('JAVA_HOME')
# is javaws in $JAVA_HOME?
if path and is_binary_in_path(path, 'javaws'):
return path + '/' + 'javaws'
# is javaws available in system path?
path = get_binary_path('javaws')
if path:
return path + '/' + 'javaws'
# javaws was not found
raise UcsValidationException(
"Please make sure JAVA is installed and variable JAVA_HOME"
"is set properly.")
# Get JavaPath for Windows
# elif os.name == "nt":
elif platform.system() == "Windows" or platform.system() == "Microsoft":
path = os.environ.get('JAVA_HOME')
if path is None:
path = check_registry_key(
r"SOFTWARE\\JavaSoft\\Java Runtime Environment\\")
if path is None: # Check for 32 bit Java on 64 bit machine.
path = check_registry_key(
r"SOFTWARE\\Wow6432Node\\JavaSoft\\Java Runtime Environment")
if not path:
raise UcsValidationException("Please make sure JAVA is installed.")
else:
path = os.path.join(path, 'bin')
path = os.path.join(path, 'javaws.exe')
if not os.path.exists(path):
raise UcsValidationException(
"javaws.exe is not installed on System at path <%s>." % (
path))
else:
return path
[docs]def check_output(*popenargs, **kwargs):
"""
Internal method to handle upload/download data from server.
"""
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
output, unused_err = process.communicate()
ret_code = process.poll()
if ret_code:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise subprocess.CalledProcessError(ret_code, cmd, output=output)
return output
[docs]def get_java_version():
"""
Method to get java version.
"""
try:
subprocess.check_output
except Exception:
subprocess.check_output = check_output
java_ver_full_str = subprocess.check_output(["java", "-version"],
stderr=subprocess.STDOUT)
java_ver_full_str = java_ver_full_str.decode()
java_ver_match = re.match(r'java version.*?"(.*?)"', java_ver_full_str)
java_ver_str = java_ver_match.groups()[0]
return java_ver_str
[docs]def get_md5_sum(filename):
"""
Method to get md5sum for the image.
Args:
filename (str): file for which md5sum is to be computed
"""
import hashlib
md5_obj = hashlib.md5()
file_handler = open(filename, 'rb')
for chunk in iter(lambda: file_handler.read(128 * md5_obj.block_size), b''):
md5_obj.update(chunk)
file_handler.close()
return md5_obj.hexdigest()
[docs]def get_sha_hash(input_string):
"""
Method returns the sha hash digest for a given string.
Args:
input_string (str): the input string for which sha has to be computed
"""
import hashlib
return hashlib.md5(input_string).digest()
[docs]def expand_key(key, clen):
"""
Internal method supporting encryption and decryption functionality.
"""
try:
xrange
except:
xrange = range
import hashlib
from array import array
blocks = (clen + 19) / 20
x_key = []
seed = key
for i_cnt in xrange(blocks):
seed = hashlib.md5(key + seed).digest()
x_key.append(seed)
j_str = ''.join(x_key)
return array('L', j_str)
[docs]def encrypt_password(password, key):
"""
Encrypts the password using the given key.
Args:
password (str): password to be encrypted
key (str): key to be used to encrypt the password
"""
from time import time
from array import array
import hmac
import base64
try:
xrange
except:
xrange = range
h_hash = get_sha_hash
uhash = h_hash(','.join(str(x) for x in
[repr(time()), repr(os.getpid()),
repr(len(password)),
password, key]))[:16]
k_enc, k_auth = h_hash('enc' + key + uhash), h_hash('auth' + key + uhash)
pwd_len = len(password)
password_stream = array('L', password + '0000'[pwd_len & 3:])
x_key = expand_key(k_enc, pwd_len + 4)
for i_cnt in xrange(len(password_stream)):
password_stream[i_cnt] = password_stream[i_cnt] ^ x_key[i_cnt]
cipher_t = uhash + password_stream.tostring()[:pwd_len]
auth = hmac.new(cipher_t, k_auth).digest()
encrypt_str = cipher_t + auth[:8]
encoded_str = base64.encodestring(encrypt_str)
encrypted_password = encoded_str.rstrip('\n')
return encrypted_password
[docs]def decrypt_password(cipher, key):
"""
Decrypts the password using the given key with which the password
was encrypted first.
"""
import base64
from array import array
try:
xrange
except:
xrange = range
h_hash = get_sha_hash
cipher += "\n"
cipher = base64.decodestring(cipher)
cipher_len = len(cipher) - 16 - 8
uhash = cipher[:16]
password_stream = cipher[16:-8] + "0000"[cipher_len & 3:]
# auth = cipher[-8:]
k_enc = h_hash('enc' + key + uhash)
# k_auth = h_hash('auth' + key + uhash)
# vauth = hmac.new(cipher[-8:], k_auth, sha).digest()[:8]
password_stream = array('L', password_stream)
x_key = expand_key(k_enc, cipher_len + 4)
for i in xrange(len(password_stream)):
password_stream[i] = password_stream[i] ^ x_key[i]
decrypted_password = password_stream.tostring()[:cipher_len]
return decrypted_password
[docs]def iteritems(d):
"""
Factor-out Py2-to-3 differences in dictionary item iterator methods
"""
try:
return d.iteritems()
except AttributeError:
return d.items()