########################################################################
# Copyright (C) 2019 VMWare, Inc.                                      #
# All Rights Reserved                                                  #
########################################################################

'''
Define the base class for _ReleaseCollection and concrete classes
BaseImageCollection and AddonCollection, which are dictionaries of
specific release types.

Provide functionality such as serialization to a directory,
deserialization from a dictionary, adding single release object
from JSON spec.
'''

import os
import sys
import shutil

from copy import deepcopy

from .Addon import Addon
from .BaseImage import BaseImage
from .Errors import (ReleaseUnitIOError, ReleaseUnitConflictError,
                     ManifestValidationError)
from .Manifest import Manifest
from .Solution import Solution
from .Utils.Misc import isPython3OrLater


def genReleaseUnitFileName(x):
    """Build the JSON file name used to store a release unit.

    Params:
       * x - The release ID string of the release unit.
    Return:
       The release ID with every ':' replaced by '_' and '.json' appended.
    """
    return x.replace(':', '_') + '.json'


def _ToStr(spec):
    """Convert bytes to str.

    Params:
       * spec - A JSON spec, either bytes or already a string.
    Return:
       The spec decoded as UTF-8 when it is a bytes object, otherwise the
       spec unchanged.
    """
    if isinstance(spec, bytes):
        spec = spec.decode("utf-8")
    return spec


def _CheckDirectory(path, operation):
    """Check a path is a directory or not. If not, ReleaseUnitIOError is
    raised.

    Params:
       * path - The path to check.
       * operation - A verb ('read', 'write', ...) used only to build the
                     error message.
    Exceptions:
       * ReleaseUnitIOError - When path is not a directory.
    """
    if not os.path.isdir(path):
        msg = 'Failed to %s ReleaseCollection, %s is not a directory.' % \
              (operation, path)
        raise ReleaseUnitIOError(msg)


def _CheckSupportedBIVersionOverlap(biVersions, biVersionsToAdd):
    """Checks if there is any overlap in two given lists of supported
    base image versions. The supported base image versions are full or
    partial base image versions without wild cards (*).

    Example:
       Version1: 7.1
       Version2: 7.1.0
       Version3: 7.1.0-1.1.12345
       Version4: 7.1.1-2.2.23456
       Version1 conflicts with Version2, Version3 and Version4
       Version2 conflicts with Version3

    Params:
       * biVersions - Supported base image versions of an existing manifest
       * biVersionsToAdd - Supported base image versions of the manifest
                           to be added
    Return:
       True if there is overlap between two lists of supported base image
       versions
    """
    # Two versions overlap when either one is a string prefix of the other.
    # NOTE(review): this is a plain string-prefix test, so '7.1' is also
    # treated as overlapping '7.10'; confirm whether matching should stop
    # at a version component boundary before changing it.
    for v1 in biVersions:
        for v2 in biVersionsToAdd:
            if v1.startswith(v2) or v2.startswith(v1):
                return True
    return False


