import json import logging from optparse import Values from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence, Tuple, cast from pip._vendor.packaging.utils import canonicalize_name from pip._internal.cli import cmdoptions from pip._internal.cli.req_command import IndexGroupCommand from pip._internal.cli.status_codes import SUCCESS from pip._internal.exceptions import CommandError from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder from pip._internal.metadata import BaseDistribution, get_environment from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.network.session import PipSession from pip._internal.utils.misc import stdlib_pkgs, tabulate, write_output from pip._internal.utils.parallel import map_multithread if TYPE_CHECKING: from pip._internal.metadata.base import DistributionVersion class _DistWithLatestInfo(BaseDistribution): """Give the distribution object a couple of extra fields. These will be populated during ``get_outdated()``. This is dirty but makes the rest of the code much cleaner. """ latest_version: DistributionVersion latest_filetype: str _ProcessedDists = Sequence[_DistWithLatestInfo] logger = logging.getLogger(__name__) class ListCommand(IndexGroupCommand): """ List installed packages, including editables. Packages are listed in a case-insensitive sorted order. """ ignore_require_venv = True usage = """ %prog [options]""" def add_options(self) -> None: self.cmd_opts.add_option( '-o', '--outdated', action='store_true', default=False, help='List outdated packages') self.cmd_opts.add_option( '-u', '--uptodate', action='store_true', default=False, help='List uptodate packages') self.cmd_opts.add_option( '-e', '--editable', action='store_true', default=False, help='List editable projects.') self.cmd_opts.add_option( '-l', '--local', action='store_true', default=False, help=('If in a virtualenv that has global access, do not list ' 'globally-installed packages.'), ) self.cmd_opts.add_option( '--user', dest='user', action='store_true', default=False, help='Only output packages installed in user-site.') self.cmd_opts.add_option(cmdoptions.list_path()) self.cmd_opts.add_option( '--pre', action='store_true', default=False, help=("Include pre-release and development versions. By default, " "pip only finds stable versions."), ) self.cmd_opts.add_option( '--format', action='store', dest='list_format', default="columns", choices=('columns', 'freeze', 'json'), help="Select the output format among: columns (default), freeze, " "or json", ) self.cmd_opts.add_option( '--not-required', action='store_true', dest='not_required', help="List packages that are not dependencies of " "installed packages.", ) self.cmd_opts.add_option( '--exclude-editable', action='store_false', dest='include_editable', help='Exclude editable package from output.', ) self.cmd_opts.add_option( '--include-editable', action='store_true', dest='include_editable', help='Include editable package from output.', default=True, ) self.cmd_opts.add_option(cmdoptions.list_exclude()) index_opts = cmdoptions.make_option_group( cmdoptions.index_group, self.parser ) self.parser.insert_option_group(0, index_opts) self.parser.insert_option_group(0, self.cmd_opts) def _build_package_finder( self, options: Values, session: PipSession ) -> PackageFinder: """ Create a package finder appropriate to this list command. """ link_collector = LinkCollector.create(session, options=options) # Pass allow_yanked=False to ignore yanked versions. selection_prefs = SelectionPreferences( allow_yanked=False, allow_all_prereleases=options.pre, ) return PackageFinder.create( link_collector=link_collector, selection_prefs=selection_prefs, ) def run(self, options: Values, args: List[str]) -> int: if options.outdated and options.uptodate: raise CommandError( "Options --outdated and --uptodate cannot be combined.") cmdoptions.check_list_path_option(options) skip = set(stdlib_pkgs) if options.excludes: skip.update(canonicalize_name(n) for n in options.excludes) packages: "_ProcessedDists" = [ cast("_DistWithLatestInfo", d) for d in get_environment(options.path).iter_installed_distributions( local_only=options.local, user_only=options.user, editables_only=options.editable, include_editables=options.include_editable, skip=skip, ) ] # get_not_required must be called firstly in order to find and # filter out all dependencies correctly. Otherwise a package # can't be identified as requirement because some parent packages # could be filtered out before. if options.not_required: packages = self.get_not_required(packages, options) if options.outdated: packages = self.get_outdated(packages, options) elif options.uptodate: packages = self.get_uptodate(packages, options) self.output_package_listing(packages, options) return SUCCESS def get_outdated( self, packages: "_ProcessedDists", options: Values ) -> "_ProcessedDists": return [ dist for dist in self.iter_packages_latest_infos(packages, options) if dist.latest_version > dist.version ] def get_uptodate( self, packages: "_ProcessedDists", options: Values ) -> "_ProcessedDists": return [ dist for dist in self.iter_packages_latest_infos(packages, options) if dist.latest_version == dist.version ] def get_not_required( self, packages: "_ProcessedDists", options: Values ) -> "_ProcessedDists": dep_keys = { canonicalize_name(dep.name) for dist in packages for dep in (dist.iter_dependencies() or ()) } # Create a set to remove duplicate packages, and cast it to a list # to keep the return type consistent with get_outdated and # get_uptodate return list({pkg for pkg in packages if pkg.canonical_name not in dep_keys}) def iter_packages_latest_infos( self, packages: "_ProcessedDists", options: Values ) -> Iterator["_DistWithLatestInfo"]: with self._build_session(options) as session: finder = self._build_package_finder(options, session) def latest_info( dist: "_DistWithLatestInfo" ) -> Optional["_DistWithLatestInfo"]: all_candidates = finder.find_all_candidates(dist.canonical_name) if not options.pre: # Remove prereleases all_candidates = [candidate for candidate in all_candidates if not candidate.version.is_prerelease] evaluator = finder.make_candidate_evaluator( project_name=dist.canonical_name, ) best_candidate = evaluator.sort_best_candidate(all_candidates) if best_candidate is None: return None remote_version = best_candidate.version if best_candidate.link.is_wheel: typ = 'wheel' else: typ = 'sdist' dist.latest_version = remote_version dist.latest_filetype = typ return dist for dist in map_multithread(latest_info, packages): if dist is not None: yield dist def output_package_listing( self, packages: "_ProcessedDists", options: Values ) -> None: packages = sorted( packages, key=lambda dist: dist.canonical_name, ) if options.list_format == 'columns' and packages: data, header = format_for_columns(packages, options) self.output_package_listing_columns(data, header) elif options.list_format == 'freeze': for dist in packages: if options.verbose >= 1: write_output("%s==%s (%s)", dist.raw_name, dist.version, dist.location) else: write_output("%s==%s", dist.raw_name, dist.version) elif options.list_format == 'json': write_output(format_for_json(packages, options)) def output_package_listing_columns( self, data: List[List[str]], header: List[str] ) -> None: # insert the header first: we need to know the size of column names if len(data) > 0: data.insert(0, header) pkg_strings, sizes = tabulate(data) # Create and add a separator. if len(data) > 0: pkg_strings.insert(1, " ".join(map(lambda x: '-' * x, sizes))) for val in pkg_strings: write_output(val) def format_for_columns( pkgs: "_ProcessedDists", options: Values ) -> Tuple[List[List[str]], List[str]]: """ Convert the package data into something usable by output_package_listing_columns. """ running_outdated = options.outdated # Adjust the header for the `pip list --outdated` case. if running_outdated: header = ["Package", "Version", "Latest", "Type"] else: header = ["Package", "Version"] data = [] if options.verbose >= 1 or any(x.editable for x in pkgs): header.append("Location") if options.verbose >= 1: header.append("Installer") for proj in pkgs: # if we're working on the 'outdated' list, separate out the # latest_version and type row = [proj.raw_name, str(proj.version)] if running_outdated: row.append(str(proj.latest_version)) row.append(proj.latest_filetype) if options.verbose >= 1 or proj.editable: row.append(proj.location or "") if options.verbose >= 1: row.append(proj.installer) data.append(row) return data, header def format_for_json(packages: "_ProcessedDists", options: Values) -> str: data = [] for dist in packages: info = { 'name': dist.raw_name, 'version': str(dist.version), } if options.verbose >= 1: info['location'] = dist.location or "" info['installer'] = dist.installer if options.outdated: info['latest_version'] = str(dist.latest_version) info['latest_filetype'] = dist.latest_filetype data.append(info) return json.dumps(data)