| """Cache Management |
| """ |
| |
| import hashlib |
| import json |
| import logging |
| import os |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Set |
| |
| from pip._vendor.packaging.tags import Tag, interpreter_name, interpreter_version |
| from pip._vendor.packaging.utils import canonicalize_name |
| |
| from pip._internal.exceptions import InvalidWheelFilename |
| from pip._internal.models.direct_url import DirectUrl |
| from pip._internal.models.format_control import FormatControl |
| from pip._internal.models.link import Link |
| from pip._internal.models.wheel import Wheel |
| from pip._internal.utils.temp_dir import TempDirectory, tempdir_kinds |
| from pip._internal.utils.urls import path_to_url |
| |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# File name of the JSON document kept inside a cache entry directory. It is
# written by WheelCache.record_download_origin() from a DirectUrl and read
# back by CacheEntry from the directory containing the cached file.
ORIGIN_JSON_NAME = "origin.json"
| |
| |
def _hash_dict(d: Dict[str, str]) -> str:
    """Compute a deterministic sha224 hex digest of a dictionary.

    The dictionary is serialised to compact JSON with sorted keys and
    ASCII-only escapes, so dictionaries that compare equal always produce the
    same digest regardless of key insertion order.
    """
    canonical_json = json.dumps(
        d,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=True,
    )
    digest = hashlib.sha224(canonical_json.encode("ascii"))
    return digest.hexdigest()
