#!/usr/bin/python
##############################################################
# Copyright 2010-2020 VMware, Inc.  All rights reserved.
# -- VMware Confidential
##############################################################

import os
import shutil
import sys

# Try to get a SHA256 method.
try:
    from hashlib import sha256
    HAVE_HASHLIB = True
except ImportError:
    HAVE_HASHLIB = False

from . import Scan
from . import Vib
from . import Errors
from .Utils import IndexedDict, XmlUtils
from .Utils.Misc import isString

etree = XmlUtils.FindElementTree()

# Number of bytes read per iteration when hashing a file.
HASH_BUF_SIZE = 16 * 1024


class ScanReport(object):
    """Result structure for the ScanAndReportIssues function.
    """

    def __init__(self, missingDeps, obsolete, conflicts):
        """
        1. missingDeps: If Vib A requires Vib B (or a provided capability)
                        but Vib B is not in the set of Vibs. The set of
                        Vibs cannot be installed as a whole.
        2. obsolete: If Vib A is replaced by Vib B, it is obsolete and does
                     not need to be applied. Removing Vib A will remove the
                     obsolete Vib.
        3. conflicts: If Vib A and Vib B conflict, they cannot be installed
                      together. A conflict is bidirectional. Removal of
                      either one will resolve the conflict.
        """
        self.missingDeps = missingDeps
        self.obsolete = obsolete
        self.conflicts = conflicts


