| import contextlib |
| import errno |
| import logging |
| import logging.handlers |
| import os |
| import sys |
| import threading |
| from dataclasses import dataclass |
| from io import TextIOWrapper |
| from logging import Filter |
| from typing import Any, ClassVar, Generator, List, Optional, TextIO, Type |
| |
| from pip._vendor.rich.console import ( |
| Console, |
| ConsoleOptions, |
| ConsoleRenderable, |
| RenderableType, |
| RenderResult, |
| RichCast, |
| ) |
| from pip._vendor.rich.highlighter import NullHighlighter |
| from pip._vendor.rich.logging import RichHandler |
| from pip._vendor.rich.segment import Segment |
| from pip._vendor.rich.style import Style |
| |
| from pip._internal.utils._log import VERBOSE, getLogger |
| from pip._internal.utils.compat import WINDOWS |
| from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX |
| from pip._internal.utils.misc import ensure_dir |
| |
# Thread-local storage for logging state. indent_log() and get_indentation()
# keep the current indentation level in its "indentation" attribute.
_log_state = threading.local()
# Logger whose name setup_logging() uses to route records to (or away from)
# the dedicated "console_subprocess" handler.
subprocess_logger = getLogger("pip.subprocessor")
| |
| |
class BrokenStdoutLoggingError(Exception):
    """
    Signals that writing a log record to stdout hit a broken pipe.

    Raised by the console handler's error hook instead of reporting the
    failure through logging, so a caller further up can handle it.
    """
| |
| |
def _is_broken_pipe_error(exc_class: Type[BaseException], exc: BaseException) -> bool:
    """
    Tell whether an exception represents a broken pipe.

    :param exc_class: The exception's class, as returned by ``sys.exc_info()``.
    :param exc: The exception instance.
    :return: True if ``exc_class`` is exactly ``BrokenPipeError``, or -- on
        Windows only -- if ``exc`` is an ``OSError`` whose errno is ``EINVAL``
        or ``EPIPE``; False otherwise.
    """
    if exc_class is BrokenPipeError:
        return True

    # On Windows, a broken pipe can show up as EINVAL rather than EPIPE:
    # https://bugs.python.org/issue19612
    # https://bugs.python.org/issue30418
    if WINDOWS and isinstance(exc, OSError):
        return exc.errno in (errno.EINVAL, errno.EPIPE)

    return False
| |
| |
@contextlib.contextmanager
def indent_log(num: int = 2) -> Generator[None, None, None]:
    """
    Context manager that indents every log message emitted inside it.

    :param num: Number of columns added to the current indentation while the
        ``with`` block runs; the same amount is subtracted again on exit,
        including when the block raises.
    """
    # Read through get_indentation() for thread-safety: the state is
    # thread-local, so the attribute may not exist yet on this thread.
    _log_state.indentation = get_indentation() + num
    try:
        yield
    finally:
        _log_state.indentation -= num
| |
| |
def get_indentation() -> int:
    """Return the current thread's log indentation, or 0 if none was set."""
    try:
        return _log_state.indentation
    except AttributeError:
        # Nothing has stored an indentation on this thread's state yet.
        return 0
