| import codecs |
| import io |
| import os |
| import re |
| import sys |
| import typing as t |
| from weakref import WeakKeyDictionary |
| |
# Platform flags, all computed once at import time.
CYGWIN = sys.platform.startswith("cygwin")
# A win* platform whose interpreter version string mentions GCC.
MSYS2 = sys.platform.startswith("win") and "GCC" in sys.version
# Determine local App Engine environment, per Google's own suggestion
APP_ENGINE = (
    "APPENGINE_RUNTIME" in os.environ
    and "Development/" in os.environ.get("SERVER_SOFTWARE", "")
)
# A win* platform that is neither the App Engine dev server nor MSYS2.
WIN = sys.platform.startswith("win") and not (APP_ENGINE or MSYS2)
# Default of ``None``; a function of the same name is defined later in this
# module when running on Windows.
auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
# ESC "[" followed by any run of ";", "?" or digits and one ASCII letter.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
| |
| |
def get_filesystem_encoding() -> str:
    """Return the filesystem encoding, falling back to the interpreter's
    default encoding when the former is empty or ``None``.
    """
    fs_encoding = sys.getfilesystemencoding()
    if fs_encoding:
        return fs_encoding
    return sys.getdefaultencoding()
| |
| |
def _make_text_stream(
    stream: t.BinaryIO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Wrap a binary stream in a line-buffered ``_NonClosingTextIOWrapper``.

    :param stream: the binary stream to wrap.
    :param encoding: text encoding; when ``None`` it is taken from
        ``get_best_encoding(stream)``.
    :param errors: error handler; when ``None``, ``"replace"`` is used.
    :param force_readable: passed through to the wrapper.
    :param force_writable: passed through to the wrapper.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    chosen_errors = "replace" if errors is None else errors
    return _NonClosingTextIOWrapper(
        stream,
        chosen_encoding,
        chosen_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )
| |
| |
def is_ascii_encoding(encoding: str) -> bool:
    """Report whether ``encoding`` resolves to the ASCII codec.

    Aliases are resolved through :func:`codecs.lookup`; an unknown codec
    name yields ``False`` instead of raising.
    """
    try:
        codec_info = codecs.lookup(encoding)
    except LookupError:
        return False
    return codec_info.name == "ascii"
| |
| |
def get_best_encoding(stream: t.IO) -> str:
    """Choose an encoding for ``stream``.

    Uses the stream's ``encoding`` attribute when it is set and non-empty,
    otherwise the interpreter's default encoding. An ASCII result is
    replaced by ``"utf-8"``.
    """
    declared = getattr(stream, "encoding", None)
    candidate = declared if declared else sys.getdefaultencoding()
    return "utf-8" if is_ascii_encoding(candidate) else candidate
| |
| |
class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` over a ``_FixupStream``-wrapped binary stream.

    On finalization the wrapper detaches from its buffer rather than going
    through the default ``TextIOWrapper`` teardown.
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        encoding: t.Optional[str],
        errors: t.Optional[str],
        force_readable: bool = False,
        force_writable: bool = False,
        **extra: t.Any,
    ) -> None:
        # Route every access to the raw stream through the fix-up proxy and
        # keep a reference to it for ``isatty``.
        fixed = _FixupStream(stream, force_readable, force_writable)
        self._stream = t.cast(t.BinaryIO, fixed)
        super().__init__(self._stream, encoding, errors, **extra)

    def __del__(self) -> None:
        # Best effort only: any failure while detaching is ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self) -> bool:
        """Delegate to the wrapped stream's ``isatty``."""
        # https://bitbucket.org/pypy/pypy/issue/1803
        wrapped = self._stream
        return wrapped.isatty()
| |
| |
class _FixupStream:
    """Proxy that fills in parts of the io interface a stream may lack.

    The new io interface needs more from streams than streams
    traditionally implement, so ``read1``, ``readable``, ``writable`` and
    ``seekable`` are provided here with fallbacks. Every other attribute
    is looked up on the wrapped stream.

    The readable and writable flags can be forced because some tools put
    badly patched objects on ``sys`` (certain versions of Jupyter notebook
    are one such offender).
    """

    def __init__(
        self,
        stream: t.BinaryIO,
        force_readable: bool = False,
        force_writable: bool = False,
    ):
        self._stream = stream
        self._force_readable = force_readable
        self._force_writable = force_writable

    def __getattr__(self, name: str) -> t.Any:
        # Only reached for attributes not found on the proxy itself.
        return getattr(self._stream, name)

    def read1(self, size: int) -> bytes:
        """Use the stream's ``read1`` if present, else plain ``read``."""
        native_read1 = getattr(self._stream, "read1", None)
        if native_read1 is None:
            return self._stream.read(size)
        return t.cast(bytes, native_read1(size))

    def readable(self) -> bool:
        """Return ``True`` if forced, else ask the stream, else probe it
        with a zero-length read.
        """
        if self._force_readable:
            return True
        ask = getattr(self._stream, "readable", None)
        if ask is not None:
            return t.cast(bool, ask())
        try:
            self._stream.read(0)
        except Exception:
            return False
        else:
            return True

    def writable(self) -> bool:
        """Return ``True`` if forced, else ask the stream, else probe it
        with an empty text write followed by an empty bytes write.
        """
        if self._force_writable:
            return True
        ask = getattr(self._stream, "writable", None)
        if ask is not None:
            return t.cast(bool, ask())
        try:
            self._stream.write("")  # type: ignore
        except Exception:
            # The text probe failed; the stream may only accept bytes.
            try:
                self._stream.write(b"")
            except Exception:
                return False
        return True

    def seekable(self) -> bool:
        """Ask the stream, else probe by seeking to the current position."""
        ask = getattr(self._stream, "seekable", None)
        if ask is not None:
            return t.cast(bool, ask())
        try:
            self._stream.seek(self._stream.tell())
        except Exception:
            return False
        else:
            return True
| |
| |
def _is_binary_reader(stream: t.IO, default: bool = False) -> bool:
    """Report whether a zero-length read from ``stream`` yields ``bytes``.

    If the read raises, ``default`` is returned.
    """
    try:
        sample = stream.read(0)
    except Exception:
        # This happens in some cases where the stream was already
        # closed. In this case, we assume the default.
        return default
    return isinstance(sample, bytes)
| |
| |
def _is_binary_writer(stream: t.IO, default: bool = False) -> bool:
    """Report whether ``stream`` accepts ``bytes`` on ``write``.

    An empty bytes write is tried first. If it fails, an empty text write
    is tried: success means the stream is a text writer (``False``), and
    failure of both probes yields ``default``.
    """
    try:
        stream.write(b"")
    except Exception:
        pass
    else:
        return True

    try:
        stream.write("")
    except Exception:
        return default
    return False
| |
| |
def _find_binary_reader(stream: t.IO) -> t.Optional[t.BinaryIO]:
    """Return a binary reader for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already reads bytes; otherwise
    its ``buffer`` attribute is tried.
    """
    # The stream may already be binary: the official docs recommend
    # detaching the standard streams to get binary ones, and some code
    # does exactly that, so handle this case explicitly.
    if _is_binary_reader(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same probe for the buffer, except that a buffer which cannot be
    # probed (e.g. because it is closed) is assumed to be binary.
    if underlying is None or not _is_binary_reader(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)
| |
| |
def _find_binary_writer(stream: t.IO) -> t.Optional[t.BinaryIO]:
    """Return a binary writer for ``stream``, or ``None`` if none is found.

    The stream itself is returned when it already accepts bytes; otherwise
    its ``buffer`` attribute is tried.
    """
    # The stream may already be binary: the official docs recommend
    # detaching the standard streams to get binary ones, and some code
    # does exactly that, so handle this case explicitly.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)

    # Same probe for the buffer, except that a buffer which cannot be
    # probed (e.g. because it is closed) is assumed to be binary.
    if underlying is None or not _is_binary_writer(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)
| |
| |
def _stream_is_misconfigured(stream: t.TextIO) -> bool:
    """A stream is misconfigured if its encoding is ASCII."""
    # A stream without an encoding is treated as ASCII. This appears to
    # happen in certain unittest environments. It's not quite clear what
    # the correct behavior is, but this at least forces Click to recover
    # somehow.
    declared = getattr(stream, "encoding", None)
    return is_ascii_encoding(declared if declared else "ascii")
| |
| |
def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
    """Check one stream attribute against a desired value.

    The attribute is compatible when it equals ``value``, or when no value
    is requested (``value is None``) and the stream has one set. A missing
    attribute is read as ``None``.
    """
    actual = getattr(stream, attr, None)
    matches = actual == value
    return matches or (value is None and actual is not None)
| |
| |
def _is_compatible_text_stream(
    stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
) -> bool:
    """Check whether both the ``encoding`` and ``errors`` attributes of
    ``stream`` are compatible with the desired values.
    """
    encoding_ok = _is_compat_stream_attr(stream, "encoding", encoding)
    # ``errors`` is only inspected when the encoding already matches.
    return encoding_ok and _is_compat_stream_attr(stream, "errors", errors)
| |
| |
def _force_correct_text_stream(
    text_stream: t.IO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    is_binary: t.Callable[[t.IO, bool], bool],
    find_binary: t.Callable[[t.IO], t.Optional[t.BinaryIO]],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream with the requested encoding and error handling.

    :param text_stream: the stream to adapt; may in fact be binary.
    :param encoding: desired encoding, or ``None`` for no preference.
    :param errors: desired error handler, or ``None`` for no preference.
    :param is_binary: predicate deciding whether a stream is binary.
    :param find_binary: callable returning the binary stream underneath a
        text stream, or ``None``.
    :param force_readable: passed through to ``_make_text_stream``.
    :param force_writable: passed through to ``_make_text_stream``.
    """
    if is_binary(text_stream, False):
        raw_stream = t.cast(t.BinaryIO, text_stream)
    else:
        as_text = t.cast(t.TextIO, text_stream)

        # If the stream looks compatible, and won't default to a
        # misconfigured ascii encoding, return it as-is.
        looks_compatible = _is_compatible_text_stream(as_text, encoding, errors)
        if looks_compatible and (
            encoding is not None or not _stream_is_misconfigured(as_text)
        ):
            return as_text

        # Otherwise, get the underlying binary stream.
        found = find_binary(as_text)

        # If that's not possible, silently use the original stream and
        # get mojibake instead of exceptions.
        if found is None:
            return as_text

        raw_stream = found

    # Wrap the binary stream in a text stream with the correct encoding
    # parameters. Errors default to "replace" instead of strict in order
    # to get something that works.
    return _make_text_stream(
        raw_stream,
        encoding,
        "replace" if errors is None else errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )
| |
| |
def _force_correct_text_reader(
    text_reader: t.IO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_readable: bool = False,
) -> t.TextIO:
    """Reader flavour of ``_force_correct_text_stream``: plugs in the
    reader-specific binary detection helpers.
    """
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )
| |
| |
def _force_correct_text_writer(
    text_writer: t.IO,
    encoding: t.Optional[str],
    errors: t.Optional[str],
    force_writable: bool = False,
) -> t.TextIO:
    """Writer flavour of ``_force_correct_text_stream``: plugs in the
    writer-specific binary detection helpers.
    """
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )
| |
| |
def get_binary_stdin() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdin``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    binary_stdin = _find_binary_reader(sys.stdin)
    if binary_stdin is not None:
        return binary_stdin
    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
| |
| |
def get_binary_stdout() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stdout``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    binary_stdout = _find_binary_writer(sys.stdout)
    if binary_stdout is not None:
        return binary_stdout
    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
| |
| |
def get_binary_stderr() -> t.BinaryIO:
    """Return the binary stream behind ``sys.stderr``.

    :raises RuntimeError: if no binary stream can be determined.
    """
    binary_stderr = _find_binary_writer(sys.stderr)
    if binary_stderr is not None:
        return binary_stderr
    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
| |
| |
def get_text_stdin(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stdin``.

    A Windows console stream is used when ``_get_windows_console_stream``
    provides one; otherwise ``sys.stdin`` is adapted to the requested
    ``encoding`` and ``errors`` with readability forced.
    """
    console_stream = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console_stream is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )
    return console_stream
| |
| |
def get_text_stdout(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stdout``.

    A Windows console stream is used when ``_get_windows_console_stream``
    provides one; otherwise ``sys.stdout`` is adapted to the requested
    ``encoding`` and ``errors`` with writability forced.
    """
    console_stream = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console_stream is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )
    return console_stream
| |
| |
def get_text_stderr(
    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
) -> t.TextIO:
    """Return a text stream for ``sys.stderr``.

    A Windows console stream is used when ``_get_windows_console_stream``
    provides one; otherwise ``sys.stderr`` is adapted to the requested
    ``encoding`` and ``errors`` with writability forced.
    """
    console_stream = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console_stream is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )
    return console_stream
| |
| |
def _wrap_io_open(
    file: t.Union[str, os.PathLike, int],
    mode: str,
    encoding: t.Optional[str],
    errors: t.Optional[str],
) -> t.IO:
    """Call :func:`open`, leaving out ``encoding`` and ``errors`` when
    ``mode`` is binary.
    """
    text_options = {} if "b" in mode else {"encoding": encoding, "errors": errors}
    return open(file, mode, **text_options)
| |
| |
def open_stream(
    filename: str,
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    atomic: bool = False,
) -> t.Tuple[t.IO, bool]:
    """Open ``filename`` in ``mode``, with special handling for ``"-"``.

    ``"-"`` selects a standard stream: stdout when ``mode`` contains any
    of ``w``, ``a`` or ``x``, stdin otherwise, in binary or text form
    depending on ``b``. The ``atomic`` flag is ignored for these.

    Any other name is opened directly or, when ``atomic`` is true, through
    a temporary file created in the same directory and wrapped in an
    ``_AtomicFile``.

    :return: ``(stream, flag)``; the flag is ``False`` for the standard
        streams and ``True`` for files opened by this function.
    :raises ValueError: if ``atomic`` is requested with a mode that
        contains ``a`` or ``x``, or lacks ``w``.
    """
    is_binary = "b" in mode

    # Standard streams first. These are simple because they ignore the
    # atomic flag. Use fsdecode to handle Path("-").
    if os.fsdecode(filename) == "-":
        std_stream: t.IO
        if any(flag in mode for flag in ("w", "a", "x")):
            if is_binary:
                std_stream = get_binary_stdout()
            else:
                std_stream = get_text_stdout(encoding=encoding, errors=errors)
        elif is_binary:
            std_stream = get_binary_stdin()
        else:
            std_stream = get_text_stdin(encoding=encoding, errors=errors)
        return std_stream, False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated. They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file. Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    # Remember the existing file's mode bits, if there is one, so the
    # replacement can be created with the same permissions.
    perm: t.Optional[int]
    try:
        perm = os.stat(filename).st_mode
    except OSError:
        perm = None

    open_flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
    if is_binary:
        open_flags |= getattr(os, "O_BINARY", 0)

    target_dir = os.path.dirname(filename)
    create_mode = 0o666 if perm is None else perm

    # O_EXCL makes creation fail on a name clash; keep drawing random
    # names until one is free.
    while True:
        tmp_filename = os.path.join(
            target_dir,
            f".__atomic-write{random.randrange(1 << 32):08x}",
        )
        try:
            fd = os.open(tmp_filename, open_flags, create_mode)
        except OSError as exc:
            should_retry = exc.errno == errno.EEXIST or (
                os.name == "nt"
                and exc.errno == errno.EACCES
                and os.path.isdir(exc.filename)
                and os.access(exc.filename, os.W_OK)
            )
            if not should_retry:
                raise
        else:
            break

    if perm is not None:
        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

    tmp_file = _wrap_io_open(fd, mode, encoding, errors)
    atomic_file = _AtomicFile(tmp_file, tmp_filename, os.path.realpath(filename))
    return t.cast(t.IO, atomic_file), True
| |
| |
class _AtomicFile:
    """Proxy around a temporary file that is moved over the real file when
    closed.

    Attributes not defined here are looked up on the wrapped file object.
    Used as a context manager, leaving the block because of an exception
    discards the temporary file and leaves the real file untouched.
    """

    def __init__(self, f: t.IO, tmp_filename: str, real_filename: str) -> None:
        self._f = f
        self._tmp_filename = tmp_filename
        self._real_filename = real_filename
        self.closed = False

    @property
    def name(self) -> str:
        """Path of the real (destination) file, not the temporary one."""
        return self._real_filename

    def close(self, delete: bool = False) -> None:
        """Close the temporary file and commit or discard it.

        :param delete: when false (the default) the temporary file
            replaces the real file; when true the temporary file is
            removed and the real file is left as it was.

        Calling ``close`` again after a successful close does nothing.
        """
        if self.closed:
            return
        self._f.close()
        if delete:
            # Previously this flag was ignored, so a failed ``with`` block
            # still overwrote the destination with partial content.
            try:
                os.remove(self._tmp_filename)
            except FileNotFoundError:
                # Already gone; nothing left to discard.
                pass
        else:
            os.replace(self._tmp_filename, self._real_filename)
        self.closed = True

    def __getattr__(self, name: str) -> t.Any:
        # Only reached for attributes not found on the proxy itself.
        return getattr(self._f, name)

    def __enter__(self) -> "_AtomicFile":
        return self

    def __exit__(self, exc_type, exc_value, tb):  # type: ignore
        # Discard the temporary file if the block raised.
        self.close(delete=exc_type is not None)

    def __repr__(self) -> str:
        return repr(self._f)
| |
| |
def strip_ansi(value: str) -> str:
    """Return ``value`` with every match of ``_ansi_re`` removed."""
    cleaned = _ansi_re.sub("", value)
    return cleaned
| |
| |
def _is_jupyter_kernel_output(stream: t.IO) -> bool:
    """Report whether the innermost stream's class is defined in a module
    whose name starts with ``"ipykernel."``.
    """
    # Peel off this module's own wrappers to reach the original stream.
    innermost = stream
    while isinstance(innermost, (_FixupStream, _NonClosingTextIOWrapper)):
        innermost = innermost._stream

    defining_module = innermost.__class__.__module__
    return defining_module.startswith("ipykernel.")
| |
| |
def should_strip_ansi(
    stream: t.Optional[t.IO] = None, color: t.Optional[bool] = None
) -> bool:
    """Decide whether ANSI codes should be stripped.

    An explicit ``color`` wins: the result is simply ``not color``.
    Otherwise codes are stripped unless ``stream`` is a TTY or a Jupyter
    kernel output stream.
    """
    if color is not None:
        return not color

    # NOTE(review): the fallback is sys.stdin although the decision is
    # about output -- confirm this is intended.
    target = sys.stdin if stream is None else stream
    return not (isatty(target) or _is_jupyter_kernel_output(target))
| |
| |
# On Windows, wrap the output streams with colorama to support ANSI
# color codes.
# NOTE: double check is needed so mypy does not analyze this on Linux
if sys.platform.startswith("win") and WIN:
    from ._winconsole import _get_windows_console_stream

    def _get_argv_encoding() -> str:
        """Return the locale's preferred encoding."""
        import locale

        return locale.getpreferredencoding()

    # Wrapped streams keyed weakly by the stream they wrap.
    _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def auto_wrap_for_ansi(
        stream: t.TextIO, color: t.Optional[bool] = None
    ) -> t.TextIO:
        """Support ANSI color and style codes on Windows by wrapping a
        stream with colorama.

        The wrapper for a given stream is cached and reused when the
        stream can be used as a weak dictionary key.
        """
        try:
            previous = _ansi_stream_wrappers.get(stream)
        except Exception:
            previous = None

        if previous is not None:
            return previous

        import colorama

        ansi_wrapper = colorama.AnsiToWin32(
            stream, strip=should_strip_ansi(stream, color)
        )
        wrapped = t.cast(t.TextIO, ansi_wrapper.stream)
        original_write = wrapped.write

        def _safe_write(s):
            # Reset the wrapper's state if a write is interrupted or
            # fails, then let the error propagate.
            try:
                return original_write(s)
            except BaseException:
                ansi_wrapper.reset_all()
                raise

        wrapped.write = _safe_write

        # Caching is best effort: streams that cannot be used as weak
        # keys are simply not remembered.
        try:
            _ansi_stream_wrappers[stream] = wrapped
        except Exception:
            pass

        return wrapped

else:

    def _get_argv_encoding() -> str:
        """Return stdin's encoding, or the filesystem encoding when stdin
        has none.
        """
        stdin_encoding = getattr(sys.stdin, "encoding", None)
        return stdin_encoding or get_filesystem_encoding()

    def _get_windows_console_stream(
        f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
    ) -> t.Optional[t.TextIO]:
        """Stub for this branch: there is never a console stream."""
        return None
| |
| |
def term_len(x: str) -> int:
    """Return the length of ``x`` after ``strip_ansi`` has been applied."""
    visible = strip_ansi(x)
    return len(visible)
| |
| |
def isatty(stream: t.IO) -> bool:
    """Return ``stream.isatty()``, or ``False`` if that call raises."""
    try:
        answer = stream.isatty()
    except Exception:
        return False
    return answer
| |
| |
def _make_cached_stream_func(
    src_func: t.Callable[[], t.TextIO], wrapper_func: t.Callable[[], t.TextIO]
) -> t.Callable[[], t.TextIO]:
    """Build a zero-argument function returning a cached wrapped stream.

    :param src_func: returns the current source stream, which serves as
        the (weak) cache key.
    :param wrapper_func: builds the wrapped stream on a cache miss.

    When the source stream cannot be used as a weak key, the lookup and
    store errors are ignored and the wrapper is built on every call.
    """
    wrapped_by_source: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()

    def func() -> t.TextIO:
        source = src_func()
        try:
            wrapped = wrapped_by_source.get(source)
        except Exception:
            wrapped = None

        if wrapped is None:
            wrapped = wrapper_func()
            try:
                wrapped_by_source[source] = wrapped
            except Exception:
                pass

        return wrapped

    return func
| |
| |
# Cached default text streams. The lambdas read ``sys.*`` at call time, so
# a replaced standard stream is seen on the next call.
_default_text_stdin = _make_cached_stream_func(
    src_func=lambda: sys.stdin, wrapper_func=get_text_stdin
)
_default_text_stdout = _make_cached_stream_func(
    src_func=lambda: sys.stdout, wrapper_func=get_text_stdout
)
_default_text_stderr = _make_cached_stream_func(
    src_func=lambda: sys.stderr, wrapper_func=get_text_stderr
)


# Standard stream name -> factory for the binary stream.
binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = dict(
    stdin=get_binary_stdin,
    stdout=get_binary_stdout,
    stderr=get_binary_stderr,
)

# Standard stream name -> factory for the text stream; each factory takes
# ``encoding`` and ``errors``.
text_streams: t.Mapping[
    str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
] = dict(
    stdin=get_text_stdin,
    stdout=get_text_stdout,
    stderr=get_text_stderr,
)