class VibCollection(IndexedDict.IndexedDict):
    """Implements a sequence of VIB objects and methods for establishing
       relationships between them and searching them easily.

       Attributes:
          * nameindex   - A dict mapping VIB names to a set of VIB IDs
          * vendorindex - A dict mapping VIB vendors to a set of VIB IDs
          * verindex    - A dict mapping VIB version strings to a set of
                          VIB IDs
          * tagindex    - A dict mapping VIB swtags to a set of VIB IDs
    """
    INDEXES = ('name', 'vendor', 'versionstr', 'swtags')

    def __init__(self, *args):
        """Constructs a VibCollection object.

           Positional arguments are passed through to the IndexedDict
           constructor; the collection is always indexed on INDEXES.
        """
        IndexedDict.IndexedDict.__init__(self, indexedfields=self.INDEXES,
                                         *args)

    nameindex = property(lambda self: self.indexes['name'])
    vendorindex = property(lambda self: self.indexes['vendor'])
    verindex = property(lambda self: self.indexes['versionstr'])
    tagindex = property(lambda self: self.indexes['swtags'])

    def __iadd__(self, other):
        """Merges this collection with another collection.
              Parameters:
                 * other - An instance of VibCollection
              Returns: This collection, after every VIB of other has been
                       added through AddVib.
        """
        for v in other.values():
            self.AddVib(v)
        return self

    def __add__(self, other):
        """Merges this collection with another collection. The resulting
           collection will be a new object referencing the union of VIBs
           from self and other. VIB objects in the new VibCollection are not
           guaranteed to reference either distinct (new) VIB objects nor
           existing VIB objects, and may reference a combination of the two.
              Parameters:
                 * other - An instance of VibCollection.
              Returns: A new VibCollection instance.
              Exceptions:
                 None
        """
        new = VibCollection()
        new.update(self)
        for v in other.values():
            new.AddVib(v)
        return new

    def AddVib(self, vib):
        """Add a VIB object to collection. The exact VIB object is not
           guaranteed to be added. If the same VIB id is already in the
           collection, the Vib is updated with the combination of the two.

           Parameters:
              * vib - A VIB object to add.
           Returns: The added or updated VIB object in the collection
           Raises:
              * ValueError - If a VIB with the same ID exists in the
                             collection, and the two VIB objects cannot be
                             merged.
        """
        vibid = vib.id
        # Only merge when a *different* object with the same ID is present;
        # re-adding the very same object is a plain (re)assignment.
        if vibid in self and self[vibid] is not vib:
            self[vibid] = self[vibid].MergeVib(vib)
        else:
            self[vibid] = vib
        return self[vibid]

    def RemoveVib(self, vib):
        """Remove a VIB from collection.

           Parameters:
              * vib - either a string VIB ID or a Vib instance representing
                      the VIB to remove from the profile.
           Returns: None
           Exceptions:
              * KeyError : if the VIB ID is not in the collection.
        """
        vibid = vib if isString(vib) else vib.id

        if vibid not in self:
            msg = "'%s' not in VibCollection" % (vibid)
            raise KeyError(msg)
        del self[vibid]

    def AddVibFromXml(self, xml, origdesc, signature, validate=False,
                      schema=Vib.BaseVib.VIB_SCHEMA):
        """Add a VIB object to this collection from XML.

           Parameters:
              * xml      - Either an instance of ElementTree, or a string of
                           XML-formatted data.
              * origdesc - Original descriptor data for the vib
              * signature- Original signature data for the vib
              * validate - If True, XML will be validated against a schema.
                           If False, no validation will be done. Defaults
                           to False.
              * schema   - A file path giving the location of an VIB schema.
           Returns: The added or updated VIB object in the collection
           Exceptions:
              * VibFormatError
        """
        return self.AddVib(Vib.ArFileVib.FromXml(xml, origdesc, signature,
                                                 validate, schema))

    def AddVibFromVibfile(self, path, validate=False,
                          schema=Vib.BaseVib.VIB_SCHEMA):
        """Add a VIB object to collection from VIB file path.

           Parameters:
              * path     - The path to a VIB file.
              * validate - If True, XML will be validated against a schema.
                           If False, no validation will be done. Defaults
                           to False.
              * schema   - A file path giving the location of an VIB schema.
           Returns: The added or updated VIB object in the collection
           Exceptions:
              * VibFormatError
              * MetadataBuildError
        """
        # FromFile is called with schema=None when validation is disabled.
        if not validate:
            schema = None

        vib = Vib.ArFileVib.FromFile(path, schema)
        try:
            vib.packedsize = os.stat(path).st_size
            if not HAVE_HASHLIB:
                msg = 'Do not have required haslib.'
                raise Errors.MetadataBuildError(msg)
            vib.checksum = Vib.Checksum("sha-256", _getdigest(path, 'sha256'))
        finally:
            # Close the VIB on both the success and the failure path.
            vib.Close()
        return self.AddVib(vib)

    @staticmethod
    def _ReadOptionalFile(path, mode='r'):
        """Return the content of path, or None if it is missing or empty."""
        if not os.path.exists(path):
            return None
        with open(path, mode) as f:
            data = f.read()
        return data if len(data) else None

    def _AddVibFromDescriptorFile(self, filepath, validate, schema):
        """Add a VIB from an XML descriptor file plus its optional sidecar
           original-signature and original-descriptor files.
        """
        signature = self._ReadOptionalFile(
            filepath + Vib.EXTENSION_ORIG_SIG, 'rb')
        origdesc = self._ReadOptionalFile(
            filepath + Vib.EXTENSION_ORIG_DESC)
        with open(filepath, 'rb') as f:
            self.AddVibFromXml(f.read(), origdesc, signature, validate,
                               schema)

    def FromDirectory(self, path, ignoreinvalidfiles=False, validate=False,
                      schema=Vib.BaseVib.VIB_SCHEMA, metadataonly=False):
        """Populate this VibCollection instance from a directory of VIB
           descriptors or a directory of VIB files. This method may replace
           existing VIBs in the collection.

           Parameters:
              * path               - A string specifying a directory name.
              * ignoreinvalidfiles - If True, causes the method to silently
                                     ignore VibFormatError exceptions.
                                     Useful if a directory may contain both
                                     VIB content and other content.
              * validate           - If True, VIBs will be validated against
                                     a schema. If False, no validation will
                                     be done. Defaults to False.
              * schema             - A file path giving the location of an
                                     VIB schema.
              * metadataonly       - If True, only metadata are available
                                     for the VIBs. Attempts to iterate
                                     payload content or create updated VIB
                                     will fail. If False, operations with
                                     payload content are allowed.
           Returns: None
           Exceptions:
              * VibIOError     - The specified directory does not exist or
                                 cannot be read, or one or more files could
                                 not be read.
              * VibFormatError - One or more files were neither a valid
                                 descriptor nor a valid VIB archive.
        """
        if not os.path.exists(path):
            msg = 'VibCollection directory %s does not exist.' % (path)
            raise Errors.VibIOError(msg)
        elif not os.path.isdir(path):
            msg = 'VibCollection path %s is not a directory.' % (path)
            raise Errors.VibIOError(msg)

        sidecarsuffixes = (Vib.EXTENSION_ORIG_DESC, Vib.EXTENSION_ORIG_SIG)

        for root, _dirs, files in os.walk(path, topdown=True):
            for name in files:
                filepath = os.path.join(root, name)
                # ignore any original descriptor or signature files
                if str(filepath).endswith(sidecarsuffixes):
                    continue
                try:
                    try:
                        vib = self.AddVibFromVibfile(filepath, validate,
                                                     schema)
                        r = filepath[len(path):]
                        if r.startswith(os.sep):
                            r = r[1:]
                        # PR 665309: always use POSIX style separator, which
                        # also works on windows
                        vib.relativepath = r.replace('\\', '/')
                        if metadataonly:
                            vib.Close()
                    except Errors.VibFormatError:
                        # Not a VIB archive: fall back to treating the file
                        # as an XML descriptor.
                        try:
                            self._AddVibFromDescriptorFile(filepath, validate,
                                                           schema)
                        except (Errors.VibFormatError,
                                Errors.VibValidationError):
                            if not ignoreinvalidfiles:
                                raise
                except EnvironmentError as e:
                    # Covers I/O failures from both the archive path and the
                    # descriptor fallback.
                    msg = 'Failed to create VIB obj from path %s: %s' % (
                        filepath, e)
                    raise Errors.VibIOError(msg)

    def ToDirectory(self, path, namingfunc=None, skipOrigAndSigFiles=False):
        """Write descriptors in the VibCollection to a directory. If the
           directory exists, the content of the directory will be clobbered.

           Parameters:
              * path                - A string specifying a directory name.
              * namingfunc          - A function pointer, return a short
                                      string with a VIB object as the only
                                      input and the string will be used as
                                      the file name of the descriptor.
                                      Defaults to FilenameForVib.
              * skipOrigAndSigFiles - If True, the original-descriptor and
                                      signature sidecar files are not
                                      written.
           Return: None
           Exceptions:
              * VibIOError - The specified directory is not a directory or
                             cannot create an empty directory
        """
        try:
            if os.path.isdir(path):
                shutil.rmtree(path)
            os.makedirs(path)
        except EnvironmentError as e:
            msg = 'Could not create dir %s for VibCollection: %s' % (path, e)
            raise Errors.VibIOError(msg)

        if not os.path.isdir(path):
            msg = ('Failed to write VibCollection, %s is not a directory.'
                   % (path))
            raise Errors.VibIOError(msg)

        if namingfunc is None:
            namingfunc = self.FilenameForVib

        # On Python 3 ask etree.tostring for a str ('unicode') because the
        # descriptor file is opened in text mode; Python 2 uses us-ascii.
        if sys.version_info[0] >= 3:
            encoding = 'unicode'
        else:
            encoding = 'us-ascii'

        for vib in self.values():
            name = namingfunc(vib)
            filepath = os.path.join(path, name)
            sigpath = filepath + Vib.EXTENSION_ORIG_SIG
            origpath = filepath + Vib.EXTENSION_ORIG_DESC

            try:
                descriptor = vib.ToXml()
                with open(filepath, 'w') as f:
                    f.write(etree.tostring(descriptor, encoding=encoding))
            except EnvironmentError as e:
                msg = 'Failed to write VIB descriptor to %s: %s' % (
                    filepath, e)
                raise Errors.VibIOError(msg)

            try:
                if not skipOrigAndSigFiles:
                    signature = vib.GetSignature()
                    if signature is not None:
                        with open(sigpath, 'wb') as f:
                            f.write(signature)
            except EnvironmentError as e:
                msg = 'Failed to write VIB descriptor signature to %s: %s' % (
                    sigpath, e)
                raise Errors.VibIOError(msg)

            try:
                if not skipOrigAndSigFiles:
                    desc = vib.GetOrigDescriptor()
                    # The file is opened in text mode, so bytes must be
                    # decoded on Python 3 before writing.
                    if sys.version_info[0] >= 3 and isinstance(desc, bytes):
                        desc = desc.decode()
                    if desc is not None:
                        with open(origpath, 'w') as f:
                            f.write(desc)
            except EnvironmentError as e:
                msg = ('Failed to write the original VIB descriptor to '
                       '%s: %s' % (origpath, e))
                raise Errors.VibIOError(msg)

    def Scan(self):
        """Builds dependency relationships between VIBs in the collection.

           Returns: A Scan.VibScanner instance.
        """
        result = Scan.VibScanner()
        result.Scan(self)
        return result

    def ScanAndReportIssues(self):
        """Scan the collection of vibs and return the report with below
           mentioned issues:
           1. missingDeps: If Vib A requires Vib B (or a provided
                           capability) but Vib B is not in the set of Vibs.
                           The set of Vibs cannot be installed as a whole.
           2. obsolete: If Vib A is replaced by Vib B, it is obsolete and
                        does not need to be applied. Removing Vib A will
                        remove the obsolete Vib.
           3. conflicts: If Vib A and Vib B conflict, they cannot be
                         installed together. A conflict is bidirectional.
                         Removal of either one will resolve the conflict.

           Parameters:
              * None.
           Returns:
              * Returns the ScanReport object with missingDeps, obsolete
                and conflicts information. missingDeps maps a capability to
                the set of VIB IDs that depend on it; obsolete and conflicts
                map a VIB ID to the scan result's replacedBy / conflicts
                value.
           Raises:
              None.
        """
        scan = self.Scan()
        missingDeps = {}
        obsolete = {}
        conflicts = {}
        for vibid, report in scan.results.items():
            if len(report.replacedBy) > 0:
                obsolete[vibid] = report.replacedBy
            # A dependency with no providing VIB is a missing dependency.
            for cap, vibids in report.depends.items():
                if len(vibids) == 0:
                    missingDeps.setdefault(cap, set()).add(vibid)
            if len(report.conflicts) > 0:
                conflicts[vibid] = report.conflicts
        return ScanReport(missingDeps, obsolete, conflicts)

    def FindVibsByColonSpec(self, colonspec, onevendor=False):
        """Finds VIB packages from the VibCollection using the
           colon-separated specification used by esxcli --vib parameter.

           Parameters:
              * colonspec - A string specifying the VIB packages to search
                            for. Takes one of the following forms:
                               <name>
                               <name>:<version>
                               <vendor>:<name>
                               <vendor>:<name>:<version>
                            where <vendor>, <name>, <version> should match
                            the corresponding attributes in the Vib exactly.
                            If there is one colon, a <name>:<version> search
                            will be tried first, followed by <vendor>:<name>.
              * onevendor - If True, throw an error if the matching VIBs are
                            from more than one vendor
           Returns:
              A non-empty set of VIB IDs for the matching VIBs.
           Raises:
              ValueError   - if there are more than two colons in colonspec,
                             or if onevendor is True and the matches are
                             from more than one vendor
              NoMatchError - if no VIB matches colonspec
        """
        parts = colonspec.split(':')
        vibs = set()
        if len(parts) == 1:
            # <name>
            if parts[0] in self.nameindex:
                vibs = self.nameindex[parts[0]]
        elif len(parts) == 2:
            # Try <name>:<version> first
            if parts[0] in self.nameindex and parts[1] in self.verindex:
                vibs = self.nameindex[parts[0]] & self.verindex[parts[1]]
            # Then try <vendor>:<name>
            elif parts[0] in self.vendorindex and parts[1] in self.nameindex:
                vibs = self.vendorindex[parts[0]] & self.nameindex[parts[1]]
        elif len(parts) == 3:
            # <vendor>:<name>:<version>
            if parts[0] in self.vendorindex and \
               parts[1] in self.nameindex and \
               parts[2] in self.verindex:
                vibs = self.vendorindex[parts[0]] & \
                    self.nameindex[parts[1]] & \
                    self.verindex[parts[2]]
        else:
            raise ValueError("Too many colons in VIB search specification "
                             "'%s'. "
                             "Try one of <name>, <name>:<version>, "
                             "<vendor>:<name>:<version>, or <vendor>:<name>."
                             % (colonspec))

        if onevendor and len(vibs):
            firstvendorvibs = self.vendorindex[self[next(iter(vibs))].vendor]
            # Make sure vibs is a subset of the vibs from the same/first
            # vendor
            if len(vibs - firstvendorvibs):
                raise ValueError("VIBs from more than one vendor matched "
                                 "search specification '%s'.  Please try "
                                 "vendor:name instead to specify a "
                                 "particular vendor." % (colonspec))

        if not vibs:
            raise Errors.NoMatchError(
                '',
                "No VIB matching VIB search specification '%s'." % colonspec)
        return vibs

    @classmethod
    def FilenameForVib(cls, vib):
        """Generate a short unique string based on VIB metadata"""
        # NOTE(review): hash() of a str is randomized per process on
        # Python 3 unless PYTHONHASHSEED is fixed, so the generated name is
        # not stable across runs -- confirm no caller relies on stability.
        return vib.name + '-' + str(hash(str(vib.id))) + '.xml'


def _getdigest(path, alg):
    """Return the hex digest of the file at path.

       Parameters:
          * path - Path of the file to hash.
          * alg  - Name of a hashlib constructor, e.g. 'sha256'.
       Returns: The hexadecimal digest string. At most the number of bytes
                reported by os.stat at call time is hashed.
    """
    import hashlib
    h = getattr(hashlib, alg)()
    remaining = os.stat(path).st_size
    with open(path, 'rb') as f:
        while remaining > 0:
            chunk = f.read(min(HASH_BUF_SIZE, remaining))
            if not chunk:
                # File shrank after stat(); stop instead of looping forever.
                break
            h.update(chunk)
            remaining -= len(chunk)
    return h.hexdigest()