| import contextlib |
| import itertools |
| import logging |
| import sys |
| import time |
| from typing import IO, Generator, Optional |
| |
| from pip._internal.utils.compat import WINDOWS |
| from pip._internal.utils.logging import get_indentation |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class SpinnerInterface: |
| def spin(self) -> None: |
| raise NotImplementedError() |
| |
| def finish(self, final_status: str) -> None: |
| raise NotImplementedError() |
| |
| |
| class InteractiveSpinner(SpinnerInterface): |
| def __init__( |
| self, |
| message: str, |
| file: Optional[IO[str]] = None, |
| spin_chars: str = "-\\|/", |
| # Empirically, 8 updates/second looks nice |
| min_update_interval_seconds: float = 0.125, |
| ): |
| self._message = message |
| if file is None: |
| file = sys.stdout |
| self._file = file |
| self._rate_limiter = RateLimiter(min_update_interval_seconds) |
| self._finished = False |
| |
| self._spin_cycle = itertools.cycle(spin_chars) |
| |
| self._file.write(" " * get_indentation() + self._message + " ... ") |
| self._width = 0 |
| |
| def _write(self, status: str) -> None: |
| assert not self._finished |
| # Erase what we wrote before by backspacing to the beginning, writing |
| # spaces to overwrite the old text, and then backspacing again |
| backup = "\b" * self._width |
| self._file.write(backup + " " * self._width + backup) |
| # Now we have a blank slate to add our status |
| self._file.write(status) |
| self._width = len(status) |
| self._file.flush() |
| self._rate_limiter.reset() |
| |
| def spin(self) -> None: |
| if self._finished: |
| return |
| if not self._rate_limiter.ready(): |
| return |
| self._write(next(self._spin_cycle)) |
| |
| def finish(self, final_status: str) -> None: |
| if self._finished: |
| return |
| self._write(final_status) |
| self._file.write("\n") |
| self._file.flush() |
| self._finished = True |
| |
| |
| # Used for dumb terminals, non-interactive installs (no tty), etc. |
| # We still print updates occasionally (once every 60 seconds by default) to |
| # act as a keep-alive for systems like Travis-CI that take lack-of-output as |
| # an indication that a task has frozen. |
| class NonInteractiveSpinner(SpinnerInterface): |
| def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None: |
| self._message = message |
| self._finished = False |
| self._rate_limiter = RateLimiter(min_update_interval_seconds) |
| self._update("started") |
| |
| def _update(self, status: str) -> None: |
| assert not self._finished |
| self._rate_limiter.reset() |
| logger.info("%s: %s", self._message, status) |
| |
| def spin(self) -> None: |
| if self._finished: |
| return |
| if not self._rate_limiter.ready(): |
| return |
| self._update("still running...") |
| |
| def finish(self, final_status: str) -> None: |
| if self._finished: |
| return |
| self._update(f"finished with status '{final_status}'") |
| self._finished = True |
| |
| |
| class RateLimiter: |
| def __init__(self, min_update_interval_seconds: float) -> None: |
| self._min_update_interval_seconds = min_update_interval_seconds |
| self._last_update: float = 0 |
| |
| def ready(self) -> bool: |
| now = time.time() |
| delta = now - self._last_update |
| return delta >= self._min_update_interval_seconds |
| |
| def reset(self) -> None: |
| self._last_update = time.time() |
| |
| |
| @contextlib.contextmanager |
| def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]: |
| # Interactive spinner goes directly to sys.stdout rather than being routed |
| # through the logging system, but it acts like it has level INFO, |
| # i.e. it's only displayed if we're at level INFO or better. |
| # Non-interactive spinner goes through the logging system, so it is always |
| # in sync with logging configuration. |
| if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO: |
| spinner: SpinnerInterface = InteractiveSpinner(message) |
| else: |
| spinner = NonInteractiveSpinner(message) |
| try: |
| with hidden_cursor(sys.stdout): |
| yield spinner |
| except KeyboardInterrupt: |
| spinner.finish("canceled") |
| raise |
| except Exception: |
| spinner.finish("error") |
| raise |
| else: |
| spinner.finish("done") |
| |
| |
| HIDE_CURSOR = "\x1b[?25l" |
| SHOW_CURSOR = "\x1b[?25h" |
| |
| |
| @contextlib.contextmanager |
| def hidden_cursor(file: IO[str]) -> Generator[None, None, None]: |
| # The Windows terminal does not support the hide/show cursor ANSI codes, |
| # even via colorama. So don't even try. |
| if WINDOWS: |
| yield |
| # We don't want to clutter the output with control characters if we're |
| # writing to a file, or if the user is running with --quiet. |
| # See https://github.com/pypa/pip/issues/3418 |
| elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO: |
| yield |
| else: |
| file.write(HIDE_CURSOR) |
| try: |
| yield |
| finally: |
| file.write(SHOW_CURSOR) |