class _ReleaseCollection(dict):
    """The parent class to hold the common code for the release unit
    collection classes.

    The collection is a dict keyed by release ID. Subclasses are expected
    to provide AddFromJSON(spec, validate=False), which FromDirectory()
    calls for every file it reads.
    """

    def update(self, other):
        """Merge another ReleaseCollection into the current one: check
        whether unequal release units share the same release ID.

        Params:
           * other - The collection whose items are merged into this one.
        Exceptions:
           ReleaseUnitConflictError : When unequal release units share the
                                      same release ID.
        """
        for releaseID in other:
            if releaseID in self:
                # An equal release unit under the same ID is a no-op; an
                # unequal one is a conflict.
                if self[releaseID] != other[releaseID]:
                    releaseType = self[releaseID].releaseType.lower()
                    msg = ('Two %ss share the same releaseID: %s' %
                           (releaseType, releaseID))
                    raise ReleaseUnitConflictError(releaseID, releaseType,
                                                   msg)
            else:
                self[releaseID] = other[releaseID]

    def __add__(self, other):
        """Adds two ReleaseCollections and return a new collection.

        The result is an instance of this collection's class; neither
        operand is modified.
        """
        newCollection = self.__class__()
        newCollection.update(self)
        newCollection.update(other)
        return newCollection

    def __iadd__(self, other):
        """Adds the items of second ReleaseCollection into this one.
        """
        self.update(other)
        return self

    def ToDirectory(self, path):
        """Write release unit objects to directory.

        Nothing is written (and the directory is left untouched) when the
        collection is empty. Otherwise an existing directory at path is
        removed and recreated before one JSON file per release unit is
        written.

        Parameters:
           * path - A string specifying a directory name.
        Exceptions:
           * ReleaseUnitIOError - The specified directory is not a
                                  directory or cannot create an empty
                                  directory
        """
        if not self:
            return

        try:
            # Start from an empty directory so stale files do not survive.
            if os.path.isdir(path):
                shutil.rmtree(path)
            os.makedirs(path)
        except EnvironmentError as e:
            msg = 'Could not create dir %s for %s: %s' % \
                  (path, self.__class__.__name__, e)
            raise ReleaseUnitIOError(msg)

        _CheckDirectory(path, 'write')

        for ut in self.values():
            filepath = os.path.join(path,
                                    genReleaseUnitFileName(ut.releaseID))
            try:
                # Files are always written as UTF-8 encoded bytes.
                with open(filepath, 'wb') as f:
                    f.write(ut.ToJSON().encode('utf-8'))
            except (EnvironmentError, IOError) as e:
                # NOTE(review): update() reads 'releaseType' while this
                # message reads 'releaseUnitType'; confirm the release
                # unit classes expose both attributes.
                msg = ('Failed to write %s file %s: %s' %
                       (ut.releaseUnitType, filepath, e))
                raise ReleaseUnitIOError(msg)

    def FromDirectory(self, path, validate=False):
        """Populate this _ReleaseCollection instance from a directory of
        base image or addon JSON files.

        The directory is walked recursively and every file found is passed
        to AddFromJSON().

        Parameters:
           * path - A string specifying a directory name.
           * validate - If true perform metadata schema validation.
        Raises:
           * ReleaseUnitIOError - Path not found, or read error
           * ValueError - When the spec doesn't contain mandatory data
        """
        if not os.path.exists(path):
            msg = 'The directory %s for base image/addon does not exist.' % (path)
            raise ReleaseUnitIOError(msg)

        _CheckDirectory(path, 'read')

        for root, _dirs, files in os.walk(path, topdown=True):
            for name in files:
                filepath = os.path.join(root, name)
                try:
                    # Read raw bytes and decode them as UTF-8, mirroring
                    # ToDirectory() which writes UTF-8 encoded bytes. Text
                    # mode would use the locale's default encoding and
                    # could break the round trip.
                    with open(filepath, 'rb') as fp:
                        self.AddFromJSON(_ToStr(fp.read()),
                                         validate=validate)
                except (EnvironmentError, IOError) as e:
                    raise ReleaseUnitIOError("Could not read the file %s: %s"
                                             % (filepath, e))


class BaseImageCollection(_ReleaseCollection):
    """A collection of Base Images keyed by release ID."""

    def AddBaseImage(self, baseImageToAdd, replace=False):
        """Adds a Base Image to the collection

        Params:
           * baseImageToAdd - The Base Image to add to the collection
           * replace - If a Base Image already exists with the same
                       version or release ID, replace it.
        Exceptions:
           * KeyError - When attempting to add a Base Image whose version
                        or release ID already exists in the collection
                        and replace=False
        """
        version = baseImageToAdd.versionSpec.version.versionstring
        if self.HasBaseImage(version):
            if replace:
                self.RemoveBaseImage(version)
            else:
                raise KeyError('A Base Image with Version: %s already'
                               ' exists in the collection' % version)

        if baseImageToAdd.releaseID in self:
            if replace:
                del self[baseImageToAdd.releaseID]
            else:
                raise KeyError('A Base Image with Release ID: %s already exists in'
                               ' the collection' % baseImageToAdd.releaseID)

        self[baseImageToAdd.releaseID] = baseImageToAdd

    def HasBaseImage(self, version):
        """Checks if a Base Image with a version exists in the collection

        Params:
           * version - The version of the Base Image
        Return:
           True if the Base Image is in the collection, otherwise false
        """
        # Only a failed lookup means "absent"; any other exception is a
        # real error and must propagate.
        try:
            self.GetBaseImage(version)
        except KeyError:
            return False
        return True

    def GetBaseImage(self, version):
        """Gets a Base Image based upon a version from the collection

        Params:
           * version - The version of the Base Image to get
        Return:
           A Base Image with version from the collection
        Exceptions:
           * KeyError - When no Base Image of the specified version exists
                        in the collection
        """
        for baseImage in self.values():
            if baseImage.versionSpec.version.versionstring == version:
                return baseImage
        raise KeyError('Unable to find a Base Image with Version: %s' % version)

    def RemoveBaseImage(self, version):
        """Removes a Base Image from the collection

        Params:
           * version - The version of the Base Image to remove
        Exceptions:
           * KeyError - When no Base Image of the specified version exists
                        in the collection
        """
        baseImageToRemove = self.GetBaseImage(version)
        del self[baseImageToRemove.releaseID]

    def AddFromJSON(self, spec, validate=False):
        """The interface method to add base image object from JSON string.

        Params:
           * spec - The JSON spec as str or UTF-8 encoded bytes.
           * validate - Passed to BaseImage.FromJSON as 'validation'.

        An existing entry with the same release ID is overwritten.
        """
        spec = _ToStr(spec)
        image = BaseImage.FromJSON(spec, validation=validate)
        self[image.releaseID] = image


