| """ |
| The main purpose of this module is to expose LinkCollector.collect_sources(). |
| """ |
| |
| import collections |
| import email.message |
| import functools |
| import itertools |
| import json |
| import logging |
| import os |
| import urllib.parse |
| import urllib.request |
| from html.parser import HTMLParser |
| from optparse import Values |
| from typing import ( |
| TYPE_CHECKING, |
| Callable, |
| Dict, |
| Iterable, |
| List, |
| MutableMapping, |
| NamedTuple, |
| Optional, |
| Sequence, |
| Tuple, |
| Union, |
| ) |
| |
| from pip._vendor import requests |
| from pip._vendor.requests import Response |
| from pip._vendor.requests.exceptions import RetryError, SSLError |
| |
| from pip._internal.exceptions import NetworkConnectionError |
| from pip._internal.models.link import Link |
| from pip._internal.models.search_scope import SearchScope |
| from pip._internal.network.session import PipSession |
| from pip._internal.network.utils import raise_for_status |
| from pip._internal.utils.filetypes import is_archive_file |
| from pip._internal.utils.misc import redact_auth_from_url |
| from pip._internal.vcs import vcs |
| |
| from .sources import CandidatesFromPage, LinkSource, build_source |
| |
| if TYPE_CHECKING: |
| from typing import Protocol |
| else: |
| Protocol = object |
| |
| logger = logging.getLogger(__name__) |
| |
| ResponseHeaders = MutableMapping[str, str] |
| |
| |
| def _match_vcs_scheme(url: str) -> Optional[str]: |
| """Look for VCS schemes in the URL. |
| |
| Returns the matched VCS scheme, or None if there's no match. |
| """ |
| for scheme in vcs.schemes: |
| if url.lower().startswith(scheme) and url[len(scheme)] in "+:": |
| return scheme |
| return None |
| |
| |
| class _NotAPIContent(Exception): |
| def __init__(self, content_type: str, request_desc: str) -> None: |
| super().__init__(content_type, request_desc) |
| self.content_type = content_type |
| self.request_desc = request_desc |
| |
| |
| def _ensure_api_header(response: Response) -> None: |
| """ |
| Check the Content-Type header to ensure the response contains a Simple |
| API Response. |
| |
| Raises `_NotAPIContent` if the content type is not a valid content-type. |
| """ |
| content_type = response.headers.get("Content-Type", "Unknown") |
| |
| content_type_l = content_type.lower() |
| if content_type_l.startswith( |
| ( |
| "text/html", |
| "application/vnd.pypi.simple.v1+html", |
| "application/vnd.pypi.simple.v1+json", |
| ) |
| ): |
| return |
| |
| raise _NotAPIContent(content_type, response.request.method) |
| |
| |
| class _NotHTTP(Exception): |
| pass |
| |
| |
| def _ensure_api_response(url: str, session: PipSession) -> None: |
| """ |
| Send a HEAD request to the URL, and ensure the response contains a simple |
| API Response. |
| |
| Raises `_NotHTTP` if the URL is not available for a HEAD request, or |
| `_NotAPIContent` if the content type is not a valid content type. |
| """ |
| scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url) |
| if scheme not in {"http", "https"}: |
| raise _NotHTTP() |
| |
| resp = session.head(url, allow_redirects=True) |
| raise_for_status(resp) |
| |
| _ensure_api_header(resp) |
| |
| |
| def _get_simple_response(url: str, session: PipSession) -> Response: |
| """Access an Simple API response with GET, and return the response. |
| |
| This consists of three parts: |
| |
| 1. If the URL looks suspiciously like an archive, send a HEAD first to |
| check the Content-Type is HTML or Simple API, to avoid downloading a |
| large file. Raise `_NotHTTP` if the content type cannot be determined, or |
| `_NotAPIContent` if it is not HTML or a Simple API. |
| 2. Actually perform the request. Raise HTTP exceptions on network failures. |
| 3. Check the Content-Type header to make sure we got a Simple API response, |
| and raise `_NotAPIContent` otherwise. |
| """ |
| if is_archive_file(Link(url).filename): |
| _ensure_api_response(url, session=session) |
| |
| logger.debug("Getting page %s", redact_auth_from_url(url)) |
| |
| resp = session.get( |
| url, |
| headers={ |
| "Accept": ", ".join( |
| [ |
| "application/vnd.pypi.simple.v1+json", |
| "application/vnd.pypi.simple.v1+html; q=0.1", |
| "text/html; q=0.01", |
| ] |
| ), |
| # We don't want to blindly returned cached data for |
| # /simple/, because authors generally expecting that |
| # twine upload && pip install will function, but if |
| # they've done a pip install in the last ~10 minutes |
| # it won't. Thus by setting this to zero we will not |
| # blindly use any cached data, however the benefit of |
| # using max-age=0 instead of no-cache, is that we will |
| # still support conditional requests, so we will still |
| # minimize traffic sent in cases where the page hasn't |
| # changed at all, we will just always incur the round |
| # trip for the conditional GET now instead of only |
| # once per 10 minutes. |
| # For more information, please see pypa/pip#5670. |
| "Cache-Control": "max-age=0", |
| }, |
| ) |
| raise_for_status(resp) |
| |
| # The check for archives above only works if the url ends with |
| # something that looks like an archive. However that is not a |
| # requirement of an url. Unless we issue a HEAD request on every |
| # url we cannot know ahead of time for sure if something is a |
| # Simple API response or not. However we can check after we've |
| # downloaded it. |
| _ensure_api_header(resp) |
| |
| logger.debug( |
| "Fetched page %s as %s", |
| redact_auth_from_url(url), |
| resp.headers.get("Content-Type", "Unknown"), |
| ) |
| |
| return resp |
| |
| |
| def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]: |
| """Determine if we have any encoding information in our headers.""" |
| if headers and "Content-Type" in headers: |
| m = email.message.Message() |
| m["content-type"] = headers["Content-Type"] |
| charset = m.get_param("charset") |
| if charset: |
| return str(charset) |
| return None |
| |
| |
| class CacheablePageContent: |
| def __init__(self, page: "IndexContent") -> None: |
| assert page.cache_link_parsing |
| self.page = page |
| |
| def __eq__(self, other: object) -> bool: |
| return isinstance(other, type(self)) and self.page.url == other.page.url |
| |
| def __hash__(self) -> int: |
| return hash(self.page.url) |
| |
| |
| class ParseLinks(Protocol): |
| def __call__(self, page: "IndexContent") -> Iterable[Link]: |
| ... |
| |
| |
| def with_cached_index_content(fn: ParseLinks) -> ParseLinks: |
| """ |
| Given a function that parses an Iterable[Link] from an IndexContent, cache the |
| function's result (keyed by CacheablePageContent), unless the IndexContent |
| `page` has `page.cache_link_parsing == False`. |
| """ |
| |
| @functools.lru_cache(maxsize=None) |
| def wrapper(cacheable_page: CacheablePageContent) -> List[Link]: |
| return list(fn(cacheable_page.page)) |
| |
| @functools.wraps(fn) |
| def wrapper_wrapper(page: "IndexContent") -> List[Link]: |
| if page.cache_link_parsing: |
| return wrapper(CacheablePageContent(page)) |
| return list(fn(page)) |
| |
| return wrapper_wrapper |
| |
| |
| @with_cached_index_content |
| def parse_links(page: "IndexContent") -> Iterable[Link]: |
| """ |
| Parse a Simple API's Index Content, and yield its anchor elements as Link objects. |
| """ |
| |
| content_type_l = page.content_type.lower() |
| if content_type_l.startswith("application/vnd.pypi.simple.v1+json"): |
| data = json.loads(page.content) |
| for file in data.get("files", []): |
| link = Link.from_json(file, page.url) |
| if link is None: |
| continue |
| yield link |
| return |
| |
| parser = HTMLLinkParser(page.url) |
| encoding = page.encoding or "utf-8" |
| parser.feed(page.content.decode(encoding)) |
| |
| url = page.url |
| base_url = parser.base_url or url |
| for anchor in parser.anchors: |
| link = Link.from_element(anchor, page_url=url, base_url=base_url) |
| if link is None: |
| continue |
| yield link |
| |
| |
| class IndexContent: |
| """Represents one response (or page), along with its URL""" |
| |
| def __init__( |
| self, |
| content: bytes, |
| content_type: str, |
| encoding: Optional[str], |
| url: str, |
| cache_link_parsing: bool = True, |
| ) -> None: |
| """ |
| :param encoding: the encoding to decode the given content. |
| :param url: the URL from which the HTML was downloaded. |
| :param cache_link_parsing: whether links parsed from this page's url |
| should be cached. PyPI index urls should |
| have this set to False, for example. |
| """ |
| self.content = content |
| self.content_type = content_type |
| self.encoding = encoding |
| self.url = url |
| self.cache_link_parsing = cache_link_parsing |
| |
| def __str__(self) -> str: |
| return redact_auth_from_url(self.url) |
| |
| |
| class HTMLLinkParser(HTMLParser): |
| """ |
| HTMLParser that keeps the first base HREF and a list of all anchor |
| elements' attributes. |
| """ |
| |
| def __init__(self, url: str) -> None: |
| super().__init__(convert_charrefs=True) |
| |
| self.url: str = url |
| self.base_url: Optional[str] = None |
| self.anchors: List[Dict[str, Optional[str]]] = [] |
| |
| def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None: |
| if tag == "base" and self.base_url is None: |
| href = self.get_href(attrs) |
| if href is not None: |
| self.base_url = href |
| elif tag == "a": |
| self.anchors.append(dict(attrs)) |
| |
| def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]: |
| for name, value in attrs: |
| if name == "href": |
| return value |
| return None |
| |
| |
| def _handle_get_simple_fail( |
| link: Link, |
| reason: Union[str, Exception], |
| meth: Optional[Callable[..., None]] = None, |
| ) -> None: |
| if meth is None: |
| meth = logger.debug |
| meth("Could not fetch URL %s: %s - skipping", link, reason) |
| |
| |
| def _make_index_content( |
| response: Response, cache_link_parsing: bool = True |
| ) -> IndexContent: |
| encoding = _get_encoding_from_headers(response.headers) |
| return IndexContent( |
| response.content, |
| response.headers["Content-Type"], |
| encoding=encoding, |
| url=response.url, |
| cache_link_parsing=cache_link_parsing, |
| ) |
| |
| |
| def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]: |
| url = link.url.split("#", 1)[0] |
| |
| # Check for VCS schemes that do not support lookup as web pages. |
| vcs_scheme = _match_vcs_scheme(url) |
| if vcs_scheme: |
| logger.warning( |
| "Cannot look at %s URL %s because it does not support lookup as web pages.", |
| vcs_scheme, |
| link, |
| ) |
| return None |
| |
| # Tack index.html onto file:// URLs that point to directories |
| scheme, _, path, _, _, _ = urllib.parse.urlparse(url) |
| if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)): |
| # add trailing slash if not present so urljoin doesn't trim |
| # final segment |
| if not url.endswith("/"): |
| url += "/" |
| # TODO: In the future, it would be nice if pip supported PEP 691 |
| # style respones in the file:// URLs, however there's no |
| # standard file extension for application/vnd.pypi.simple.v1+json |
| # so we'll need to come up with something on our own. |
| url = urllib.parse.urljoin(url, "index.html") |
| logger.debug(" file: URL is directory, getting %s", url) |
| |
| try: |
| resp = _get_simple_response(url, session=session) |
| except _NotHTTP: |
| logger.warning( |
| "Skipping page %s because it looks like an archive, and cannot " |
| "be checked by a HTTP HEAD request.", |
| link, |
| ) |
| except _NotAPIContent as exc: |
| logger.warning( |
| "Skipping page %s because the %s request got Content-Type: %s. " |
| "The only supported Content-Types are application/vnd.pypi.simple.v1+json, " |
| "application/vnd.pypi.simple.v1+html, and text/html", |
| link, |
| exc.request_desc, |
| exc.content_type, |
| ) |
| except NetworkConnectionError as exc: |
| _handle_get_simple_fail(link, exc) |
| except RetryError as exc: |
| _handle_get_simple_fail(link, exc) |
| except SSLError as exc: |
| reason = "There was a problem confirming the ssl certificate: " |
| reason += str(exc) |
| _handle_get_simple_fail(link, reason, meth=logger.info) |
| except requests.ConnectionError as exc: |
| _handle_get_simple_fail(link, f"connection error: {exc}") |
| except requests.Timeout: |
| _handle_get_simple_fail(link, "timed out") |
| else: |
| return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing) |
| return None |
| |
| |
| class CollectedSources(NamedTuple): |
| find_links: Sequence[Optional[LinkSource]] |
| index_urls: Sequence[Optional[LinkSource]] |
| |
| |
| class LinkCollector: |
| |
| """ |
| Responsible for collecting Link objects from all configured locations, |
| making network requests as needed. |
| |
| The class's main method is its collect_sources() method. |
| """ |
| |
| def __init__( |
| self, |
| session: PipSession, |
| search_scope: SearchScope, |
| ) -> None: |
| self.search_scope = search_scope |
| self.session = session |
| |
| @classmethod |
| def create( |
| cls, |
| session: PipSession, |
| options: Values, |
| suppress_no_index: bool = False, |
| ) -> "LinkCollector": |
| """ |
| :param session: The Session to use to make requests. |
| :param suppress_no_index: Whether to ignore the --no-index option |
| when constructing the SearchScope object. |
| """ |
| index_urls = [options.index_url] + options.extra_index_urls |
| if options.no_index and not suppress_no_index: |
| logger.debug( |
| "Ignoring indexes: %s", |
| ",".join(redact_auth_from_url(url) for url in index_urls), |
| ) |
| index_urls = [] |
| |
| # Make sure find_links is a list before passing to create(). |
| find_links = options.find_links or [] |
| |
| search_scope = SearchScope.create( |
| find_links=find_links, |
| index_urls=index_urls, |
| no_index=options.no_index, |
| ) |
| link_collector = LinkCollector( |
| session=session, |
| search_scope=search_scope, |
| ) |
| return link_collector |
| |
| @property |
| def find_links(self) -> List[str]: |
| return self.search_scope.find_links |
| |
| def fetch_response(self, location: Link) -> Optional[IndexContent]: |
| """ |
| Fetch an HTML page containing package links. |
| """ |
| return _get_index_content(location, session=self.session) |
| |
| def collect_sources( |
| self, |
| project_name: str, |
| candidates_from_page: CandidatesFromPage, |
| ) -> CollectedSources: |
| # The OrderedDict calls deduplicate sources by URL. |
| index_url_sources = collections.OrderedDict( |
| build_source( |
| loc, |
| candidates_from_page=candidates_from_page, |
| page_validator=self.session.is_secure_origin, |
| expand_dir=False, |
| cache_link_parsing=False, |
| ) |
| for loc in self.search_scope.get_index_urls_locations(project_name) |
| ).values() |
| find_links_sources = collections.OrderedDict( |
| build_source( |
| loc, |
| candidates_from_page=candidates_from_page, |
| page_validator=self.session.is_secure_origin, |
| expand_dir=True, |
| cache_link_parsing=True, |
| ) |
| for loc in self.find_links |
| ).values() |
| |
| if logger.isEnabledFor(logging.DEBUG): |
| lines = [ |
| f"* {s.link}" |
| for s in itertools.chain(find_links_sources, index_url_sources) |
| if s is not None and s.link is not None |
| ] |
| lines = [ |
| f"{len(lines)} location(s) to search " |
| f"for versions of {project_name}:" |
| ] + lines |
| logger.debug("\n".join(lines)) |
| |
| return CollectedSources( |
| find_links=list(find_links_sources), |
| index_urls=list(index_url_sources), |
| ) |