| import logging |
| import shutil |
| import sys |
| import textwrap |
| import xmlrpc.client |
| from collections import OrderedDict |
| from optparse import Values |
| from typing import TYPE_CHECKING, Dict, List, Optional |
| |
| from pip._vendor.packaging.version import parse as parse_version |
| |
| from pip._internal.cli.base_command import Command |
| from pip._internal.cli.req_command import SessionCommandMixin |
| from pip._internal.cli.status_codes import NO_MATCHES_FOUND, SUCCESS |
| from pip._internal.exceptions import CommandError |
| from pip._internal.metadata import get_default_environment |
| from pip._internal.models.index import PyPI |
| from pip._internal.network.xmlrpc import PipXmlrpcTransport |
| from pip._internal.utils.logging import indent_log |
| from pip._internal.utils.misc import write_output |
| |
if TYPE_CHECKING:
    # This branch is only evaluated by static type checkers, so TypedDict is
    # never imported at runtime. That is why the rest of the module refers to
    # "TransformedHit" as a quoted (string) annotation.
    from typing import TypedDict

    # Shape of one aggregated search result, as built by transform_hits():
    # a single entry per package with every reported version collected inline.
    class TransformedHit(TypedDict):
        name: str
        summary: str
        versions: List[str]


# Module-level logger named after this module.
logger = logging.getLogger(__name__)
| |
| |
class SearchCommand(Command, SessionCommandMixin):
    """Search for PyPI packages whose name or summary contains <query>."""

    usage = """
%prog [options] <query>"""
    ignore_require_venv = True

    def add_options(self) -> None:
        """Register the ``-i``/``--index`` option and attach the option group.

        The index URL defaults to ``PyPI.pypi_url``.
        """
        self.cmd_opts.add_option(
            "-i",
            "--index",
            dest="index",
            metavar="URL",
            default=PyPI.pypi_url,
            help="Base URL of Python Package Index (default %default)",
        )

        self.parser.insert_option_group(0, self.cmd_opts)

    def run(self, options: Values, args: List[str]) -> int:
        """Query the index for *args*, print the hits and return a status code.

        Returns ``SUCCESS`` when the index returned at least one hit and
        ``NO_MATCHES_FOUND`` otherwise.

        Raises:
            CommandError: if no query terms were supplied, or (via
                :meth:`search`) if the XML-RPC request failed.
        """
        if not args:
            raise CommandError("Missing required argument (search query).")
        query = args
        pypi_hits = self.search(query, options)
        hits = transform_hits(pypi_hits)

        # Summaries are only wrapped when stdout is an interactive terminal;
        # otherwise no width is passed and the output is left unwrapped.
        terminal_width = None
        if sys.stdout.isatty():
            terminal_width = shutil.get_terminal_size()[0]

        print_results(hits, terminal_width=terminal_width)
        if pypi_hits:
            return SUCCESS
        return NO_MATCHES_FOUND

    def search(self, query: List[str], options: Values) -> List[Dict[str, str]]:
        """Call the index's XML-RPC ``search`` method and return its raw hits.

        The request asks for entries whose ``name`` *or* ``summary`` matches
        *query*, using ``options.index`` as the endpoint and the command's
        default session as transport.

        Raises:
            CommandError: if the server answers with an XML-RPC fault. The
                original fault is attached as ``__cause__``.
        """
        index_url = options.index

        session = self.get_default_session(options)

        transport = PipXmlrpcTransport(index_url, session)
        pypi = xmlrpc.client.ServerProxy(index_url, transport)
        try:
            hits = pypi.search({"name": query, "summary": query}, "or")
        except xmlrpc.client.Fault as fault:
            message = "XMLRPC request failed [code: {code}]\n{string}".format(
                code=fault.faultCode,
                string=fault.faultString,
            )
            # Chain explicitly so the fault is recorded as the direct cause
            # rather than as an incidental "during handling" context.
            raise CommandError(message) from fault
        # Narrows the dynamically typed XML-RPC result for type checkers.
        assert isinstance(hits, list)
        return hits