class AddonCollection(_ReleaseCollection):
    """A collection of Addons keyed by release ID."""

    def AddAddon(self, addonToAdd, replace=False):
        """Adds an Addon to the collection

        Params:
           * addonToAdd - The Addon to add to the collection
           * replace - If an Addon already exists with the same
                       name/version or release ID, replace it.
        Exceptions:
           * KeyError - When attempting to add an Addon whose name,
                        version or release Id already exists in the
                        collection and replace=False
        """
        name = addonToAdd.nameSpec.name
        version = addonToAdd.versionSpec.version.versionstring
        if self.HasAddon(name, version):
            if replace:
                self.RemoveAddon(name, version)
            else:
                raise KeyError('An Addon with Name: %s and Version: %s already'
                               ' exists in the collection' % (name, version))

        # There could be the case where the ID we generate is already used
        if addonToAdd.releaseID in self:
            if replace:
                del self[addonToAdd.releaseID]
            else:
                raise KeyError('An Addon with Release ID: %s already exists in the'
                               ' collection' % addonToAdd.releaseID)

        self[addonToAdd.releaseID] = addonToAdd

    def HasAddon(self, name, version):
        """Checks if an Addon with name and version exists in the
        collection

        Params:
           * name - The name of the Addon
           * version - The version of the Addon
        Return:
           True if the Addon was in the collection, otherwise false.
        """
        # Only a failed lookup means "absent"; any other exception is a
        # real error and must propagate.
        try:
            self.GetAddon(name, version)
        except KeyError:
            return False
        return True

    def GetAddonsFromNameSpec(self, nameSpec):
        """Gets a list of Addons based upon name or name:version from the
        collection

        Params:
           * nameSpec - name or name:version of Addon to get
        Returns:
           A list of addon with name or name:version. For a plain name the
           list may be empty.
        Exceptions:
           * KeyError - When a name:version spec is given and no such
                        Addon exists in the collection
        """
        addonList = []
        if ':' in nameSpec:
            # Colon Spec.
            # NOTE(review): a spec with more than one ':' raises
            # ValueError on unpacking rather than KeyError.
            name, version = nameSpec.split(':')
            addon = self.GetAddon(name, version)
            addonList.append(addon)
        else:
            name = nameSpec
            for addon in self.values():
                if addon.nameSpec.name == name:
                    addonList.append(addon)
        return addonList

    def GetAddon(self, name, version):
        """Gets an Addon based upon name and version from the collection

        Params:
           * name - The name of the Addon to get
           * version - The version of the Addon to get
        Returns:
           An addon with name and version
        Exceptions:
           * KeyError - When the Addon we are trying to find doesn't exist
                        in the collection
        """
        for addon in self.values():
            if (addon.nameSpec.name == name and
                    addon.versionSpec.version.versionstring == version):
                return addon
        raise KeyError('Unable to find an Addon with Name: %s and Version: %s' %
                       (name, version))

    def RemoveAddon(self, name, version):
        """Removes an Addon from the collection

        Params:
           * name - The name of the Addon to remove
           * version - The version of the Addon to remove
        Exceptions:
           * KeyError - When the Addon we are trying to find doesn't exist
                        in the collection
        """
        addonToRemove = self.GetAddon(name, version)
        del self[addonToRemove.releaseID]

    def AddFromJSON(self, spec, validate=False):
        """The interface method to add addon object from JSON string.

        Params:
           * spec - The JSON spec as str or UTF-8 encoded bytes.
           * validate - Passed to Addon.FromJSON as 'validation'.

        An existing entry with the same release ID is overwritten.
        """
        spec = _ToStr(spec)
        addon = Addon.FromJSON(spec, validation=validate)
        self[addon.releaseID] = addon

    def Copy(self):
        '''Copies AddonCollection object.

        Return:
           A new AddonCollection holding the result of Copy() of every
           addon under the same release ID.
        '''
        addons = AddonCollection()
        for releaseId, addon in self.items():
            addons[releaseId] = addon.Copy()
        return addons


