| # This module is based on the excellent work by Adam Bartoš who |
| # provided a lot of what went into the implementation here in |
| # the discussion to issue1602 in the Python bug tracker. |
| # |
| # There are some general differences in regards to how this works |
| # compared to the original patches as we do not need to patch |
| # the entire interpreter but just work in our little world of |
| # echo and prompt. |
| import io |
| import sys |
| import time |
| import typing as t |
| from ctypes import byref |
| from ctypes import c_char |
| from ctypes import c_char_p |
| from ctypes import c_int |
| from ctypes import c_ssize_t |
| from ctypes import c_ulong |
| from ctypes import c_void_p |
| from ctypes import POINTER |
| from ctypes import py_object |
| from ctypes import Structure |
| from ctypes.wintypes import DWORD |
| from ctypes.wintypes import HANDLE |
| from ctypes.wintypes import LPCWSTR |
| from ctypes.wintypes import LPWSTR |
| |
| from ._compat import _NonClosingTextIOWrapper |
| |
| assert sys.platform == "win32" |
| import msvcrt # noqa: E402 |
| from ctypes import windll # noqa: E402 |
| from ctypes import WINFUNCTYPE # noqa: E402 |
| |
| c_ssize_p = POINTER(c_ssize_t) |
| |
| kernel32 = windll.kernel32 |
| GetStdHandle = kernel32.GetStdHandle |
| ReadConsoleW = kernel32.ReadConsoleW |
| WriteConsoleW = kernel32.WriteConsoleW |
| GetConsoleMode = kernel32.GetConsoleMode |
| GetLastError = kernel32.GetLastError |
| GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32)) |
| CommandLineToArgvW = WINFUNCTYPE(POINTER(LPWSTR), LPCWSTR, POINTER(c_int))( |
| ("CommandLineToArgvW", windll.shell32) |
| ) |
| LocalFree = WINFUNCTYPE(c_void_p, c_void_p)(("LocalFree", windll.kernel32)) |
| |
| STDIN_HANDLE = GetStdHandle(-10) |
| STDOUT_HANDLE = GetStdHandle(-11) |
| STDERR_HANDLE = GetStdHandle(-12) |
| |
| PyBUF_SIMPLE = 0 |
| PyBUF_WRITABLE = 1 |
| |
| ERROR_SUCCESS = 0 |
| ERROR_NOT_ENOUGH_MEMORY = 8 |
| ERROR_OPERATION_ABORTED = 995 |
| |
| STDIN_FILENO = 0 |
| STDOUT_FILENO = 1 |
| STDERR_FILENO = 2 |
| |
| EOF = b"\x1a" |
| MAX_BYTES_WRITTEN = 32767 |
| |
| try: |
| from ctypes import pythonapi |
| except ImportError: |
| # On PyPy we cannot get buffers so our ability to operate here is |
| # severely limited. |
| get_buffer = None |
| else: |
| |
| class Py_buffer(Structure): |
| _fields_ = [ |
| ("buf", c_void_p), |
| ("obj", py_object), |
| ("len", c_ssize_t), |
| ("itemsize", c_ssize_t), |
| ("readonly", c_int), |
| ("ndim", c_int), |
| ("format", c_char_p), |
| ("shape", c_ssize_p), |
| ("strides", c_ssize_p), |
| ("suboffsets", c_ssize_p), |
| ("internal", c_void_p), |
| ] |
| |
| PyObject_GetBuffer = pythonapi.PyObject_GetBuffer |
| PyBuffer_Release = pythonapi.PyBuffer_Release |
| |
| def get_buffer(obj, writable=False): |
| buf = Py_buffer() |
| flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE |
| PyObject_GetBuffer(py_object(obj), byref(buf), flags) |
| |
| try: |
| buffer_type = c_char * buf.len |
| return buffer_type.from_address(buf.buf) |
| finally: |
| PyBuffer_Release(byref(buf)) |
| |
| |
| class _WindowsConsoleRawIOBase(io.RawIOBase): |
| def __init__(self, handle): |
| self.handle = handle |
| |
| def isatty(self): |
| super().isatty() |
| return True |
| |
| |
| class _WindowsConsoleReader(_WindowsConsoleRawIOBase): |
| def readable(self): |
| return True |
| |
| def readinto(self, b): |
| bytes_to_be_read = len(b) |
| if not bytes_to_be_read: |
| return 0 |
| elif bytes_to_be_read % 2: |
| raise ValueError( |
| "cannot read odd number of bytes from UTF-16-LE encoded console" |
| ) |
| |
| buffer = get_buffer(b, writable=True) |
| code_units_to_be_read = bytes_to_be_read // 2 |
| code_units_read = c_ulong() |
| |
| rv = ReadConsoleW( |
| HANDLE(self.handle), |
| buffer, |
| code_units_to_be_read, |
| byref(code_units_read), |
| None, |
| ) |
| if GetLastError() == ERROR_OPERATION_ABORTED: |
| # wait for KeyboardInterrupt |
| time.sleep(0.1) |
| if not rv: |
| raise OSError(f"Windows error: {GetLastError()}") |
| |
| if buffer[0] == EOF: |
| return 0 |
| return 2 * code_units_read.value |
| |
| |
| class _WindowsConsoleWriter(_WindowsConsoleRawIOBase): |
| def writable(self): |
| return True |
| |
| @staticmethod |
| def _get_error_message(errno): |
| if errno == ERROR_SUCCESS: |
| return "ERROR_SUCCESS" |
| elif errno == ERROR_NOT_ENOUGH_MEMORY: |
| return "ERROR_NOT_ENOUGH_MEMORY" |
| return f"Windows error {errno}" |
| |
| def write(self, b): |
| bytes_to_be_written = len(b) |
| buf = get_buffer(b) |
| code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2 |
| code_units_written = c_ulong() |
| |
| WriteConsoleW( |
| HANDLE(self.handle), |
| buf, |
| code_units_to_be_written, |
| byref(code_units_written), |
| None, |
| ) |
| bytes_written = 2 * code_units_written.value |
| |
| if bytes_written == 0 and bytes_to_be_written > 0: |
| raise OSError(self._get_error_message(GetLastError())) |
| return bytes_written |
| |
| |
| class ConsoleStream: |
| def __init__(self, text_stream: t.TextIO, byte_stream: t.BinaryIO) -> None: |
| self._text_stream = text_stream |
| self.buffer = byte_stream |
| |
| @property |
| def name(self) -> str: |
| return self.buffer.name |
| |
| def write(self, x: t.AnyStr) -> int: |
| if isinstance(x, str): |
| return self._text_stream.write(x) |
| try: |
| self.flush() |
| except Exception: |
| pass |
| return self.buffer.write(x) |
| |
| def writelines(self, lines: t.Iterable[t.AnyStr]) -> None: |
| for line in lines: |
| self.write(line) |
| |
| def __getattr__(self, name: str) -> t.Any: |
| return getattr(self._text_stream, name) |
| |
| def isatty(self) -> bool: |
| return self.buffer.isatty() |
| |
| def __repr__(self): |
| return f"<ConsoleStream name={self.name!r} encoding={self.encoding!r}>" |
| |
| |
| def _get_text_stdin(buffer_stream: t.BinaryIO) -> t.TextIO: |
| text_stream = _NonClosingTextIOWrapper( |
| io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)), |
| "utf-16-le", |
| "strict", |
| line_buffering=True, |
| ) |
| return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream)) |
| |
| |
| def _get_text_stdout(buffer_stream: t.BinaryIO) -> t.TextIO: |
| text_stream = _NonClosingTextIOWrapper( |
| io.BufferedWriter(_WindowsConsoleWriter(STDOUT_HANDLE)), |
| "utf-16-le", |
| "strict", |
| line_buffering=True, |
| ) |
| return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream)) |
| |
| |
| def _get_text_stderr(buffer_stream: t.BinaryIO) -> t.TextIO: |
| text_stream = _NonClosingTextIOWrapper( |
| io.BufferedWriter(_WindowsConsoleWriter(STDERR_HANDLE)), |
| "utf-16-le", |
| "strict", |
| line_buffering=True, |
| ) |
| return t.cast(t.TextIO, ConsoleStream(text_stream, buffer_stream)) |
| |
| |
| _stream_factories: t.Mapping[int, t.Callable[[t.BinaryIO], t.TextIO]] = { |
| 0: _get_text_stdin, |
| 1: _get_text_stdout, |
| 2: _get_text_stderr, |
| } |
| |
| |
| def _is_console(f: t.TextIO) -> bool: |
| if not hasattr(f, "fileno"): |
| return False |
| |
| try: |
| fileno = f.fileno() |
| except (OSError, io.UnsupportedOperation): |
| return False |
| |
| handle = msvcrt.get_osfhandle(fileno) |
| return bool(GetConsoleMode(handle, byref(DWORD()))) |
| |
| |
| def _get_windows_console_stream( |
| f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str] |
| ) -> t.Optional[t.TextIO]: |
| if ( |
| get_buffer is not None |
| and encoding in {"utf-16-le", None} |
| and errors in {"strict", None} |
| and _is_console(f) |
| ): |
| func = _stream_factories.get(f.fileno()) |
| if func is not None: |
| b = getattr(f, "buffer", None) |
| |
| if b is None: |
| return None |
| |
| return func(b) |