| import os |
| import sys |
| import typing as t |
| from functools import update_wrapper |
| from types import ModuleType |
| |
| from ._compat import _default_text_stderr |
| from ._compat import _default_text_stdout |
| from ._compat import _find_binary_writer |
| from ._compat import auto_wrap_for_ansi |
| from ._compat import binary_streams |
| from ._compat import get_filesystem_encoding |
| from ._compat import open_stream |
| from ._compat import should_strip_ansi |
| from ._compat import strip_ansi |
| from ._compat import text_streams |
| from ._compat import WIN |
| from .globals import resolve_color_default |
| |
| if t.TYPE_CHECKING: |
| import typing_extensions as te |
| |
| F = t.TypeVar("F", bound=t.Callable[..., t.Any]) |
| |
| |
| def _posixify(name: str) -> str: |
| return "-".join(name.split()).lower() |
| |
| |
| def safecall(func: F) -> F: |
| """Wraps a function so that it swallows exceptions.""" |
| |
| def wrapper(*args, **kwargs): # type: ignore |
| try: |
| return func(*args, **kwargs) |
| except Exception: |
| pass |
| |
| return update_wrapper(t.cast(F, wrapper), func) |
| |
| |
| def make_str(value: t.Any) -> str: |
| """Converts a value into a valid string.""" |
| if isinstance(value, bytes): |
| try: |
| return value.decode(get_filesystem_encoding()) |
| except UnicodeError: |
| return value.decode("utf-8", "replace") |
| return str(value) |
| |
| |
| def make_default_short_help(help: str, max_length: int = 45) -> str: |
| """Returns a condensed version of help string.""" |
| # Consider only the first paragraph. |
| paragraph_end = help.find("\n\n") |
| |
| if paragraph_end != -1: |
| help = help[:paragraph_end] |
| |
| # Collapse newlines, tabs, and spaces. |
| words = help.split() |
| |
| if not words: |
| return "" |
| |
| # The first paragraph started with a "no rewrap" marker, ignore it. |
| if words[0] == "\b": |
| words = words[1:] |
| |
| total_length = 0 |
| last_index = len(words) - 1 |
| |
| for i, word in enumerate(words): |
| total_length += len(word) + (i > 0) |
| |
| if total_length > max_length: # too long, truncate |
| break |
| |
| if word[-1] == ".": # sentence end, truncate without "..." |
| return " ".join(words[: i + 1]) |
| |
| if total_length == max_length and i != last_index: |
| break # not at sentence end, truncate with "..." |
| else: |
| return " ".join(words) # no truncation needed |
| |
| # Account for the length of the suffix. |
| total_length += len("...") |
| |
| # remove words until the length is short enough |
| while i > 0: |
| total_length -= len(words[i]) + (i > 0) |
| |
| if total_length <= max_length: |
| break |
| |
| i -= 1 |
| |
| return " ".join(words[:i]) + "..." |
| |
| |
| class LazyFile: |
| """A lazy file works like a regular file but it does not fully open |
| the file but it does perform some basic checks early to see if the |
| filename parameter does make sense. This is useful for safely opening |
| files for writing. |
| """ |
| |
| def __init__( |
| self, |
| filename: str, |
| mode: str = "r", |
| encoding: t.Optional[str] = None, |
| errors: t.Optional[str] = "strict", |
| atomic: bool = False, |
| ): |
| self.name = filename |
| self.mode = mode |
| self.encoding = encoding |
| self.errors = errors |
| self.atomic = atomic |
| self._f: t.Optional[t.IO] |
| |
| if filename == "-": |
| self._f, self.should_close = open_stream(filename, mode, encoding, errors) |
| else: |
| if "r" in mode: |
| # Open and close the file in case we're opening it for |
| # reading so that we can catch at least some errors in |
| # some cases early. |
| open(filename, mode).close() |
| self._f = None |
| self.should_close = True |
| |
| def __getattr__(self, name: str) -> t.Any: |
| return getattr(self.open(), name) |
| |
| def __repr__(self) -> str: |
| if self._f is not None: |
| return repr(self._f) |
| return f"<unopened file '{self.name}' {self.mode}>" |
| |
| def open(self) -> t.IO: |
| """Opens the file if it's not yet open. This call might fail with |
| a :exc:`FileError`. Not handling this error will produce an error |
| that Click shows. |
| """ |
| if self._f is not None: |
| return self._f |
| try: |
| rv, self.should_close = open_stream( |
| self.name, self.mode, self.encoding, self.errors, atomic=self.atomic |
| ) |
| except OSError as e: # noqa: E402 |
| from .exceptions import FileError |
| |
| raise FileError(self.name, hint=e.strerror) from e |
| self._f = rv |
| return rv |
| |
| def close(self) -> None: |
| """Closes the underlying file, no matter what.""" |
| if self._f is not None: |
| self._f.close() |
| |
| def close_intelligently(self) -> None: |
| """This function only closes the file if it was opened by the lazy |
| file wrapper. For instance this will never close stdin. |
| """ |
| if self.should_close: |
| self.close() |
| |
| def __enter__(self) -> "LazyFile": |
| return self |
| |
| def __exit__(self, exc_type, exc_value, tb): # type: ignore |
| self.close_intelligently() |
| |
| def __iter__(self) -> t.Iterator[t.AnyStr]: |
| self.open() |
| return iter(self._f) # type: ignore |
| |
| |
| class KeepOpenFile: |
| def __init__(self, file: t.IO) -> None: |
| self._file = file |
| |
| def __getattr__(self, name: str) -> t.Any: |
| return getattr(self._file, name) |
| |
| def __enter__(self) -> "KeepOpenFile": |
| return self |
| |
| def __exit__(self, exc_type, exc_value, tb): # type: ignore |
| pass |
| |
| def __repr__(self) -> str: |
| return repr(self._file) |
| |
| def __iter__(self) -> t.Iterator[t.AnyStr]: |
| return iter(self._file) |
| |
| |
| def echo( |
| message: t.Optional[t.Any] = None, |
| file: t.Optional[t.IO[t.Any]] = None, |
| nl: bool = True, |
| err: bool = False, |
| color: t.Optional[bool] = None, |
| ) -> None: |
| """Print a message and newline to stdout or a file. This should be |
| used instead of :func:`print` because it provides better support |
| for different data, files, and environments. |
| |
| Compared to :func:`print`, this does the following: |
| |
| - Ensures that the output encoding is not misconfigured on Linux. |
| - Supports Unicode in the Windows console. |
| - Supports writing to binary outputs, and supports writing bytes |
| to text outputs. |
| - Supports colors and styles on Windows. |
| - Removes ANSI color and style codes if the output does not look |
| like an interactive terminal. |
| - Always flushes the output. |
| |
| :param message: The string or bytes to output. Other objects are |
| converted to strings. |
| :param file: The file to write to. Defaults to ``stdout``. |
| :param err: Write to ``stderr`` instead of ``stdout``. |
| :param nl: Print a newline after the message. Enabled by default. |
| :param color: Force showing or hiding colors and other styles. By |
| default Click will remove color if the output does not look like |
| an interactive terminal. |
| |
| .. versionchanged:: 6.0 |
| Support Unicode output on the Windows console. Click does not |
| modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()`` |
| will still not support Unicode. |
| |
| .. versionchanged:: 4.0 |
| Added the ``color`` parameter. |
| |
| .. versionadded:: 3.0 |
| Added the ``err`` parameter. |
| |
| .. versionchanged:: 2.0 |
| Support colors on Windows if colorama is installed. |
| """ |
| if file is None: |
| if err: |
| file = _default_text_stderr() |
| else: |
| file = _default_text_stdout() |
| |
| # Convert non bytes/text into the native string type. |
| if message is not None and not isinstance(message, (str, bytes, bytearray)): |
| out: t.Optional[t.Union[str, bytes]] = str(message) |
| else: |
| out = message |
| |
| if nl: |
| out = out or "" |
| if isinstance(out, str): |
| out += "\n" |
| else: |
| out += b"\n" |
| |
| if not out: |
| file.flush() |
| return |
| |
| # If there is a message and the value looks like bytes, we manually |
| # need to find the binary stream and write the message in there. |
| # This is done separately so that most stream types will work as you |
| # would expect. Eg: you can write to StringIO for other cases. |
| if isinstance(out, (bytes, bytearray)): |
| binary_file = _find_binary_writer(file) |
| |
| if binary_file is not None: |
| file.flush() |
| binary_file.write(out) |
| binary_file.flush() |
| return |
| |
| # ANSI style code support. For no message or bytes, nothing happens. |
| # When outputting to a file instead of a terminal, strip codes. |
| else: |
| color = resolve_color_default(color) |
| |
| if should_strip_ansi(file, color): |
| out = strip_ansi(out) |
| elif WIN: |
| if auto_wrap_for_ansi is not None: |
| file = auto_wrap_for_ansi(file) # type: ignore |
| elif not color: |
| out = strip_ansi(out) |
| |
| file.write(out) # type: ignore |
| file.flush() |
| |
| |
| def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']") -> t.BinaryIO: |
| """Returns a system stream for byte processing. |
| |
| :param name: the name of the stream to open. Valid names are ``'stdin'``, |
| ``'stdout'`` and ``'stderr'`` |
| """ |
| opener = binary_streams.get(name) |
| if opener is None: |
| raise TypeError(f"Unknown standard stream '{name}'") |
| return opener() |
| |
| |
| def get_text_stream( |
| name: "te.Literal['stdin', 'stdout', 'stderr']", |
| encoding: t.Optional[str] = None, |
| errors: t.Optional[str] = "strict", |
| ) -> t.TextIO: |
| """Returns a system stream for text processing. This usually returns |
| a wrapped stream around a binary stream returned from |
| :func:`get_binary_stream` but it also can take shortcuts for already |
| correctly configured streams. |
| |
| :param name: the name of the stream to open. Valid names are ``'stdin'``, |
| ``'stdout'`` and ``'stderr'`` |
| :param encoding: overrides the detected default encoding. |
| :param errors: overrides the default error mode. |
| """ |
| opener = text_streams.get(name) |
| if opener is None: |
| raise TypeError(f"Unknown standard stream '{name}'") |
| return opener(encoding, errors) |
| |
| |
| def open_file( |
| filename: str, |
| mode: str = "r", |
| encoding: t.Optional[str] = None, |
| errors: t.Optional[str] = "strict", |
| lazy: bool = False, |
| atomic: bool = False, |
| ) -> t.IO: |
| """Open a file, with extra behavior to handle ``'-'`` to indicate |
| a standard stream, lazy open on write, and atomic write. Similar to |
| the behavior of the :class:`~click.File` param type. |
| |
| If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is |
| wrapped so that using it in a context manager will not close it. |
| This makes it possible to use the function without accidentally |
| closing a standard stream: |
| |
| .. code-block:: python |
| |
| with open_file(filename) as f: |
| ... |
| |
| :param filename: The name of the file to open, or ``'-'`` for |
| ``stdin``/``stdout``. |
| :param mode: The mode in which to open the file. |
| :param encoding: The encoding to decode or encode a file opened in |
| text mode. |
| :param errors: The error handling mode. |
| :param lazy: Wait to open the file until it is accessed. For read |
| mode, the file is temporarily opened to raise access errors |
| early, then closed until it is read again. |
| :param atomic: Write to a temporary file and replace the given file |
| on close. |
| |
| .. versionadded:: 3.0 |
| """ |
| if lazy: |
| return t.cast(t.IO, LazyFile(filename, mode, encoding, errors, atomic=atomic)) |
| |
| f, should_close = open_stream(filename, mode, encoding, errors, atomic=atomic) |
| |
| if not should_close: |
| f = t.cast(t.IO, KeepOpenFile(f)) |
| |
| return f |
| |
| |
| def get_os_args() -> t.Sequence[str]: |
| """Returns the argument part of ``sys.argv``, removing the first |
| value which is the name of the script. |
| |
| .. deprecated:: 8.0 |
| Will be removed in Click 8.1. Access ``sys.argv[1:]`` directly |
| instead. |
| """ |
| import warnings |
| |
| warnings.warn( |
| "'get_os_args' is deprecated and will be removed in Click 8.1." |
| " Access 'sys.argv[1:]' directly instead.", |
| DeprecationWarning, |
| stacklevel=2, |
| ) |
| return sys.argv[1:] |
| |
| |
| def format_filename( |
| filename: t.Union[str, bytes, os.PathLike], shorten: bool = False |
| ) -> str: |
| """Formats a filename for user display. The main purpose of this |
| function is to ensure that the filename can be displayed at all. This |
| will decode the filename to unicode if necessary in a way that it will |
| not fail. Optionally, it can shorten the filename to not include the |
| full path to the filename. |
| |
| :param filename: formats a filename for UI display. This will also convert |
| the filename into unicode without failing. |
| :param shorten: this optionally shortens the filename to strip of the |
| path that leads up to it. |
| """ |
| if shorten: |
| filename = os.path.basename(filename) |
| |
| return os.fsdecode(filename) |
| |
| |
| def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str: |
| r"""Returns the config folder for the application. The default behavior |
| is to return whatever is most appropriate for the operating system. |
| |
| To give you an idea, for an app called ``"Foo Bar"``, something like |
| the following folders could be returned: |
| |
| Mac OS X: |
| ``~/Library/Application Support/Foo Bar`` |
| Mac OS X (POSIX): |
| ``~/.foo-bar`` |
| Unix: |
| ``~/.config/foo-bar`` |
| Unix (POSIX): |
| ``~/.foo-bar`` |
| Windows (roaming): |
| ``C:\Users\<user>\AppData\Roaming\Foo Bar`` |
| Windows (not roaming): |
| ``C:\Users\<user>\AppData\Local\Foo Bar`` |
| |
| .. versionadded:: 2.0 |
| |
| :param app_name: the application name. This should be properly capitalized |
| and can contain whitespace. |
| :param roaming: controls if the folder should be roaming or not on Windows. |
| Has no affect otherwise. |
| :param force_posix: if this is set to `True` then on any POSIX system the |
| folder will be stored in the home folder with a leading |
| dot instead of the XDG config home or darwin's |
| application support folder. |
| """ |
| if WIN: |
| key = "APPDATA" if roaming else "LOCALAPPDATA" |
| folder = os.environ.get(key) |
| if folder is None: |
| folder = os.path.expanduser("~") |
| return os.path.join(folder, app_name) |
| if force_posix: |
| return os.path.join(os.path.expanduser(f"~/.{_posixify(app_name)}")) |
| if sys.platform == "darwin": |
| return os.path.join( |
| os.path.expanduser("~/Library/Application Support"), app_name |
| ) |
| return os.path.join( |
| os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), |
| _posixify(app_name), |
| ) |
| |
| |
| class PacifyFlushWrapper: |
| """This wrapper is used to catch and suppress BrokenPipeErrors resulting |
| from ``.flush()`` being called on broken pipe during the shutdown/final-GC |
| of the Python interpreter. Notably ``.flush()`` is always called on |
| ``sys.stdout`` and ``sys.stderr``. So as to have minimal impact on any |
| other cleanup code, and the case where the underlying file is not a broken |
| pipe, all calls and attributes are proxied. |
| """ |
| |
| def __init__(self, wrapped: t.IO) -> None: |
| self.wrapped = wrapped |
| |
| def flush(self) -> None: |
| try: |
| self.wrapped.flush() |
| except OSError as e: |
| import errno |
| |
| if e.errno != errno.EPIPE: |
| raise |
| |
| def __getattr__(self, attr: str) -> t.Any: |
| return getattr(self.wrapped, attr) |
| |
| |
| def _detect_program_name( |
| path: t.Optional[str] = None, _main: ModuleType = sys.modules["__main__"] |
| ) -> str: |
| """Determine the command used to run the program, for use in help |
| text. If a file or entry point was executed, the file name is |
| returned. If ``python -m`` was used to execute a module or package, |
| ``python -m name`` is returned. |
| |
| This doesn't try to be too precise, the goal is to give a concise |
| name for help text. Files are only shown as their name without the |
| path. ``python`` is only shown for modules, and the full path to |
| ``sys.executable`` is not shown. |
| |
| :param path: The Python file being executed. Python puts this in |
| ``sys.argv[0]``, which is used by default. |
| :param _main: The ``__main__`` module. This should only be passed |
| during internal testing. |
| |
| .. versionadded:: 8.0 |
| Based on command args detection in the Werkzeug reloader. |
| |
| :meta private: |
| """ |
| if not path: |
| path = sys.argv[0] |
| |
| # The value of __package__ indicates how Python was called. It may |
| # not exist if a setuptools script is installed as an egg. It may be |
| # set incorrectly for entry points created with pip on Windows. |
| if getattr(_main, "__package__", None) is None or ( |
| os.name == "nt" |
| and _main.__package__ == "" |
| and not os.path.exists(path) |
| and os.path.exists(f"{path}.exe") |
| ): |
| # Executed a file, like "python app.py". |
| return os.path.basename(path) |
| |
| # Executed a module, like "python -m example". |
| # Rewritten by Python from "-m script" to "/path/to/script.py". |
| # Need to look at main module to determine how it was executed. |
| py_module = t.cast(str, _main.__package__) |
| name = os.path.splitext(os.path.basename(path))[0] |
| |
| # A submodule like "example.cli". |
| if name != "__main__": |
| py_module = f"{py_module}.{name}" |
| |
| return f"python -m {py_module.lstrip('.')}" |
| |
| |
| def _expand_args( |
| args: t.Iterable[str], |
| *, |
| user: bool = True, |
| env: bool = True, |
| glob_recursive: bool = True, |
| ) -> t.List[str]: |
| """Simulate Unix shell expansion with Python functions. |
| |
| See :func:`glob.glob`, :func:`os.path.expanduser`, and |
| :func:`os.path.expandvars`. |
| |
| This intended for use on Windows, where the shell does not do any |
| expansion. It may not exactly match what a Unix shell would do. |
| |
| :param args: List of command line arguments to expand. |
| :param user: Expand user home directory. |
| :param env: Expand environment variables. |
| :param glob_recursive: ``**`` matches directories recursively. |
| |
| .. versionadded:: 8.0 |
| |
| :meta private: |
| """ |
| from glob import glob |
| |
| out = [] |
| |
| for arg in args: |
| if user: |
| arg = os.path.expanduser(arg) |
| |
| if env: |
| arg = os.path.expandvars(arg) |
| |
| matches = glob(arg, recursive=glob_recursive) |
| |
| if not matches: |
| out.append(arg) |
| else: |
| out.extend(matches) |
| |
| return out |