class SolutionCollection(_ReleaseCollection):
    """A collection of Solutions keyed by release ID."""

    def AddFromJSON(self, spec, validate=False):
        """The interface method to add solution object from JSON string.

        Params:
           * spec - The JSON spec as str or UTF-8 encoded bytes.
           * validate - Passed to Solution.FromJSON as 'validation'.

        An existing entry with the same release ID is overwritten.
        """
        spec = _ToStr(spec)
        solution = Solution.FromJSON(spec, validation=validate)
        self[solution.releaseID] = solution

    def AddSolution(self, solutionToAdd, replace=False):
        """Adds a Solution to the collection

        Params:
           * solutionToAdd - The Solution to add to the collection
           * replace - If a Solution already exists with the same
                       name/version or release ID, replace it.
        Exceptions:
           * KeyError - When attempting to add a Solution whose
                        name/version or release Id already exists in the
                        collection and replace=False
        """
        name = solutionToAdd.nameSpec.name
        version = solutionToAdd.versionSpec.version.versionstring
        if self.HasSolution(name, version):
            if replace:
                self.RemoveSolution(name, version)
            else:
                raise KeyError('An Solution with Name: %s and Version: %s already'
                               ' exists in the collection' % (name, version))

        if solutionToAdd.releaseID in self:
            if replace:
                del self[solutionToAdd.releaseID]
            else:
                raise KeyError('An Solution with Release ID: %s already exists in'
                               ' the collection' % solutionToAdd.releaseID)

        self[solutionToAdd.releaseID] = solutionToAdd

    def HasSolution(self, name, version):
        """Checks if a solution with a name and version exists in the
        collection

        Params:
           * name - The name of the solution
           * version - The version of the solution
        Return:
           True if the solution was in the collection, otherwise false.
        """
        # Only a failed lookup means "absent"; any other exception is a
        # real error and must propagate.
        try:
            self.GetSolution(name, version)
        except KeyError:
            return False
        return True

    def GetSolution(self, name, version):
        """Gets a solution based upon name and version from the collection

        Params:
           * name - The name of the solution to get
           * version - The version of the solution to get
        Exception:
           * KeyError - When a Solution with name and version can't be
                        found
        Return:
           A solution with name and version
        """
        for solution in self.values():
            if (solution.nameSpec.name == name and
                    solution.versionSpec.version.versionstring == version):
                return solution
        raise KeyError('Unable to find a Solution with Name: %s and Version: %s' %
                       (name, version))

    def RemoveSolution(self, name, version):
        """Removes a solution from the Solution Collection

        Exception:
           * KeyError - When a Solution with name and version can't be
                        found
        Params:
           * name - The name of the solution to remove
           * version - The version of the solution to remove
        """
        solutionToRemove = self.GetSolution(name, version)
        del self[solutionToRemove.releaseID]