| |
| |
class IndentingFormatter(logging.Formatter):
    """A ``logging.Formatter`` that indents its output per ``get_indentation()``."""

    default_time_format = "%Y-%m-%dT%H:%M:%S"

    def __init__(
        self,
        *args: Any,
        add_timestamp: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        A logging.Formatter that obeys the indent_log() context manager.

        :param add_timestamp: When true, every output line is prefixed with
            the timestamp of the record it belongs to.

        All other positional and keyword arguments are forwarded unchanged to
        ``logging.Formatter``.
        """
        self.add_timestamp = add_timestamp
        super().__init__(*args, **kwargs)

    def get_message_start(self, formatted: str, levelno: int) -> str:
        """
        Return the severity label to put in front of the formatted message
        (this is separate from the per-line prefix added by ``format``).

        :param formatted: The message as produced by the base formatter.
        :param levelno: The numeric level of the record.
        :return: ``""`` below WARNING or when the message already starts with
            the deprecation prefix (to avoid "WARNING: DEPRECATION: ...."),
            ``"WARNING: "`` from WARNING up to (not including) ERROR, and
            ``"ERROR: "`` from ERROR upwards.
        """
        if levelno < logging.WARNING or formatted.startswith(DEPRECATION_MSG_PREFIX):
            return ""
        return "ERROR: " if levelno >= logging.ERROR else "WARNING: "

    def format(self, record: logging.LogRecord) -> str:
        """
        Format ``record`` with the standard formatter, add the severity label,
        then prefix every line with the optional timestamp and the current
        indentation.
        """
        text = super().format(record)
        text = self.get_message_start(text, record.levelno) + text

        stamp = f"{self.formatTime(record)} " if self.add_timestamp else ""
        line_prefix = stamp + " " * get_indentation()
        # splitlines(True) keeps the line endings, so joining with "" rebuilds
        # the original text with the prefix inserted at the start of each line.
        return "".join(line_prefix + line for line in text.splitlines(True))
| |
| |
@dataclass
class IndentedRenderable:
    """Wraps a Rich renderable so each rendered line starts with ``indent`` spaces."""

    # The wrapped object, rendered through the console passed to __rich_console__.
    renderable: RenderableType
    # Number of space characters emitted before each rendered line.
    indent: int

    def __rich_console__(
        self, console: Console, options: ConsoleOptions
    ) -> RenderResult:
        """
        Render the wrapped object, split the result into lines, and yield each
        line as: an indentation segment, the line's own segments, a newline
        segment.
        """
        rendered_lines = Segment.split_lines(
            console.render(self.renderable, options)
        )
        for content in rendered_lines:
            yield from (Segment(" " * self.indent), *content, Segment("\n"))
| |
| |
class RichPipStreamHandler(RichHandler):
    """
    Console log handler built on Rich's ``RichHandler``.

    It prints plain formatted messages (coloured red from ERROR, yellow from
    WARNING) and, for the special ``"[present-rich] %s"`` message, renders the
    single argument as a Rich renderable indented to the current log
    indentation.
    """

    # NOTE(review): empty keyword list -- presumably this turns off
    # RichHandler's keyword highlighting; confirm against Rich's documentation.
    KEYWORDS: ClassVar[Optional[List[str]]] = []

    def __init__(self, stream: Optional[TextIO], no_color: bool) -> None:
        """
        :param stream: File object handed to the Rich ``Console`` as ``file``.
        :param no_color: Handed to the Rich ``Console`` as ``no_color``.
        """
        super().__init__(
            console=Console(file=stream, no_color=no_color, soft_wrap=True),
            show_time=False,
            show_level=False,
            show_path=False,
            highlighter=NullHighlighter(),
        )

    # Our custom override on Rich's logger, to make things work as we need them to.
    def emit(self, record: logging.LogRecord) -> None:
        """
        Print ``record`` to the console.

        Any exception raised while printing is routed to ``handleError``.
        """
        style: Optional[Style] = None

        # If we are given a diagnostic error to present, present it with indentation.
        #
        # ``record.args`` is not guaranteed to be a tuple: logging stores a
        # mapping there when a message is logged with a single dict argument.
        # Test the type as part of the condition rather than asserting it, so
        # such records fall through to normal formatting instead of raising
        # AssertionError into the code that called the logger.
        if (
            record.msg == "[present-rich] %s"
            and isinstance(record.args, tuple)
            and len(record.args) == 1
        ):
            rich_renderable = record.args[0]
            assert isinstance(
                rich_renderable, (ConsoleRenderable, RichCast, str)
            ), f"{rich_renderable} is not rich-console-renderable"

            renderable: RenderableType = IndentedRenderable(
                rich_renderable, indent=get_indentation()
            )
        else:
            message = self.format(record)
            renderable = self.render_message(record, message)
            if record.levelno is not None:
                if record.levelno >= logging.ERROR:
                    style = Style(color="red")
                elif record.levelno >= logging.WARNING:
                    style = Style(color="yellow")

        try:
            self.console.print(renderable, overflow="ignore", crop=False, style=style)
        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        """
        Called when logging is unable to log some output.

        :raises BrokenStdoutLoggingError: if the exception currently being
            handled is a broken-pipe error and this handler writes to
            ``sys.stdout``.
        """

        exc_class, exc = sys.exc_info()[:2]
        # If a broken pipe occurred while calling write() or flush() on the
        # stdout stream in logging's Handler.emit(), then raise our special
        # exception so we can handle it in main() instead of logging the
        # broken pipe error and continuing.
        if (
            exc_class
            and exc
            and self.console.file is sys.stdout
            and _is_broken_pipe_error(exc_class, exc)
        ):
            raise BrokenStdoutLoggingError()

        return super().handleError(record)
| |
| |
class BetterRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """Rotating file handler that ensures the log file's directory before opening."""

    def _open(self) -> TextIOWrapper:
        """Ensure the directory of ``baseFilename`` exists, then open the file."""
        log_dir = os.path.dirname(self.baseFilename)
        ensure_dir(log_dir)
        return super()._open()
| |
| |
class MaxLevelFilter(Filter):
    """A logging Filter that only lets through records below a given level."""

    def __init__(self, level: int) -> None:
        """
        :param level: Exclusive upper bound; records whose ``levelno`` is
            greater than or equal to this value are rejected.
        """
        # Initialise the base Filter too, so its own attributes (``name``,
        # ``nlen``) exist on the instance.
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if ``record`` is strictly below the configured level."""
        return record.levelno < self.level
| |
| |
class ExcludeLoggerFilter(Filter):
    """
    A logging Filter that rejects records coming from the named logger or any
    of its children, and accepts everything else.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return the inverse of the base ``Filter`` decision for ``record``."""
        # The base Filter class accepts only records from the named logger
        # (or its children), which is exactly the set we want to drop.
        from_named_logger = super().filter(record)
        return not from_named_logger
| |
| |
def setup_logging(verbosity: int, no_color: bool, user_log_file: Optional[str]) -> int:
    """Configures and sets up all of the logging

    :param verbosity: Console verbosity. ``>= 2`` selects DEBUG, ``1`` VERBOSE,
        ``0`` INFO, ``-1`` WARNING, ``-2`` ERROR and ``<= -3`` CRITICAL.
    :param no_color: Passed to every console handler as ``no_color``.
    :param user_log_file: Path of an additional log file that receives all
        records from DEBUG upwards, or None for no log file.

    Returns the requested logging level, as its integer value.
    """
    # ``import logging`` / ``import logging.handlers`` do not load the
    # ``logging.config`` submodule, so import what we need explicitly instead
    # of relying on some other module having imported it first.
    from logging.config import dictConfig

    # Determine the level to be logging at.
    if verbosity >= 2:
        level_number = logging.DEBUG
    elif verbosity == 1:
        level_number = VERBOSE
    elif verbosity == -1:
        level_number = logging.WARNING
    elif verbosity == -2:
        level_number = logging.ERROR
    elif verbosity <= -3:
        level_number = logging.CRITICAL
    else:
        level_number = logging.INFO

    level = logging.getLevelName(level_number)

    # The "root" logger should match the "console" level *unless* we also need
    # to log to a user log file.
    include_user_log = user_log_file is not None
    if include_user_log:
        additional_log_file = user_log_file
        root_level = "DEBUG"
    else:
        # Placeholder filename: the "user_log" handler is always declared in
        # the config below, but it is only attached to the root logger when
        # include_user_log is true.
        additional_log_file = "/dev/null"
        root_level = level

    # Disable any logging besides WARNING unless we have DEBUG level logging
    # enabled for vendored libraries.
    vendored_log_level = "WARNING" if level in ["INFO", "ERROR"] else "DEBUG"

    # Shorthands for clarity
    log_streams = {
        "stdout": "ext://sys.stdout",
        "stderr": "ext://sys.stderr",
    }
    handler_classes = {
        "stream": "pip._internal.utils.logging.RichPipStreamHandler",
        "file": "pip._internal.utils.logging.BetterRotatingFileHandler",
    }
    handlers = ["console", "console_errors", "console_subprocess"] + (
        ["user_log"] if include_user_log else []
    )

    dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "filters": {
                "exclude_warnings": {
                    "()": "pip._internal.utils.logging.MaxLevelFilter",
                    "level": logging.WARNING,
                },
                "restrict_to_subprocess": {
                    "()": "logging.Filter",
                    "name": subprocess_logger.name,
                },
                "exclude_subprocess": {
                    "()": "pip._internal.utils.logging.ExcludeLoggerFilter",
                    "name": subprocess_logger.name,
                },
            },
            "formatters": {
                "indent": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                },
                "indent_with_timestamp": {
                    "()": IndentingFormatter,
                    "format": "%(message)s",
                    "add_timestamp": True,
                },
            },
            "handlers": {
                # Records below WARNING (and not from the subprocess logger)
                # go to stdout.
                "console": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stdout"],
                    "filters": ["exclude_subprocess", "exclude_warnings"],
                    "formatter": "indent",
                },
                # Records from WARNING upwards (and not from the subprocess
                # logger) go to stderr.
                "console_errors": {
                    "level": "WARNING",
                    "class": handler_classes["stream"],
                    "no_color": no_color,
                    "stream": log_streams["stderr"],
                    "filters": ["exclude_subprocess"],
                    "formatter": "indent",
                },
                # A handler responsible for logging to the console messages
                # from the "subprocessor" logger.
                "console_subprocess": {
                    "level": level,
                    "class": handler_classes["stream"],
                    "stream": log_streams["stderr"],
                    "no_color": no_color,
                    "filters": ["restrict_to_subprocess"],
                    "formatter": "indent",
                },
                "user_log": {
                    "level": "DEBUG",
                    "class": handler_classes["file"],
                    "filename": additional_log_file,
                    "encoding": "utf-8",
                    "delay": True,
                    "formatter": "indent_with_timestamp",
                },
            },
            "root": {
                "level": root_level,
                "handlers": handlers,
            },
            "loggers": {"pip._vendor": {"level": vendored_log_level}},
        }
    )

    return level_number