| |
| |
def transform_hits(hits: List[Dict[str, str]]) -> List["TransformedHit"]:
    """
    Collapse the per-version hits returned by the index into one entry per
    package.

    Each resulting entry carries the package name, every version that was
    reported for it (in the order received) and the summary belonging to the
    highest of those versions. Packages keep the order in which they were
    first seen.
    """
    grouped: Dict[str, "TransformedHit"] = OrderedDict()
    for entry in hits:
        pkg_name = entry["name"]
        pkg_summary = entry["summary"]
        pkg_version = entry["version"]

        record = grouped.get(pkg_name)
        if record is None:
            # First time this package shows up: start its record.
            record = {
                "name": pkg_name,
                "summary": pkg_summary,
                "versions": [pkg_version],
            }
            grouped[pkg_name] = record
        else:
            record["versions"].append(pkg_version)

        # The summary shown is the one attached to the highest version so far.
        if highest_version(record["versions"]) == pkg_version:
            record["summary"] = pkg_summary

    return list(grouped.values())
| |
| |
def print_dist_installation_info(name: str, latest: str) -> None:
    """Report the installed version of *name* relative to *latest*.

    Looks *name* up in the default environment. If it is not installed,
    nothing is written. Otherwise the installed version is written (indented
    under the search result), followed by the latest available version when
    the two differ. A pre-release *latest* is flagged with a hint to use
    ``pip install --pre``.
    """
    env = get_default_environment()
    dist = env.get_distribution(name)
    if dist is None:
        return

    with indent_log():
        # ``latest`` is a plain string. NOTE(review): ``dist.version`` may be
        # a parsed version object rather than a string -- confirm against the
        # metadata backend. Such objects never compare equal to a ``str``, so
        # the comparison is done on the textual form; this is a no-op when the
        # attribute is already a string.
        if str(dist.version) == latest:
            write_output("INSTALLED: %s (latest)", dist.version)
            return

        write_output("INSTALLED: %s", dist.version)
        if parse_version(latest).pre:
            write_output(
                "LATEST:    %s (pre-release; install"
                " with `pip install --pre`)",
                latest,
            )
        else:
            write_output("LATEST:    %s", latest)
| |
| |
def print_results(
    hits: List["TransformedHit"],
    name_column_width: Optional[int] = None,
    terminal_width: Optional[int] = None,
) -> None:
    """Write one ``name (latest) - summary`` line per hit.

    Args:
        hits: Aggregated search results; nothing is written when empty.
        name_column_width: Width the ``name (latest)`` column is padded to.
            When omitted it is derived from the longest name plus the length
            of its highest version, plus 4.
        terminal_width: When given, summaries are wrapped to the space left
            beside the name column (only if more than 10 columns remain), and
            continuation lines are indented to align with the summary.

    After each line the installation status of the package is reported via
    ``print_dist_installation_info``. A result whose text cannot be encoded
    for the output stream is skipped instead of aborting the whole listing.
    """
    if not hits:
        return
    if name_column_width is None:
        name_column_width = (
            max(
                len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
                for hit in hits
            )
            + 4
        )

    for hit in hits:
        name = hit["name"]
        summary = hit["summary"] or ""
        latest = highest_version(hit.get("versions", ["-"]))
        if terminal_width is not None:
            target_width = terminal_width - name_column_width - 5
            if target_width > 10:
                # wrap and indent summary to fit terminal
                summary_lines = textwrap.wrap(summary, target_width)
                summary = ("\n" + " " * (name_column_width + 3)).join(summary_lines)

        name_latest = f"{name} ({latest})"
        line = f"{name_latest:{name_column_width}} - {summary}"
        try:
            write_output(line)
            print_dist_installation_info(name, latest)
        except UnicodeEncodeError:
            # Best effort: keep listing the remaining hits, but leave a trace
            # of the skipped one. ascii() keeps the log message itself
            # encodable on the same restricted output stream.
            logger.debug(
                "Could not display search result for %s: "
                "text is not encodable for the output stream",
                ascii(name),
            )
| |
| |
def highest_version(versions: List[str]) -> str:
    """Return the entry of *versions* that ranks highest as a parsed version.

    Ordering is decided by ``parse_version``; the winning string is returned
    exactly as it was given. As with any ``max()`` call, an empty list raises
    ``ValueError``.
    """
    newest = max(versions, key=parse_version)
    return newest