| import errno |
| import itertools |
| import logging |
| import os.path |
| import tempfile |
| from contextlib import ExitStack, contextmanager |
| from typing import Any, Dict, Generator, Optional, TypeVar, Union |
| |
| from pip._internal.utils.misc import enum, rmtree |
| |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type variable bound to TempDirectory. It is used to annotate
# TempDirectory.__enter__ so that entering a subclass instance is typed as
# returning that subclass rather than the base class.
_T = TypeVar("_T", bound="TempDirectory")
| |
| |
# Kinds of temporary directories. Only needed for ones that are
# globally-managed.
# NOTE(review): the values are presumably meant to be passed as the `kind`
# argument of TempDirectory (where they become part of the "pip-<kind>-"
# directory prefix and the key looked up in TempDirectoryTypeRegistry) --
# confirm against callers. The exact object returned by `enum` is defined in
# pip._internal.utils.misc and is not visible here.
tempdir_kinds = enum(
    BUILD_ENV="build-env",
    EPHEM_WHEEL_CACHE="ephem-wheel-cache",
    REQ_BUILD="req-build",
)
| |
| |
# The ExitStack of the innermost active global_tempdir_manager() context, or
# None when no such context is active.
_tempdir_manager: Optional[ExitStack] = None


@contextmanager
def global_tempdir_manager() -> Generator[None, None, None]:
    """Install a fresh ExitStack as the module-wide tempdir manager.

    While the context is active, TempDirectory objects created with
    ``globally_managed=True`` are entered into this stack. On exit the
    previously installed manager (possibly None) is restored first, and then
    the stack is closed, which exits everything that was entered into it.

    Contexts may be nested; each one restores the manager it replaced.
    """
    global _tempdir_manager
    with ExitStack() as exit_stack:
        previous_manager = _tempdir_manager
        _tempdir_manager = exit_stack
        try:
            yield
        finally:
            # Restore before the enclosing `with` closes the stack.
            _tempdir_manager = previous_manager
| |
| |
class TempDirectoryTypeRegistry:
    """Records, per temp-directory kind, whether auto-deletion is wanted."""

    def __init__(self) -> None:
        # Maps a kind to its explicitly configured auto-delete flag. Kinds
        # that were never configured are absent and are treated as True.
        self._should_delete: Dict[str, bool] = dict()

    def set_delete(self, kind: str, value: bool) -> None:
        """Record whether TempDirectory objects of ``kind`` should be
        auto-deleted, replacing any earlier setting for that kind.
        """
        self._should_delete.update({kind: value})

    def get_delete(self, kind: str) -> bool:
        """Return the auto-delete flag configured for ``kind``.

        Kinds that have never been configured default to True.
        """
        try:
            return self._should_delete[kind]
        except KeyError:
            return True
| |
| |
# The registry installed by the innermost active tempdir_registry() context,
# or None when no such context is active.
_tempdir_registry: Optional[TempDirectoryTypeRegistry] = None


@contextmanager
def tempdir_registry() -> Generator[TempDirectoryTypeRegistry, None, None]:
    """Provides a scoped global tempdir registry that can be used to dictate
    whether directories should be deleted.

    A new, empty registry is installed for the duration of the context and
    yielded to the caller; the registry that was active before (possibly
    None) is restored on exit, even if the body raises.
    """
    global _tempdir_registry
    previous_registry = _tempdir_registry
    scoped_registry = TempDirectoryTypeRegistry()
    _tempdir_registry = scoped_registry
    try:
        yield scoped_registry
    finally:
        _tempdir_registry = previous_registry
| |
| |
class _Default:
    """Type of the ``_default`` sentinel."""


# Sentinel meaning "the caller did not pass `delete`". A dedicated object is
# needed because None is itself a meaningful value for `delete`: it defers the
# decision to the tempdir registry at exit time.
_default = _Default()


