######################################################################## # Copyright (C) 2019 VMWare, Inc. # # All Rights Reserved # ######################################################################## ''' Define the base class for base image spec and addon spec. This class contains helper function for serialization/deserialization, name/version spec classes. ''' from collections import OrderedDict from copy import deepcopy from datetime import datetime import json import re from .Bulletin import ComponentCollection from .AcceptanceLevels import (AcceptedAcceptanceLevel, CertifiedAcceptanceLevel, CommunityAcceptanceLevel, PartnerAcceptanceLevel) from .Errors import MissingComponentError from .Utils import XmlUtils from .Utils.Misc import isPython3OrLater from .Version import VibVersion ESX_COMP_NAME = 'ESXi' TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' CATEGORY_ENHANCEMENT = 'enhancement' CATEGORY_SECURITY = 'security' CATEGORY_BUGFIX = 'bugfix' # Attributes name constants ATTR_REL_ID = 'releaseID' ATTR_REL_TYPE = 'releaseType' ATTR_REL_DATE = 'releaseDate' ATTR_CATEGORY = 'category' ATTR_VENDOR = 'vendor' ATTR_DESC = 'description' ATTR_SUMMARY = 'summary' ATTR_DOCURL = 'docURL' ATTR_ACPT_LVL = 'acceptanceLevel' ATTR_SCHM_VER = 'schemaVersion' ATTR_VER_SPEC = 'versionSpec' ATTR_COMPS = 'components' ATTR_VER = 'version' ATTR_UISTR = 'uiString' ATTR_NAME = 'name' # Constant for NameSpec and VersionSpec validation VER_REG_EXP = \ r'^[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)*-[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)*$' BASEIMAGE = 'baseimage' ADDON = 'addon' MIN_NAME_LEN = 3 MIN_VER_LEN = 3 MIN_UISTR_LEN = 3 MAX_NAME_LEN = 35 MAX_VER_LEN = 35 MAX_UISTR_LEN = 70 MAX_DESC_LEN = 2048 MAX_SUMM_LEN = 2048 # Constant to sanitize the values of the release unit attributes. MIN_LEN = 3 MAX_LEN = 35 SCH_VER_REG_EXP = r'^([0-9]+)\.([0-9]+)$' DOCURL_REG_EXP = \ r'^(|(http[s]?://(\w+|[\-._~:/?#\[\]@!$&\'\(\)*+,;=])+))$' class JsonParsingError(Exception): pass def _IsStr(value): """Checks whether type of value is str. """ if isPython3OrLater(): return isinstance(value, str) else: return isinstance(value, basestring) class _CustomizedJSONEncoder(json.JSONEncoder): ''' This is a helper function to change the behavior of the default JSON encoder for (1) datetime: not JSON serializable, encode to its ISO format string; (2) name/version spec: to its internal attribute dict. ''' def default(self, obj): # pylint: disable=E0202 if isinstance(obj, datetime): return obj.isoformat() try: return json.JSONEncoder.default(self, obj) except TypeError: try: return obj.ToJSONDict() except: # Recursively remove the type for serialization. return obj.__dict__ def _SetAttributes(obj, attrKeys, attrDict): for name in attrKeys: setattr(obj, '_' + name, deepcopy(attrDict[name])) def checkNameSpec(func): def checker(obj, param): if not isinstance(param, NameSpec) and \ not isinstance(param, dict): raise TypeError('The argument must be a NameSpec.') if isinstance(param, dict): try: func(obj, NameSpec(param[ATTR_NAME], param[ATTR_UISTR])) except KeyError as e: raise AttributeError('Missing attribute %s in NameSpec.' % e) else: func(obj, param) return checker def checkVersionSpec(func): def checker(obj, param): if not isinstance(param, VersionSpec) and \ not isinstance(param, dict): raise TypeError('The argument must be a VersionSpec.') if isinstance(param, dict): try: func(obj, VersionSpec(param[ATTR_VER], param[ATTR_UISTR])) except KeyError as e: raise AttributeError('Missing attribute %s in versionSpec.' % e) else: func(obj, param) return checker class NameSpec(object): def __init__(self, name, uiString): if not _IsStr(name) or \ len(name) < MIN_NAME_LEN or \ len(name) > MAX_NAME_LEN: raise ValueError('The "name" attribute of NameSpec must be ' 'of type string and %d to %d characters long.' \ % (MIN_NAME_LEN, MAX_NAME_LEN)) if not _IsStr(uiString) or \ len(uiString) < MIN_UISTR_LEN or \ len(uiString) > MAX_UISTR_LEN: raise ValueError('The "uiString" attribute of NameSpec must ' 'be of type string and %d to %d characters long.' \ % (MIN_UISTR_LEN, (2 * MAX_NAME_LEN))) self.name = name self.uiString = uiString def __eq__(self, other): return self.name == other.name and self.uiString == other.uiString def ToJSONDict(self): return self.__dict__ class VersionSpec(object): def __init__(self, version, uiString): if isinstance(version, VibVersion): version = version.versionstring errMsg = None if not _IsStr(version) or \ len(version) < MIN_VER_LEN or \ len(version) > MAX_VER_LEN: errMsg = 'The "version" attribute of VersionSpec must be ' \ 'of type string. It must be %d to %d characters long.' \ % (MIN_VER_LEN, MAX_VER_LEN) if not re.match(VER_REG_EXP, version): if not errMsg: errMsg = 'The "version" attribute of VersionSpec must be ' else: errMsg += 'It must be ' errMsg += 'of form "[x(.x)*-x(.x)*]" where x, is alphanumeric.' if errMsg: raise ValueError(errMsg) if not _IsStr(uiString) or \ len(uiString) < MIN_UISTR_LEN or \ len(uiString) > MAX_UISTR_LEN: raise ValueError('The "uiString" attribute of NameSpec must ' 'be of type string %d to %d characters long.' \ % (MIN_UISTR_LEN, (2 * MAX_VER_LEN))) self.version = version self.uiString = uiString def SetVersion(self, version): self._version = version if _IsStr(version): self._version = VibVersion.fromstring(version) def SetUIString(self, uiString): self._uiString = uiString version = property(lambda self: self._version, SetVersion) uiString = property(lambda self: self._uiString, SetUIString) def __eq__(self, other): return self.version == other.version and self.uiString == other.uiString def ToJSONDict(self): return dict({ATTR_VER: self.version.versionstring, ATTR_UISTR: self.uiString}) _DictToNamespace = lambda x : NameSpec(x['name'], x['uiString']) class ReleaseUnit(object): ''' This is the common piece for base image and add on. Attributes: releaseID: A unique identifier for this release unit. releaseType: 'baseimage', 'addon' or 'manifest'; defined for generating release unit from release unit doc with the right type and for filtering release unit doc. vendor: The vendor name. category: The cagory of the release unit. Acceptable values are either of 'enhancement', 'security', or 'bugfix'. summary: A summary of the release unit. docURL: The docURL description: The description of the release unit. acceptanceLevel: The acceptance level of the release unit; should be the lowest acceptance level of all components. releaseDate: When this release unit is created. components: The component list ''' attributes = (ATTR_REL_ID, ATTR_REL_TYPE, ATTR_CATEGORY, ATTR_VENDOR, ATTR_DESC, ATTR_SUMMARY, ATTR_DOCURL, ATTR_ACPT_LVL, ATTR_REL_DATE, ATTR_SCHM_VER, ATTR_VER_SPEC, ATTR_COMPS) defaultValues = (None, None, CATEGORY_ENHANCEMENT, "", "", "", "", PartnerAcceptanceLevel.level, None, None, None, {}) defaultMap = dict(zip(attributes, defaultValues)) mandatoryAttr = [ATTR_VENDOR, ATTR_VER_SPEC, ATTR_REL_DATE] validAcceptanceLevels = (CertifiedAcceptanceLevel.level, AcceptedAcceptanceLevel.level, PartnerAcceptanceLevel.level, CommunityAcceptanceLevel.level) # Valid category. validCategoryList = (CATEGORY_ENHANCEMENT, CATEGORY_SECURITY, CATEGORY_BUGFIX) # Equalty check ignore list equalIgnores = () # Type convertors typeConverters = {'nameSpec': _DictToNamespace} def _SetSubclassAttributes(self, releaseObj): ''' Interface for subclass to set mandatory member into release unit doc. ''' for name in self.__class__.extraAttributes: releaseObj[name] = getattr(self, name) def _GetSubclassAttributes(self, releaseObj): ''' Interface for subclass to get mandatory member from release unit doc. ''' missingAttrs = list() errors = list() for name in self.__class__.extraAttributes: value = releaseObj[name] if isinstance(value, dict) and name in self.__class__.typeConverters: value = ReleaseUnit.typeConverters[name](value) try: setattr(self, name, value) except KeyError as key: missingAttrs.append(key) except Exception as err: missingAttrs.append(name) msg = 'Error: %s.' % str(err) errors.append(msg) if missingAttrs: errMsg = 'Corrupted release unit doc. Missing ' \ 'attributes [%s]\n' % (', '.join(missingAttrs)) if errors: errMsg += '%s' % ('\n'.join(errors)) raise AttributeError(errMsg) def _CreateFromDoc(self, releaseDoc): ''' Convert release unit doc string into release unit object: Deserialize the json string Preprocess: convert members to the right type Populate members from dict ''' # Deserilize the json stringi for release unit doc. try: releaseObj = json.loads(releaseDoc) except Exception as err: raise JsonParsingError('Failed to parse json spec, ' 'error: %s.' % str(err)) self.FromJSONDict(releaseObj) def FromJSONDict(self, releaseObj): ''' Convert release unit dict into release unit object: Check release unit type Convert component and solution to right python type Convert releaseDate from string to datetime Check mandatory attributes ''' # Copy the components. toBeProcessed = list(self.__class__.attributes) # Ignore releaseID and schemaVersion since they are internal # and derived internally. toBeProcessed.remove(ATTR_REL_ID) toBeProcessed.remove(ATTR_SCHM_VER) # Copy other members. errors = list() for name in releaseObj.keys(): if name not in toBeProcessed: continue try: value = releaseObj[name] setattr(self, name, value) toBeProcessed.remove(name) except KeyError: pass except Exception as err: msg = 'Err: %s' % str(err) errors.append(msg) # When missing member is found, throw exception. if toBeProcessed: errMsg = 'Corrupted release unit doc. Incorrect or missing ' \ 'attributes [%s]:\n' % (', '.join(toBeProcessed)) if errors: errMsg += '%s' % ('\n'.join(errors)) raise AttributeError(errMsg) # Get mandatory member for subclass. self._GetSubclassAttributes(releaseObj) self._GenerateReleaseID() def _AttributeEqual(self, other, attrList): for name in attrList: if name not in self.__class__.equalIgnores: if getattr(self, name) != getattr(other, name): return False return True def __eq__(self, other): if not isinstance(other, ReleaseUnit): return False cls = self.__class__ if not self._AttributeEqual(other, cls.attributes): return False return self._AttributeEqual(other, cls.extraAttributes) def _CheckMandatoryAttr(self): wrongAttr = [name for name in self.__class__.mandatoryAttr if not getattr(self, '%s' % name, None)] if wrongAttr: raise AttributeError('Missing mandatory members: %s' % ','.join(wrongAttr)) def _PopulateComponentsInternal(self, compSource): if not compSource: return sourceComponents = ComponentCollection(compSource.bulletins) missing = [] for name in self._components: cid = name + '_' + self._components[name] try: comp = sourceComponents.GetComponent(cid) except KeyError: missing.append(name, self._components[name]) self._componentCollection.AddComponent(comp) if missing: raise MissingComponentError('Missing component %s in release unit %s' % (missing, self._releaseID)) def __init__(self, spec=None, compSource=None): ''' A release unit is created in following ways: 1. Fully empty object 2. Only has releaseID 3. Created from a spec doc without releaseID 4. Created from a spec with releaseID 4. Created from ID, spec and component source. ''' cls = self.__class__ _SetAttributes(self, cls.attributes, cls.defaultMap) _SetAttributes(self, cls.extraAttributes, cls.extraMap) self._schemaVersion = cls.SCHEMA_VERSION self._componentCollection = ComponentCollection() if spec is not None: self._CreateFromDoc(spec) self._CheckMandatoryAttr() else: self._releaseType = cls.releaseType if not self._releaseDate: self.releaseDate = datetime.utcnow() self._PopulateComponentsInternal(compSource) def PopulateComponents(self, compSource): if not compSource: return self._componentCollection = ComponentCollection() self._PopulateComponentsInternal(compSource) def GetComponentsVersions(self): return self._components def GetComponentVersion(self, name): try: return self._components[name] except KeyError: raise ValueError('The component %s is not found' % name) def GetComponent(self, name): ''' Get the full component object. ''' raise Exception('Not implemented.') @checkVersionSpec def SetVersionSpec(self, version): self._versionSpec = version self._GenerateReleaseID() def SetVendor(self, vendor): if not _IsStr(vendor): raise ValueError('The vendor must be of type string.') if len(vendor) < MIN_LEN or \ len(vendor) > MAX_LEN: raise ValueError('The vendor name length must be 3 ' 'to 35 characters long.') self._vendor = vendor def SetDocURL(self, docUrl): if not _IsStr(docUrl): raise ValueError('The docUrl must be of type string.') if not re.match(DOCURL_REG_EXP, docUrl): raise ValueError('The docUrl must be a http/https url.') self._docURL = docUrl def SetDescription(self, description): if not _IsStr(description) or \ len(description) > MAX_DESC_LEN: raise ValueError('The description must be of type string and ' 'at most 2048 characters long.') self._description = description def SetSummary(self, summary): if not _IsStr(summary) or \ len(summary) > MAX_SUMM_LEN: raise ValueError('The summary must be of type string and ' 'at most 2048 characters long.') self._summary = summary def AddComponentByVersion(self, name, version): ''' Add the index info to the simple component list. ''' self._components[name] = version def AddComponent(self, comp): ''' This method has an full component object as input. The provided component wil be added into the component collection. ''' self._componentCollection.AddComponent(comp) name = comp.componentnamespec['name'] version = str(comp.componentversionspec['version']) self._components[name] = version def RemoveComponent(self, compName): try: version = self._components.pop(compName) self._componentCollection.RemoveComponent(compName, version) except KeyError: pass def AddComponents(self, comps): for comp in comps: self.AddComponent(comp) def _ClearComponents(self): self._components.clear() if hasattr(self, '_componentCollection'): self._componentCollection.clear() def SetComponentsByVersion(self, nameVersionDict): self._ClearComponents() for name in nameVersionDict: self.AddComponentByVersion(name, nameVersionDict[name]) def SetComponents(self, comps): ''' Set the component collection. ''' self._ClearComponents() self.AddComponents(comps) def SetAcceptanceLevel(self, level): if not _IsStr(level) or \ not level.lower() in self.__class__.validAcceptanceLevels: raise ValueError('Invalid acceptance value %s.' % level) self._acceptanceLevel = level.lower() def SetCategory(self, category): if not _IsStr(category) or \ not category.lower() in self.__class__.validCategoryList: raise ValueError('Invalid category value %s.' % category) self._category = category.lower() def SetReleaseDate(self, date): if not _IsStr(date) and \ not isinstance(date, datetime): raise ValueError('The date must be of type either string or datetime' ' and matches ISO8601 format "%s".' % TIME_FORMAT) self._releaseDate = date if _IsStr(date): self._releaseDate = datetime.strptime(date, TIME_FORMAT) def SetReleaseType(self, relType): if relType != self.__class_.releaseType: raise ValueError('Invalid release type value %s' % relType) self._schemaVersion = relType releaseID = property(lambda self: self._releaseID) releaseType = property(lambda self: self._releaseType, SetReleaseType) category = property(lambda self: self._category, SetCategory) vendor = property(lambda self: self._vendor, SetVendor) description = property(lambda self: self._description, SetDescription) summary = property(lambda self: self._summary, SetSummary) docURL = property(lambda self: self._docURL, SetDocURL) acceptanceLevel = property(lambda self: self._acceptanceLevel, SetAcceptanceLevel) releaseDate = property(lambda self: self._releaseDate, SetReleaseDate) components = property(lambda self: self._components, SetComponentsByVersion) versionSpec = property(lambda self: self._versionSpec, SetVersionSpec) schemaVersion = property(lambda self: self._schemaVersion) def ToJSONDict(self): # Make sure all mandatory members exist. self._CheckMandatoryAttr() # Create a release unit dictionary. releaseObj = OrderedDict() for name in self.__class__.attributes: if name == ATTR_VER_SPEC: break releaseObj[name] = getattr(self, name) releaseObj[ATTR_REL_DATE] = self.releaseDate.isoformat() # Set members for child class. self._SetSubclassAttributes(releaseObj) releaseObj[ATTR_VER_SPEC] = deepcopy(self.versionSpec.ToJSONDict()) releaseObj[ATTR_COMPS] = deepcopy(self.components) return releaseObj def ToJSON(self): # Serialize to json string. obj = self.ToJSONDict() return json.dumps(obj, cls=_CustomizedJSONEncoder, indent=3) def CollectReservedComponents(self, comps): ''' Return a list of components that are not effective components but listed in base image or addon. Parameters: comps - The complete component colelction. Returns: A list of component (name, version) pairs. ''' reservedCIDs = [] for name in self.components: version = self.components[name] if not comps.HasComponent(name, version): reservedCIDs.append((name, version)) return reservedCIDs