"""Support functions for working with wheel files. """ import logging from email.message import Message from email.parser import Parser from typing import Dict, Tuple from zipfile import BadZipFile, ZipFile from pip._vendor.packaging.utils import canonicalize_name from pip._vendor.pkg_resources import DistInfoDistribution, Distribution from pip._internal.exceptions import UnsupportedWheel from pip._internal.utils.pkg_resources import DictMetadata VERSION_COMPATIBLE = (1, 0) logger = logging.getLogger(__name__) class WheelMetadata(DictMetadata): """Metadata provider that maps metadata decoding exceptions to our internal exception type. """ def __init__(self, metadata: Dict[str, bytes], wheel_name: str) -> None: super().__init__(metadata) self._wheel_name = wheel_name def get_metadata(self, name: str) -> str: try: return super().get_metadata(name) except UnicodeDecodeError as e: # Augment the default error with the origin of the file. raise UnsupportedWheel( f"Error decoding metadata for {self._wheel_name}: {e}" ) def pkg_resources_distribution_for_wheel( wheel_zip: ZipFile, name: str, location: str ) -> Distribution: """Get a pkg_resources distribution given a wheel. :raises UnsupportedWheel: on any errors """ info_dir, _ = parse_wheel(wheel_zip, name) metadata_files = [p for p in wheel_zip.namelist() if p.startswith(f"{info_dir}/")] metadata_text: Dict[str, bytes] = {} for path in metadata_files: _, metadata_name = path.split("/", 1) try: metadata_text[metadata_name] = read_wheel_metadata_file(wheel_zip, path) except UnsupportedWheel as e: raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e))) metadata = WheelMetadata(metadata_text, location) return DistInfoDistribution(location=location, metadata=metadata, project_name=name) def parse_wheel(wheel_zip: ZipFile, name: str) -> Tuple[str, Message]: """Extract information from the provided wheel, ensuring it meets basic standards. Returns the name of the .dist-info directory and the parsed WHEEL metadata. """ try: info_dir = wheel_dist_info_dir(wheel_zip, name) metadata = wheel_metadata(wheel_zip, info_dir) version = wheel_version(metadata) except UnsupportedWheel as e: raise UnsupportedWheel("{} has an invalid wheel, {}".format(name, str(e))) check_compatibility(version, name) return info_dir, metadata def wheel_dist_info_dir(source: ZipFile, name: str) -> str: """Returns the name of the contained .dist-info directory. Raises AssertionError or UnsupportedWheel if not found, >1 found, or it doesn't match the provided name. """ # Zip file path separators must be / subdirs = {p.split("/", 1)[0] for p in source.namelist()} info_dirs = [s for s in subdirs if s.endswith(".dist-info")] if not info_dirs: raise UnsupportedWheel(".dist-info directory not found") if len(info_dirs) > 1: raise UnsupportedWheel( "multiple .dist-info directories found: {}".format(", ".join(info_dirs)) ) info_dir = info_dirs[0] info_dir_name = canonicalize_name(info_dir) canonical_name = canonicalize_name(name) if not info_dir_name.startswith(canonical_name): raise UnsupportedWheel( ".dist-info directory {!r} does not start with {!r}".format( info_dir, canonical_name ) ) return info_dir def read_wheel_metadata_file(source: ZipFile, path: str) -> bytes: try: return source.read(path) # BadZipFile for general corruption, KeyError for missing entry, # and RuntimeError for password-protected files except (BadZipFile, KeyError, RuntimeError) as e: raise UnsupportedWheel(f"could not read {path!r} file: {e!r}") def wheel_metadata(source: ZipFile, dist_info_dir: str) -> Message: """Return the WHEEL metadata of an extracted wheel, if possible. Otherwise, raise UnsupportedWheel. """ path = f"{dist_info_dir}/WHEEL" # Zip file path separators must be / wheel_contents = read_wheel_metadata_file(source, path) try: wheel_text = wheel_contents.decode() except UnicodeDecodeError as e: raise UnsupportedWheel(f"error decoding {path!r}: {e!r}") # FeedParser (used by Parser) does not raise any exceptions. The returned # message may have .defects populated, but for backwards-compatibility we # currently ignore them. return Parser().parsestr(wheel_text) def wheel_version(wheel_data: Message) -> Tuple[int, ...]: """Given WHEEL metadata, return the parsed Wheel-Version. Otherwise, raise UnsupportedWheel. """ version_text = wheel_data["Wheel-Version"] if version_text is None: raise UnsupportedWheel("WHEEL is missing Wheel-Version") version = version_text.strip() try: return tuple(map(int, version.split("."))) except ValueError: raise UnsupportedWheel(f"invalid Wheel-Version: {version!r}") def check_compatibility(version: Tuple[int, ...], name: str) -> None: """Raises errors or warns if called with an incompatible Wheel-Version. pip should refuse to install a Wheel-Version that's a major series ahead of what it's compatible with (e.g 2.0 > 1.1); and warn when installing a version only minor version ahead (e.g 1.2 > 1.1). version: a 2-tuple representing a Wheel-Version (Major, Minor) name: name of wheel or package to raise exception about :raises UnsupportedWheel: when an incompatible Wheel-Version is given """ if version[0] > VERSION_COMPATIBLE[0]: raise UnsupportedWheel( "{}'s Wheel-Version ({}) is not compatible with this version " "of pip".format(name, ".".join(map(str, version))) ) elif version > VERSION_COMPATIBLE: logger.warning( "Installing from a newer Wheel-Version (%s)", ".".join(map(str, version)), )