class TempDirectory:
    """Helper class that owns and cleans up a temporary directory.

    This class can be used as a context manager or as an OO representation of a
    temporary directory.

    Attributes:
        path
            Location to the created temporary directory. Accessing it after
            cleanup() trips an assertion.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager). None means "decide on exit":
            the active tempdir registry is consulted for this directory's
            kind, and the directory is deleted if no registry is active.
        kind
            Label used in the directory name prefix ("pip-<kind>-") and as
            the key for the tempdir registry lookup.

    Methods:
        cleanup()
            Deletes the temporary directory

    When used as a context manager, if the delete attribute is True, on
    exiting the context the temporary directory is deleted.
    """

    def __init__(
        self,
        path: Optional[str] = None,
        delete: Union[bool, None, _Default] = _default,
        kind: str = "temp",
        globally_managed: bool = False,
    ):
        """Create the object and, unless ``path`` is given, the directory.

        Args:
            path: Existing directory to wrap. When None, a new directory is
                created via ``_create(kind)``.
            delete: True/False to force the exit behaviour, None to defer to
                the tempdir registry. When omitted it resolves to False for
                an explicit ``path`` and to None otherwise.
            kind: Label for the directory (see class docstring).
            globally_managed: When True, enter this object into the ExitStack
                installed by global_tempdir_manager(), which must be active.
        """
        super().__init__()

        if delete is _default:
            if path is not None:
                # If we were given an explicit directory, resolve delete option
                # now.
                delete = False
            else:
                # Otherwise, we wait until cleanup and see what
                # tempdir_registry says.
                delete = None

        # The only time we specify path is for editables, where it
        # is the value of the --src option.
        if path is None:
            path = self._create(kind)

        self._path = path
        self._deleted = False
        self.delete = delete
        self.kind = kind

        if globally_managed:
            assert _tempdir_manager is not None
            _tempdir_manager.enter_context(self)

    @property
    def path(self) -> str:
        """The directory path; asserts that cleanup() has not run yet."""
        assert not self._deleted, f"Attempted to access deleted path: {self._path}"
        return self._path

    def __repr__(self) -> str:
        # Read _path directly rather than the `path` property: the property
        # asserts once the directory has been cleaned up, and repr() must
        # keep working on such objects (e.g. in logs and debuggers).
        return f"<{self.__class__.__name__} {self._path!r}>"

    def __enter__(self: "_T") -> "_T":
        return self

    def __exit__(self, exc: Any, value: Any, tb: Any) -> None:
        """Delete the directory if the resolved delete flag says so."""
        if self.delete is not None:
            # An explicit True/False always wins.
            delete = self.delete
        elif _tempdir_registry is not None:
            delete = _tempdir_registry.get_delete(self.kind)
        else:
            # No explicit choice and no registry active: delete.
            delete = True

        if delete:
            self.cleanup()

    def _create(self, kind: str) -> str:
        """Create a temporary directory and return its canonical path."""
        # We realpath here because some systems have their default tmpdir
        # symlinked to another directory.  This tends to confuse build
        # scripts, so we canonicalize the path by traversing potential
        # symlinks here.
        path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))
        logger.debug("Created temporary directory: %s", path)
        return path

    def cleanup(self) -> None:
        """Remove the temporary directory created and reset state"""
        # Mark as deleted first, so `path` is refused from here on even when
        # the directory is already gone and nothing is removed.
        self._deleted = True
        if not os.path.exists(self._path):
            return
        rmtree(self._path)
| |
| |
class AdjacentTempDirectory(TempDirectory):
    """Helper class that creates a temporary directory adjacent to a real one.

    Attributes:
        original
            The original directory to create a temp directory for, with
            trailing "/" and "\\" characters stripped.
        path
            The full path to the temporary directory. The directory is
            created when the object is constructed.
        delete
            Whether the directory should be deleted when exiting
            (when used as a contextmanager)

    """

    # The characters that may be used to name the temp directory
    # We always prepend a ~ and then rotate through these until
    # a usable name is found.
    # pkg_resources raises a different error for .dist-info folder
    # with leading '-' and invalid metadata
    LEADING_CHARS = "-~.=%0123456789"

    def __init__(self, original: str, delete: Optional[bool] = None) -> None:
        # `original` must be set before the base initializer runs: that
        # initializer calls self._create(), which reads self.original.
        self.original = original.rstrip("/\\")
        super().__init__(delete=delete)

    @classmethod
    def _generate_names(cls, name: str) -> Generator[str, None, None]:
        """Generates a series of temporary names.

        The algorithm replaces the leading characters in the name
        with ones that are valid filesystem characters, but are not
        valid package names (for both Python and pip definitions of
        package).

        Candidates of the same length as ``name`` are produced first; once
        those are exhausted, progressively longer names are produced by
        prefixing ``name`` itself. ``name`` is never yielded.
        """
        for i in range(1, len(name)):
            for candidate in itertools.combinations_with_replacement(
                cls.LEADING_CHARS, i - 1
            ):
                new_name = "~" + "".join(candidate) + name[i:]
                # Skip the one candidate that reproduces the original name
                # (possible when `name` already starts with such a prefix).
                if new_name != name:
                    yield new_name

        # If we make it this far, we will have to make a longer name
        for extra in range(len(cls.LEADING_CHARS)):
            for candidate in itertools.combinations_with_replacement(
                cls.LEADING_CHARS, extra
            ):
                # Always at least one character longer than `name`, so it can
                # never be equal to it.
                yield "~" + "".join(candidate) + name

    def _create(self, kind: str) -> str:
        """Create the directory next to ``self.original`` and return its path.

        Each generated candidate name is tried in the parent directory of
        ``self.original`` until one can be created. Names that already exist
        are skipped; any other OSError from os.mkdir propagates. If every
        candidate is taken, a regular "pip-<kind>-" temporary directory is
        created instead.
        """
        root, name = os.path.split(self.original)
        for candidate in self._generate_names(name):
            path = os.path.join(root, candidate)
            try:
                os.mkdir(path)
            except OSError as ex:
                # Continue if the name exists already
                if ex.errno != errno.EEXIST:
                    raise
            else:
                path = os.path.realpath(path)
                break
        else:
            # Final fallback on the default behavior.
            path = os.path.realpath(tempfile.mkdtemp(prefix=f"pip-{kind}-"))

        logger.debug("Created temporary directory: %s", path)
        return path