######################################################################## # Copyright (C) 2010, 2014, 2018 VMWare, Inc. # # All Rights Reserved # ######################################################################## "This module contains classes that define acceptance level policies." import os from . import Errors from .Utils import XmlUtils etree = XmlUtils.FindElementTree() # Try to get the VibSign module. VibSignModule = None def loadVibSign(): """ VibSign is a dynamic library which has issue when load from file scope in embedded python. Lazy load when it is needed. """ global VibSignModule if VibSignModule is None: try: import VibSign VibSignModule = VibSign except ImportError: VibSignModule = None return VibSignModule try: import buildNumber vibtoolsDir = "vibtools-%s" % buildNumber.BUILDNUMBER except ImportError: vibtoolsDir = "vibtools" path1="/usr/share/certs" path2="/opt/vmware/%s/certs" % vibtoolsDir if os.path.exists(path1): CERTSDIRS = [path1] elif os.path.exists(path2): CERTSDIRS = [path2] else: CERTSDIRS = [os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir))] SCHEMADIR = XmlUtils.GetSchemaDir() class AcceptanceLevelPolicy(object): level = None def __init__(self): self.verifyobj = None self.schemaobj = None def SetSchema(self, schema=None): """Sets the schema to be used for validation. Parameters: * schema - The schema file to validate against. If not specified, defaults to 'vib20-extensibility.rng' located at SCHEMADIR. Raises: VibValidationError - unable to find the schema XML file, parse it, or otherwise obtain the schema object """ if schema is None: schema = os.path.join(SCHEMADIR, 'vib20-extensibility.rng') try: self.schemaobj = XmlUtils.GetSchemaObj(schema) except XmlUtils.ValidationError as e: msg = "Unable to obtain XML schema: %s" % e raise Errors.VibValidationError(None, [], msg) def VerifySignature(self, vib, checkCertDates=False): """Verify VIB descriptor is signed properly. """ if self.verifyobj is not None: vib.VerifySignature(self.verifyobj, checkCertDates=checkCertDates) def VerifySchema(self, vib, errmsg="failed a check of extensibility rules"): """Validates the VIB descriptor XML against a schema for checking things like extensibility rules. Parameters: * vib - The instance of BaseVib or ArFileVib to validate * errmsg - A custom error message to return Raises: * VibValidationError - if the schema validation failed. Each error along with the offending XML snippet is included in the errors attribute. """ if self.schemaobj is not None: result = XmlUtils.ValidateXml(vib.ToXml(), self.schemaobj) if not result: msg = ("VIB (%s) %s for acceptance level " "'%s': %s." % (vib.id, errmsg, self.level, result.errorstrings)) raise Errors.VibValidationError(vib.id, result.errorstrings, msg) def Verify(self, vib, checkCertDates=False): """Validates the acceptance level of this VIB against the policy for that acceptance level. Parameters: * vib - The instance of BaseVib or ArFileVib to validate. * checkCertDates - Whether to check time-validity issues (expired or not-yet-valid certs). Raises: * VibFormatError - The VIB specifies an invalid acceptance level. * VibValidationError - The VIB schema validation failed. * VibSigMissingError - The VIB is not signed. * VibSigFormatError - The VIB signature does not have the appropriate format. * VibSigInvalidError - The VIB signature cannot be verified to be signed by a trusted CA. * VibSigDigestError - The digest from the PKCS7 signature does not match a digest computed for the descriptor text. """ self.VerifySignature(vib, checkCertDates) self.VerifySchema(vib) class CommunityAcceptanceLevel(AcceptanceLevelPolicy): """This acceptance level policy does not validate signing, but does validate the VIB's XML against a schema. """ level = "community" def __init__(self): AcceptanceLevelPolicy.__init__(self) self.verifyobj = None # explicitly; no signing required. self.SetSchema() class PartnerAcceptanceLevel(AcceptanceLevelPolicy): """This acceptance level policy validates the VIB signer can be chained to any CA certificate in the certificates directory, and that the signer is not in any CRL in the certificates directory. It also validates the VIB's XML against a schema. """ level = "partner" def __init__(self): AcceptanceLevelPolicy.__init__(self) if loadVibSign() is None: msg = ("Can not instantiate '%s' policy: VibSign module missing." % self.level) raise Errors.AcceptanceConfigError(msg) self.SetSchema() cacerts=[] crls=[] for c in CERTSDIRS: certPath = os.path.join(c, "vmpartner.cert") crlPath = os.path.join(c, "vmpartner.crl") if os.path.isfile(certPath) and os.path.isfile(crlPath): cacerts.append(certPath) crls.append(crlPath) try: self.verifyobj = VibSignModule.VibSigner(cacerts=cacerts, crls=crls) except Exception as e: msg = "Fail to create VibSigner object, %s" % e raise Errors.VibCertStoreError(cacerts + crls , msg) def VerifySignature(self, vib, checkCertDates=False): signer = vib.VerifySignature(self.verifyobj, checkCertDates=checkCertDates) org = signer.subject.get("O", [""])[0].lower() ou = signer.subject.get("OU", [""])[0].lower() if org == "vmware inc" and ou == "acceptance signing": subject = ", ".join("=".join((k, i)) for k, v in signer.subject.items() for i in v) msg = ("Validating signer subject for '%s' level failed: Signer %s " "is not valid for this acceptance level." % (self.level, subject)) raise Errors.VibSigInvalidError(vib.id, msg) class AcceptedAcceptanceLevel(AcceptanceLevelPolicy): """This acceptance level policy validates the VIB signer can be chained to either the vmware.cert or the vmpartner.cert in the certificates directory. If the signer's certificate is issued by the vmpartner certificate, it must also specifically be signed by VMware's own partner certificate, and not a partner's. """ level = "accepted" def __init__(self): AcceptanceLevelPolicy.__init__(self) if loadVibSign() is None: msg = ("Can not instantiate '%s' policy: VibSign module missing." % self.level) raise Errors.AcceptanceConfigError(msg) self.schemaobj = None # explicitly; no extensibility check for accepted cacerts=[] crls=[] for c in CERTSDIRS: certPath = os.path.join(c, "vmpartner.cert") crlPath = os.path.join(c, "vmpartner.crl") if os.path.isfile(certPath) and os.path.isfile(crlPath): cacerts.append(certPath) crls.append(crlPath) try: self.verifyobj = VibSignModule.VibSigner(cacerts=cacerts, crls=crls) except Exception as e: msg = "Fail to create VibSigner object, %s" % e raise Errors.VibCertStoreError(cacerts + crls, msg) def VerifySignature(self, vib, checkCertDates=False): signer = vib.VerifySignature(self.verifyobj, checkCertDates=checkCertDates) org = signer.subject.get("O", [""])[0].lower() ou = signer.subject.get("OU", [""])[0].lower() if org != "vmware inc" or ou != "acceptance signing": subject = ", ".join("=".join((k, i)) for k, v in signer.subject.items() for i in v) msg = ("Validating signer subject for '%s' level failed: Signer %s " "is not valid for this acceptance level." % (self.level, subject)) raise Errors.VibSigInvalidError(vib.id, msg) class CertifiedAcceptanceLevel(AcceptanceLevelPolicy): """This acceptance level policy validates the VIB signer can be chained to the vmware.cert. """ level = "certified" def __init__(self): AcceptanceLevelPolicy.__init__(self) if loadVibSign() is None: msg = ("Can not instantiate '%s' policy: VibSign module missing." % self.level) raise Errors.AcceptanceConfigError(msg) self.schemaobj = None # explicitly; no extensibility check for certified cacerts=[] for c in CERTSDIRS: certPath = os.path.join(c, "vmware.cert") if os.path.isfile(certPath): cacerts.append(certPath) # I guess we won't ever have to revoke our own certificates? try: self.verifyobj = VibSignModule.VibSigner(cacerts=cacerts) except Exception as e: msg = "Fail to create VibSigner object, %s" % e raise Errors.VibCertStoreError(cacerts, msg) def GetPolicy(level): """Returns AcceptanceLevelPolicy object for the specified level. Use of this method is preferable, as it will retrieve objects from a cache if they exist there, otherwise it will try to instantiate a new object, raising an exception on an error. """ # Temporary code for compatibility: if level == "signed": level = "partner" elif level == "unsigned": level = "community" # End temporary code for compatibility. return POLICY_OBJECTS.setdefault(level, POLICY_CLASSES[level]()) def Initialize(certsdirs=[], schemadir=None): """Initializes acceptance level classes. Automatically called at module import time, but may be called again to re-initialize classes (i.e. to change CERTSDIRS or SCHEMADIR). Parameters: * certsdir - If specified, (re)sets the path in which to look for certificates. Useful for unit testing, or in an environment where a custom certs dir path is needed. * schemadirs - If specified, (re)sets the path in which to look for validation schema files. """ global CERTSDIRS, POLICY_OBJECTS, POLICY_CLASSES, SCHEMADIR if certsdirs: CERTSDIRS = certsdirs if schemadir is not None: SCHEMADIR = schemadir POLICY_CLASSES = dict() POLICY_OBJECTS = dict() for cls in (CommunityAcceptanceLevel, PartnerAcceptanceLevel, AcceptedAcceptanceLevel, CertifiedAcceptanceLevel): POLICY_CLASSES[cls.level] = cls try: POLICY_OBJECTS[cls.level] = cls() except Exception: pass Initialize()