class ManifestCollection(_ReleaseCollection):
    """A collection of manifests keyed by release ID.

    In addition to the release ID rules of the parent class, manifests
    with equal hardware support information must not have overlapping
    supported base image versions.
    """

    def update(self, other):
        """Merge another manifest collection into this one.

        Every incoming manifest is checked for conflicts against the
        manifests already in this collection before any item is merged.

        Exceptions:
           * ManifestValidationError - When an incoming manifest conflicts
                                       with an existing one.
           * ReleaseUnitConflictError - When unequal manifests share the
                                        same release ID.
        """
        for manifest in other.values():
            self._CheckManifestConflict(manifest)
        super(ManifestCollection, self).update(other)

    def AddManifest(self, manifestToAdd, replace=False):
        """Adds a manifest to the collection

        Params:
           * manifestToAdd - The manifest to add to the collection
           * replace - If a manifest already exists with the same release
                       ID, replace it.
        Exceptions:
           * KeyError - When attempting to add a manifest whose release ID
                        already exists in the collection and replace is
                        False
           * ManifestValidationError - When the manifest conflicts with a
                                       different manifest already in the
                                       collection.
        """
        self._CheckManifestConflict(manifestToAdd)
        releaseID = manifestToAdd.releaseID
        if releaseID in self:
            if replace:
                del self[releaseID]
            else:
                raise KeyError('A manifest with Release ID: %s already exists in'
                               ' the collection' % releaseID)
        self[releaseID] = manifestToAdd

    def _CheckManifestConflict(self, manifestToAdd):
        """Check if the manifest to be added has same hardware support
        information and overlapping supported base image versions of any
        existing manifests in the manifest collection

        Params:
           * manifestToAdd - The manifest to be added to the manifest
                             collection
        Exceptions:
           * ManifestValidationError - When attempting to add a manifest
                                       that has same hardware support
                                       information with an existing
                                       manifest and has overlapping
                                       supported base image versions with
                                       the existing manifest
        """
        for manifest in self.values():
            # A manifest with the same release ID is handled by the
            # release ID checks of the caller, not treated as a conflict.
            if manifest.releaseID == manifestToAdd.releaseID:
                continue
            if (manifest.hardwareSupportInfo ==
                    manifestToAdd.hardwareSupportInfo and
                    _CheckSupportedBIVersionOverlap(
                        manifest.supportedBaseImageVersions,
                        manifestToAdd.supportedBaseImageVersions)):
                msg = ('An existing manifest: %s and the manifest to be added: %s'
                       ' have same hardware support information and overlapping'
                       ' supported base image versions' %
                       (manifest.releaseID, manifestToAdd.releaseID))
                raise ManifestValidationError(msg)

    def HasManifest(self, releaseID):
        """Checks if a manifest with a releaseID exists in the collection

        Params:
           * releaseID - The releaseID of the manifest
        Return:
           True if the manifest is in the collection, otherwise false
        """
        return releaseID in self

    def GetManifest(self, releaseID):
        """Gets a manifest based upon a releaseID from the collection

        Params:
           * releaseID - The release ID of the manifest to get
        Return:
           A manifest with releaseID from the collection
        Exceptions:
           * KeyError - When no manifest of the specified releaseID exists
                        in the collection
        """
        try:
            return self[releaseID]
        except KeyError:
            # Re-raise with a descriptive message instead of the bare key.
            raise KeyError('Unable to find a manifest with release ID: %s' %
                           releaseID)

    def RemoveManifest(self, releaseID):
        """Removes a manifest from the collection

        Params:
           * releaseID - The releaseID of the manifest to remove
        Exceptions:
           * KeyError - When no manifest of the specified releaseID exists
                        in the collection
        """
        manifestToRemove = self.GetManifest(releaseID)
        del self[manifestToRemove.releaseID]

    def AddFromJSON(self, spec, validate=False):
        """The interface method to add manifest object from JSON string.

        Params:
           * spec - The JSON spec as str or UTF-8 encoded bytes.
           * validate - Passed positionally to Manifest.FromJSON.

        The manifest is added with replace=True, so an existing manifest
        with the same release ID is replaced; conflicts with other
        manifests still raise ManifestValidationError.
        """
        spec = _ToStr(spec)
        manifest = Manifest.FromJSON(spec, validate)
        self.AddManifest(manifest, True)

    def Copy(self):
        '''Copies ManifestCollection object.

        Return:
           A new ManifestCollection holding the result of Copy() of every
           manifest under the same release ID.
        '''
        manifests = ManifestCollection()
        for releaseId, manifest in self.items():
            manifests[releaseId] = manifest.Copy()
        return manifests