| |
| |
class Cache:
    """Abstract base class mapping links to directories inside a cache.

    :param cache_dir: The root of the cache. Must be an absolute path when
        non-empty; an empty value is stored as ``None``.
    :param format_control: An object of FormatControl class used to limit
        the binaries being read from the cache.
    :param allowed_formats: The formats of files the cache should store
        (only 'binary' and 'source' are accepted).
    """

    def __init__(
        self,
        cache_dir: str,
        format_control: FormatControl,
        allowed_formats: Set[str],
    ) -> None:
        super().__init__()
        # A cache root, when given, has to be an absolute path.
        assert not cache_dir or os.path.isabs(cache_dir)
        self.cache_dir = cache_dir or None
        self.format_control = format_control
        self.allowed_formats = allowed_formats

        # Reject any format other than the two known kinds.
        assert self.allowed_formats.issubset({"source", "binary"})

    def _get_cache_path_parts(self, link: Link) -> List[str]:
        """Return the path components to be os.path.joined with cache_dir."""

        # The cache key is assembled from selected pieces of the link rather
        # than from the raw URL: the fragment may carry additional items
        # that are irrelevant for caching.
        key = {"url": link.url_without_fragment}
        if link.hash_name is not None and link.hash is not None:
            key[link.hash_name] = link.hash
        if link.subdirectory_fragment:
            key["subdirectory"] = link.subdirectory_fragment

        # The interpreter name and version are part of the key in order to
        # cope with ill-behaved sdists that build a different wheel depending
        # on the Python version running their setup.py without encoding that
        # difference in compatibility tags.
        # https://github.com/pypa/pip/issues/7296
        key["interpreter_name"] = interpreter_name()
        key["interpreter_version"] = interpreter_version()

        # sha224 has security properties similar to sha256 but a shorter
        # output (and is thus less secure); the difference does not matter
        # for this use case.
        digest = _hash_dict(key)

        # Nest the directories a few levels deep so that no single directory
        # ends up with more sub directories than some file systems allow.
        return [digest[:2], digest[2:4], digest[4:6], digest[6:]]

    def _get_candidates(self, link: Link, canonical_package_name: str) -> List[Any]:
        """Return ``(entry_name, directory)`` pairs found in the cache for link.

        The result is empty when the cache is disabled, the package name or
        link is falsy, the package's allowed formats do not overlap with this
        cache's formats, or the link's cache directory does not exist.
        """
        if not (self.cache_dir and canonical_package_name and link):
            return []

        permitted = self.format_control.get_allowed_formats(canonical_package_name)
        if not self.allowed_formats.intersection(permitted):
            return []

        directory = self.get_path_for_link(link)
        if not os.path.isdir(directory):
            return []
        return [(entry, directory) for entry in os.listdir(directory)]

    def get_path_for_link(self, link: Link) -> str:
        """Return the directory in which cached items for link are stored.

        Must be implemented by subclasses.
        """
        raise NotImplementedError()

    def get(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Link:
        """Return a link to a cached item if one exists, otherwise return
        the passed link.

        Must be implemented by subclasses.
        """
        raise NotImplementedError()
| |
| |
class SimpleWheelCache(Cache):
    """A cache of wheels for future installs."""

    def __init__(self, cache_dir: str, format_control: FormatControl) -> None:
        # This cache only ever stores binary (wheel) files.
        super().__init__(cache_dir, format_control, {"binary"})

    def get_path_for_link(self, link: Link) -> str:
        """Return the directory in which wheels built for link are cached.

        Any one sdist can yield M wheels, so each link gets a directory of
        its own; that directory is then consulted when looking for cache
        hits.

        Things are only inserted into the cache if they have plausible
        version numbers, so that the cache is not contaminated with things
        that were not unique. E.g. ./package might have dozens of installs
        done for it and build a version of 0.0...and if a wheel were built
        and cached, the same wheel would be reused even after the source
        has been edited.

        :param link: The link of the sdist for which this will cache wheels.
        """
        path_parts = self._get_cache_path_parts(link)
        assert self.cache_dir
        # Wheels live under the "wheels" directory of the root cache_dir.
        return os.path.join(self.cache_dir, "wheels", *path_parts)

    def get(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Link:
        """Return a link to the best matching cached wheel, or ``link`` itself
        when no package name is given or no suitable wheel is cached.
        """
        if not package_name:
            return link

        expected_name = canonicalize_name(package_name)
        ranked = []
        for filename, directory in self._get_candidates(link, expected_name):
            try:
                wheel = Wheel(filename)
            except InvalidWheelFilename:
                # Not a wheel file name; skip the entry.
                continue

            if canonicalize_name(wheel.name) != expected_name:
                logger.debug(
                    "Ignoring cached wheel %s for %s as it "
                    "does not match the expected distribution name %s.",
                    filename,
                    link,
                    package_name,
                )
            elif wheel.supported(supported_tags):
                # Wheels built for a different python/arch/etc are skipped;
                # supported ones are ranked by their lowest tag index.
                rank = wheel.support_index_min(supported_tags)
                ranked.append((rank, filename, directory))

        if not ranked:
            return link

        _, best_name, best_dir = min(ranked)
        return Link(path_to_url(os.path.join(best_dir, best_name)))
| |
| |
class EphemWheelCache(SimpleWheelCache):
    """A SimpleWheelCache that creates its own temporary cache directory."""

    def __init__(self, format_control: FormatControl) -> None:
        # The temporary directory is kept on the instance and its path is
        # used as the cache root.
        ephemeral_dir = TempDirectory(
            globally_managed=True,
            kind=tempdir_kinds.EPHEM_WHEEL_CACHE,
        )
        self._temp_dir = ephemeral_dir

        super().__init__(ephemeral_dir.path, format_control)
| |
| |
class CacheEntry:
    """A cached file together with the download origin recorded next to it.

    :param link: Link to the cached file. The directory containing the file
        is checked for an ``origin.json`` file.
    :param persistent: Stored as given on the ``persistent`` attribute.

    After construction, ``origin`` holds the DirectUrl parsed from the origin
    file, or ``None`` when the file is absent or cannot be read or parsed.
    """

    def __init__(
        self,
        link: Link,
        persistent: bool,
    ):
        self.link = link
        self.persistent = persistent
        self.origin: Optional[DirectUrl] = None
        origin_direct_url_path = Path(self.link.file_path).parent / ORIGIN_JSON_NAME
        if origin_direct_url_path.exists():
            try:
                # The origin file is written as UTF-8, so it must be read
                # back as UTF-8 rather than with the locale's default
                # encoding.
                self.origin = DirectUrl.from_json(
                    origin_direct_url_path.read_text(encoding="utf-8")
                )
            except Exception as e:
                # Best effort: a corrupt or unreadable origin file must not
                # turn a cache hit into a crash, so report it and carry on
                # without origin information.
                logger.warning(
                    "Ignoring invalid cache entry origin file %s for %s (%s)",
                    origin_direct_url_path,
                    link,
                    e,
                )
| |
| |
class WheelCache(Cache):
    """Wraps EphemWheelCache and SimpleWheelCache into a single Cache

    This Cache allows for graceful degradation, using the ephem wheel cache
    when a certain link is not found in the simple wheel cache first.
    """

    def __init__(
        self, cache_dir: str, format_control: Optional[FormatControl] = None
    ) -> None:
        """Create the persistent and ephemeral wheel caches.

        :param cache_dir: The root of the persistent cache.
        :param format_control: Optional FormatControl; a default-constructed
            one is used when omitted.
        """
        if format_control is None:
            format_control = FormatControl()
        super().__init__(cache_dir, format_control, {"binary"})
        self._wheel_cache = SimpleWheelCache(cache_dir, format_control)
        self._ephem_cache = EphemWheelCache(format_control)

    def get_path_for_link(self, link: Link) -> str:
        """Return the persistent cache directory for link."""
        return self._wheel_cache.get_path_for_link(link)

    def get_ephem_path_for_link(self, link: Link) -> str:
        """Return the ephemeral cache directory for link."""
        return self._ephem_cache.get_path_for_link(link)

    def get(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Link:
        """Return a link to a cached item if it exists, otherwise return the
        passed link.
        """
        cache_entry = self.get_cache_entry(link, package_name, supported_tags)
        if cache_entry is None:
            return link
        return cache_entry.link

    def get_cache_entry(
        self,
        link: Link,
        package_name: Optional[str],
        supported_tags: List[Tag],
    ) -> Optional[CacheEntry]:
        """Returns a CacheEntry with a link to a cached item if it exists or
        None. The cache entry indicates if the item was found in the persistent
        or ephemeral cache.
        """
        # The underlying caches return the very same link object on a miss,
        # so an identity check distinguishes a hit from a miss.
        retval = self._wheel_cache.get(
            link=link,
            package_name=package_name,
            supported_tags=supported_tags,
        )
        if retval is not link:
            return CacheEntry(retval, persistent=True)

        retval = self._ephem_cache.get(
            link=link,
            package_name=package_name,
            supported_tags=supported_tags,
        )
        if retval is not link:
            return CacheEntry(retval, persistent=False)

        return None

    @staticmethod
    def record_download_origin(cache_dir: str, download_info: DirectUrl) -> None:
        """Write download_info as the origin file of a cache entry directory.

        If an origin file already exists, it is compared against
        download_info and a warning is logged when the URLs differ. The file
        is (re)written in every case.

        :param cache_dir: Directory of the cache entry.
        :param download_info: The DirectUrl describing where the cached item
            was downloaded from.
        """
        origin_path = Path(cache_dir) / ORIGIN_JSON_NAME
        if origin_path.is_file():
            try:
                # Read with the same encoding the file is written with below,
                # instead of the locale's default encoding.
                origin = DirectUrl.from_json(
                    origin_path.read_text(encoding="utf-8")
                )
            except Exception as e:
                # Best effort: an unreadable origin file should not prevent
                # recording the new origin, which replaces it below.
                logger.warning(
                    "Could not read origin file %s in cache entry (%s). "
                    "Will attempt to overwrite it.",
                    origin_path,
                    e,
                )
            else:
                # TODO: use DirectUrl.equivalent when
                # https://github.com/pypa/pip/pull/10564 is merged.
                if origin.url != download_info.url:
                    logger.warning(
                        "Origin URL %s in cache entry %s does not match download URL %s. "
                        "This is likely a pip bug or a cache corruption issue.",
                        origin.url,
                        cache_dir,
                        download_info.url,
                    )
        origin_path.write_text(download_info.to_json(), encoding="utf-8")