######################################################################## # Copyright (C) 2019 VMWare, Inc. # # All Rights Reserved # ######################################################################## """This module contains utils for handling config schemas in VIB as software tags and in depot as metadata. """ import hashlib import json import os import shutil from .Utils.Misc import byteToStr class ConfigSchemaSoftwareTag(object): """Class represents a config schema software tag in VIB. """ CONFIG_SCHEMA_MAGIC = 'ConfigSchema' SEPARATOR = ':' def __init__(self, schemaId, vibPayload, payloadFilePath, checksumType, checksumHex): self.schemaId = schemaId self.vibPayload = vibPayload self.payloadFilePath = payloadFilePath self.checksumType = checksumType self.checksumHex = checksumHex def ToString(self): """Returns the software tag string. """ return self.SEPARATOR.join( (self.CONFIG_SCHEMA_MAGIC, self.schemaId, self.vibPayload, self.payloadFilePath, self.checksumType, self.checksumHex)) @classmethod def FromString(cls, tag): """Converts a software tag string to object. """ parts = tag.split(cls.SEPARATOR) if len(parts) != 6 or parts[0] != cls.CONFIG_SCHEMA_MAGIC: raise ValueError('Input does not appear to be a config schema ' 'software tag') return cls(parts[1], parts[2], parts[3], parts[4], parts[5]) @classmethod def FromPayloadFile(cls, filePath, payloadName, payloadFilePath): """Generate an object using the schema file, the name of payload it belongs to, and its member path in the payload. """ schema = ConfigSchema.FromFile(filePath) checksumType, checksumHex = schema.checksum return cls(schema.schemaId, payloadName, payloadFilePath, checksumType, checksumHex) class ConfigSchema(object): """A simple class that represents image-relevant attributes of a config schema. """ # In VIB checksum types all contain dashes. HASH_TYPE = 'sha-256' def __init__(self, schemaStr): self._schemaStr = schemaStr jsonDict = json.loads(schemaStr) metadataNode = jsonDict.get('metadata', dict()) self.vibName = metadataNode.get('vibname', None) self.vibVersion = metadataNode.get('vibversion', None) if not self.vibName or not self.vibVersion: raise ValueError('VIB name and version cannot be empty') def __eq__(self, other): return (self.vibName == other.vibName and self.vibVersion == other.vibVersion and self._schemaStr == other._schemaStr) @property def schemaId(self): """ID of the schema is formed by VIB name and version. """ return '%s-%s' % (self.vibName, self.vibVersion) @property def checksum(self): """Returns a tuple of checksum type and hex checksum. """ hashObj = hashlib.new(self.HASH_TYPE.replace('-', '')) hashObj.update(self._schemaStr.encode()) return self.HASH_TYPE, hashObj.hexdigest() @property def fileName(self): """Generates the file name of the schema. """ return '%s-schema.json' % self.schemaId @classmethod def FromFile(cls, filePath): with open(filePath, 'r') as fobj: return cls(fobj.read()) def WriteFile(self, filePath): with open(filePath, 'w') as fobj: fobj.write(self._schemaStr) class ConfigSchemaCollection(dict): """A collection of config schema objects. """ def __add__(self, other): """Merge two objects and return a new one. """ new = self.__class__() for cs in self.values(): new.AddConfigSchema(cs) new += other return new def __iadd__(self, other): for cs in other.values(): self.AddConfigSchema(cs) return self def AddFromJSON(self, jsonStr, validate=False): """Adds a config schema from JSON string. Currently validate has not effect, but is required to match other collections' API. """ jsonStr = byteToStr(jsonStr) cs = ConfigSchema(jsonStr) self.AddConfigSchema(cs) def AddConfigSchema(self, cs): """Adds a config schema. """ if cs.schemaId in self: if self[cs.schemaId] != cs: raise ValueError('Unequal config schemas share the same ID %s' % cs.schemaId) else: self[cs.schemaId] = cs def FromDirectory(self, path): """Populates the collection with files in a directory. This clears the collection before populating the objects. """ if not os.path.exists(path): raise RuntimeError('Directory %s does not exist' % path) elif not os.path.isdir(path): raise RuntimeError('Path %s is not a directory' % path) self.clear() for root, _, files in os.walk(path, topdown=True): for name in files: filePath = os.path.join(root, name) cs = ConfigSchema.FromFile(filePath) self.AddConfigSchema(cs) def ToDirectory(self, path): """Writes config schemas into a directory. If the directory exists, the content of the directory will be clobbered. """ if os.path.isdir(path): shutil.rmtree(path) os.makedirs(path) for cs in self.values(): cs.WriteFile(os.path.join(path, cs.fileName))