Spaces:
Running
Running
import os | |
import re | |
import sys | |
import typing as t | |
from functools import update_wrapper | |
from types import ModuleType | |
from types import TracebackType | |
from ._compat import _default_text_stderr | |
from ._compat import _default_text_stdout | |
from ._compat import _find_binary_writer | |
from ._compat import auto_wrap_for_ansi | |
from ._compat import binary_streams | |
from ._compat import open_stream | |
from ._compat import should_strip_ansi | |
from ._compat import strip_ansi | |
from ._compat import text_streams | |
from ._compat import WIN | |
from .globals import resolve_color_default | |
if t.TYPE_CHECKING:
    # Evaluated by static type checkers only; ``typing_extensions`` is not
    # imported at runtime.
    import typing_extensions as te

    # Parameter specification referenced (as a string annotation) by
    # ``safecall`` to describe the wrapped callable's arguments.
    P = te.ParamSpec("P")

# Return type variable referenced by ``safecall`` and its inner wrapper.
R = t.TypeVar("R")
def _posixify(name: str) -> str: | |
return "-".join(name.split()).lower() | |
def safecall(func: "t.Callable[P, R]") -> "t.Callable[P, t.Optional[R]]":
    """Return a version of ``func`` that never propagates :exc:`Exception`.

    The returned callable forwards all positional and keyword arguments to
    ``func``. If ``func`` raises an :exc:`Exception` (or a subclass), the
    error is discarded and ``None`` is returned instead of the result.
    """

    def wrapper(*args: "P.args", **kwargs: "P.kwargs") -> "t.Optional[R]":
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Deliberate best-effort call: the failure is dropped on purpose.
            return None
        return result

    # Copy name, docstring, etc. from ``func`` onto the wrapper.
    return update_wrapper(wrapper, func)
def make_str(value: t.Any) -> str:
    """Turn an arbitrary value into a ``str``.

    ``bytes`` are decoded with the filesystem encoding; if that fails they
    are decoded as UTF-8 with undecodable bytes replaced. Every other value
    is passed through :class:`str`.
    """
    if not isinstance(value, bytes):
        return str(value)

    fs_encoding = sys.getfilesystemencoding()
    try:
        return value.decode(fs_encoding)
    except UnicodeError:
        # Fall back to a lossy decode so this function never fails on bytes.
        return value.decode("utf-8", "replace")
def make_default_short_help(help: str, max_length: int = 45) -> str:
    """Build a short, single-line summary from a longer help text.

    Only the first paragraph is used and all whitespace is collapsed to
    single spaces. The text ends at the first word that finishes with a
    ``.`` if that fits; otherwise it is cut at a word boundary and ``...``
    is appended so that the result is at most ``max_length`` characters
    (a lone ``...`` is returned if not even the first word fits).

    :param help: The full help text.
    :param max_length: Maximum length of the returned summary.
    """
    # Everything after the first blank line belongs to later paragraphs.
    first_break = help.find("\n\n")

    if first_break != -1:
        help = help[:first_break]

    # Splitting on whitespace collapses newlines, tabs, and spaces.
    tokens = help.split()

    if not tokens:
        return ""

    # A leading "\b" is the "no rewrap" marker, not part of the text.
    if tokens[0] == "\b":
        tokens = tokens[1:]

    final_pos = len(tokens) - 1
    used = 0

    for pos, token in enumerate(tokens):
        # Every word after the first costs one extra character for the
        # space that joins it to the previous word.
        used += len(token) + (1 if pos > 0 else 0)

        if used > max_length:
            # Over budget: truncate.
            break

        if token.endswith("."):
            # A sentence ended within budget: return it without "...".
            return " ".join(tokens[: pos + 1])

        if used == max_length and pos != final_pos:
            # Budget exactly used up but more words follow: truncate.
            break
    else:
        # Every word fit; nothing to truncate.
        return " ".join(tokens)

    # Reserve room for the suffix.
    used += len("...")

    # Drop trailing words until text plus suffix fits.
    while pos > 0:
        # ``pos`` is positive here, so the joining space always counts.
        used -= len(tokens[pos]) + 1

        if used <= max_length:
            break

        pos -= 1

    return " ".join(tokens[:pos]) + "..."
class LazyFile:
    """File-like proxy that postpones opening the real file.

    The file is opened on first use (attribute access, iteration, or an
    explicit :meth:`open`). Some basic checks are still done at
    construction time: a file requested for reading is opened and closed
    immediately so that at least some errors surface early. This is useful
    for safely opening files for writing.
    """

    def __init__(
        self,
        filename: t.Union[str, "os.PathLike[str]"],
        mode: str = "r",
        encoding: t.Optional[str] = None,
        errors: t.Optional[str] = "strict",
        atomic: bool = False,
    ):
        self.name: str = os.fspath(filename)
        self.mode = mode
        self.encoding = encoding
        self.errors = errors
        self.atomic = atomic
        self._f: t.Optional[t.IO[t.Any]]
        self.should_close: bool

        if self.name != "-":
            if "r" in mode:
                # Probe a file opened for reading right away so that at
                # least some errors are caught early, then release it.
                open(filename, mode).close()

            self._f = None
            self.should_close = True
        else:
            # "-" is resolved to a stream immediately.
            self._f, self.should_close = open_stream(filename, mode, encoding, errors)

    def __getattr__(self, name: str) -> t.Any:
        # Unknown attributes are served by the (possibly just opened) file.
        return getattr(self.open(), name)

    def __repr__(self) -> str:
        if self._f is None:
            return f"<unopened file '{format_filename(self.name)}' {self.mode}>"

        return repr(self._f)

    def open(self) -> t.IO[t.Any]:
        """Return the underlying file, opening it first if necessary.

        This call might fail with a :exc:`FileError`. Not handling this
        error will produce an error that Click shows.
        """
        stream = self._f

        if stream is None:
            try:
                stream, self.should_close = open_stream(
                    self.name, self.mode, self.encoding, self.errors, atomic=self.atomic
                )
            except OSError as e:
                # Imported here rather than at module level.
                from .exceptions import FileError  # noqa: E402

                raise FileError(self.name, hint=e.strerror) from e

            self._f = stream

        return stream

    def close(self) -> None:
        """Close the underlying file unconditionally, if it was opened."""
        if self._f is None:
            return

        self._f.close()

    def close_intelligently(self) -> None:
        """Close the underlying file only if this wrapper is responsible
        for it (``should_close`` is true). For instance this will never
        close stdin.
        """
        if not self.should_close:
            return

        self.close()

    def __enter__(self) -> "LazyFile":
        return self

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        self.close_intelligently()

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        # Iterating forces the file open and delegates to it.
        return iter(self.open())  # type: ignore
class KeepOpenFile:
    """Proxy around a file object whose context manager does not close it.

    Attribute access, ``repr()`` and iteration are forwarded to the wrapped
    file; leaving a ``with`` block is a no-op.
    """

    def __init__(self, file: t.IO[t.Any]) -> None:
        self._file: t.IO[t.Any] = file

    def __getattr__(self, name: str) -> t.Any:
        # Anything not defined on the proxy is looked up on the wrapped file.
        wrapped = self._file
        return getattr(wrapped, name)

    def __enter__(self) -> "KeepOpenFile":
        return self

    def __exit__(
        self,
        exc_type: t.Optional[t.Type[BaseException]],
        exc_value: t.Optional[BaseException],
        tb: t.Optional[TracebackType],
    ) -> None:
        """Do nothing: the wrapped file is intentionally left open."""

    def __repr__(self) -> str:
        return repr(self._file)

    def __iter__(self) -> t.Iterator[t.AnyStr]:
        return iter(self._file)
def echo(
    message: t.Optional[t.Any] = None,
    file: t.Optional[t.IO[t.Any]] = None,
    nl: bool = True,
    err: bool = False,
    color: t.Optional[bool] = None,
) -> None:
    """Print a message and newline to stdout or a file. This should be
    used instead of :func:`print` because it provides better support
    for different data, files, and environments.

    Compared to :func:`print`, this does the following:

    -   Ensures that the output encoding is not misconfigured on Linux.
    -   Supports Unicode in the Windows console.
    -   Supports writing to binary outputs, and supports writing bytes
        to text outputs.
    -   Supports colors and styles on Windows.
    -   Removes ANSI color and style codes if the output does not look
        like an interactive terminal.
    -   Always flushes the output.

    :param message: The string or bytes to output. Other objects are
        converted to strings. A ``bytearray`` passed here is never
        modified.
    :param file: The file to write to. Defaults to ``stdout``.
    :param err: Write to ``stderr`` instead of ``stdout``.
    :param nl: Print a newline after the message. Enabled by default.
    :param color: Force showing or hiding colors and other styles. By
        default Click will remove color if the output does not look like
        an interactive terminal.

    .. versionchanged:: 6.0
        Support Unicode output on the Windows console. Click does not
        modify ``sys.stdout``, so ``sys.stdout.write()`` and ``print()``
        will still not support Unicode.

    .. versionchanged:: 4.0
        Added the ``color`` parameter.

    .. versionadded:: 3.0
        Added the ``err`` parameter.

    .. versionchanged:: 2.0
        Support colors on Windows if colorama is installed.
    """
    if file is None:
        if err:
            file = _default_text_stderr()
        else:
            file = _default_text_stdout()

        # There are no standard streams attached to write to. For example,
        # pythonw on Windows.
        if file is None:
            return

    # Convert non bytes/text into the native string type.
    if message is not None and not isinstance(message, (str, bytes, bytearray)):
        out: t.Optional[t.Union[str, bytes]] = str(message)
    else:
        out = message

    if nl:
        out = out or ""
        if isinstance(out, str):
            out += "\n"
        else:
            # Build a new object rather than using ``+=``: for a
            # ``bytearray`` the augmented assignment extends the object in
            # place, which would append the newline to the caller's data.
            out = out + b"\n"

    if not out:
        file.flush()
        return

    # If there is a message and the value looks like bytes, we manually
    # need to find the binary stream and write the message in there.
    # This is done separately so that most stream types will work as you
    # would expect. Eg: you can write to StringIO for other cases.
    if isinstance(out, (bytes, bytearray)):
        binary_file = _find_binary_writer(file)

        if binary_file is not None:
            # Flush the text layer first so output ordering is preserved.
            file.flush()
            binary_file.write(out)
            binary_file.flush()
            return

    # ANSI style code support. For no message or bytes, nothing happens.
    # When outputting to a file instead of a terminal, strip codes.
    else:
        color = resolve_color_default(color)

        if should_strip_ansi(file, color):
            out = strip_ansi(out)
        elif WIN:
            if auto_wrap_for_ansi is not None:
                file = auto_wrap_for_ansi(file)  # type: ignore
            elif not color:
                out = strip_ansi(out)

    file.write(out)  # type: ignore
    file.flush()
def get_binary_stream(name: "te.Literal['stdin', 'stdout', 'stderr']") -> t.BinaryIO:
    """Return one of the system streams for byte processing.

    :param name: the name of the stream to open. Valid names are ``'stdin'``,
                 ``'stdout'`` and ``'stderr'``
    :raises TypeError: if ``name`` is not a known standard stream.
    """
    factory = binary_streams.get(name)

    if factory is not None:
        return factory()

    raise TypeError(f"Unknown standard stream '{name}'")
def get_text_stream(
    name: "te.Literal['stdin', 'stdout', 'stderr']",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
) -> t.TextIO:
    """Return one of the system streams for text processing. This usually
    returns a wrapped stream around a binary stream returned from
    :func:`get_binary_stream` but it also can take shortcuts for already
    correctly configured streams.

    :param name: the name of the stream to open. Valid names are ``'stdin'``,
                 ``'stdout'`` and ``'stderr'``
    :param encoding: overrides the detected default encoding.
    :param errors: overrides the default error mode.
    :raises TypeError: if ``name`` is not a known standard stream.
    """
    factory = text_streams.get(name)

    if factory is not None:
        return factory(encoding, errors)

    raise TypeError(f"Unknown standard stream '{name}'")
def open_file(
    filename: str,
    mode: str = "r",
    encoding: t.Optional[str] = None,
    errors: t.Optional[str] = "strict",
    lazy: bool = False,
    atomic: bool = False,
) -> t.IO[t.Any]:
    """Open a file, with extra behavior to handle ``'-'`` to indicate
    a standard stream, lazy open on write, and atomic write. Similar to
    the behavior of the :class:`~click.File` param type.

    If ``'-'`` is given to open ``stdout`` or ``stdin``, the stream is
    wrapped so that using it in a context manager will not close it.
    This makes it possible to use the function without accidentally
    closing a standard stream:

    .. code-block:: python

        with open_file(filename) as f:
            ...

    :param filename: The name of the file to open, or ``'-'`` for
        ``stdin``/``stdout``.
    :param mode: The mode in which to open the file.
    :param encoding: The encoding to decode or encode a file opened in
        text mode.
    :param errors: The error handling mode.
    :param lazy: Wait to open the file until it is accessed. For read
        mode, the file is temporarily opened to raise access errors
        early, then closed until it is read again.
    :param atomic: Write to a temporary file and replace the given file
        on close.

    .. versionadded:: 3.0
    """
    if lazy:
        deferred = LazyFile(filename, mode, encoding, errors, atomic=atomic)
        return t.cast(t.IO[t.Any], deferred)

    stream, should_close = open_stream(filename, mode, encoding, errors, atomic=atomic)

    if should_close:
        return stream

    # The stream must stay open, so hand out a proxy whose context manager
    # does not close it.
    return t.cast(t.IO[t.Any], KeepOpenFile(stream))
def format_filename(
    filename: "t.Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]",
    shorten: bool = False,
) -> str:
    """Format a filename as a string for display. Ensures the filename can be
    displayed by replacing any invalid bytes or surrogate escapes in the name
    with the replacement character ``�``.

    Invalid bytes or surrogate escapes will raise an error when written to a
    stream with ``errors="strict"``. This will typically happen with ``stdout``
    when the locale is something like ``en_GB.UTF-8``.

    Many scenarios *are* safe to write surrogates though, due to PEP 538 and
    PEP 540, including:

    -   Writing to ``stderr``, which uses ``errors="backslashreplace"``.
    -   The system has ``LANG=C.UTF-8``, ``C``, or ``POSIX``. Python opens
        stdout and stderr with ``errors="surrogateescape"``.
    -   None of ``LANG/LC_*`` are set. Python assumes ``LANG=C.UTF-8``.
    -   Python is started in UTF-8 mode with ``PYTHONUTF8=1`` or ``-X utf8``.
        Python opens stdout and stderr with ``errors="surrogateescape"``.

    :param filename: formats a filename for UI display. This will also convert
        the filename into unicode without failing.
    :param shorten: this optionally shortens the filename to strip off the
        path that leads up to it.
    """
    name = os.path.basename(filename) if shorten else os.fspath(filename)

    if isinstance(name, bytes):
        # Undecodable bytes become the replacement character.
        return name.decode(sys.getfilesystemencoding(), "replace")

    # Turn surrogate escapes back into raw bytes, then decode them again so
    # they come out as the replacement character.
    raw = name.encode("utf-8", "surrogateescape")
    return raw.decode("utf-8", "replace")
def get_app_dir(app_name: str, roaming: bool = True, force_posix: bool = False) -> str:
    r"""Returns the config folder for the application.  The default behavior
    is to return whatever is most appropriate for the operating system.

    To give you an idea, for an app called ``"Foo Bar"``, something like
    the following folders could be returned:

    Mac OS X:
      ``~/Library/Application Support/Foo Bar``
    Mac OS X (POSIX):
      ``~/.foo-bar``
    Unix:
      ``~/.config/foo-bar``
    Unix (POSIX):
      ``~/.foo-bar``
    Windows (roaming):
      ``C:\Users\<user>\AppData\Roaming\Foo Bar``
    Windows (not roaming):
      ``C:\Users\<user>\AppData\Local\Foo Bar``

    On Unix the ``XDG_CONFIG_HOME`` environment variable is used as the
    base folder when it is set to a non-empty value; otherwise
    ``~/.config`` is used.

    .. versionadded:: 2.0

    :param app_name: the application name.  This should be properly capitalized
                     and can contain whitespace.
    :param roaming: controls if the folder should be roaming or not on Windows.
                    Has no effect otherwise.
    :param force_posix: if this is set to `True` then on any POSIX system the
                        folder will be stored in the home folder with a leading
                        dot instead of the XDG config home or darwin's
                        application support folder.
    """
    if WIN:
        key = "APPDATA" if roaming else "LOCALAPPDATA"
        folder = os.environ.get(key)

        # Fall back to the home folder when the variable is not defined.
        if folder is None:
            folder = os.path.expanduser("~")

        return os.path.join(folder, app_name)

    if force_posix:
        return os.path.expanduser(f"~/.{_posixify(app_name)}")

    if sys.platform == "darwin":
        return os.path.join(
            os.path.expanduser("~/Library/Application Support"), app_name
        )

    # An empty XDG_CONFIG_HOME must be treated like an unset one; joining
    # onto "" would otherwise yield a path relative to the current directory.
    config_home = os.environ.get("XDG_CONFIG_HOME") or os.path.expanduser("~/.config")
    return os.path.join(config_home, _posixify(app_name))
class PacifyFlushWrapper:
    """Proxy for a stream that silences ``EPIPE`` errors raised by
    ``.flush()``.

    This wrapper is used to catch and suppress BrokenPipeErrors resulting
    from ``.flush()`` being called on a broken pipe during the
    shutdown/final-GC of the Python interpreter. Notably ``.flush()`` is
    always called on ``sys.stdout`` and ``sys.stderr``. To have minimal
    impact on other cleanup code, and on the case where the underlying file
    is not a broken pipe, every other call and attribute is proxied
    unchanged.
    """

    def __init__(self, wrapped: t.IO[t.Any]) -> None:
        self.wrapped = wrapped

    def flush(self) -> None:
        """Flush the wrapped stream, ignoring only ``EPIPE`` failures."""
        try:
            self.wrapped.flush()
        except OSError as exc:
            import errno

            if exc.errno == errno.EPIPE:
                return

            # Any other OS-level failure is still reported.
            raise

    def __getattr__(self, attr: str) -> t.Any:
        target = self.wrapped
        return getattr(target, attr)
def _detect_program_name( | |
path: t.Optional[str] = None, _main: t.Optional[ModuleType] = None | |
) -> str: | |
"""Determine the command used to run the program, for use in help | |
text. If a file or entry point was executed, the file name is | |
returned. If ``python -m`` was used to execute a module or package, | |
``python -m name`` is returned. | |
This doesn't try to be too precise, the goal is to give a concise | |
name for help text. Files are only shown as their name without the | |
path. ``python`` is only shown for modules, and the full path to | |
``sys.executable`` is not shown. | |
:param path: The Python file being executed. Python puts this in | |
``sys.argv[0]``, which is used by default. | |
:param _main: The ``__main__`` module. This should only be passed | |
during internal testing. | |
.. versionadded:: 8.0 | |
Based on command args detection in the Werkzeug reloader. | |
:meta private: | |
""" | |
if _main is None: | |
_main = sys.modules["__main__"] | |
if not path: | |
path = sys.argv[0] | |
# The value of __package__ indicates how Python was called. It may | |
# not exist if a setuptools script is installed as an egg. It may be | |
# set incorrectly for entry points created with pip on Windows. | |
# It is set to "" inside a Shiv or PEX zipapp. | |
if getattr(_main, "__package__", None) in {None, ""} or ( | |
os.name == "nt" | |
and _main.__package__ == "" | |
and not os.path.exists(path) | |
and os.path.exists(f"{path}.exe") | |
): | |
# Executed a file, like "python app.py". | |
return os.path.basename(path) | |
# Executed a module, like "python -m example". | |
# Rewritten by Python from "-m script" to "/path/to/script.py". | |
# Need to look at main module to determine how it was executed. | |
py_module = t.cast(str, _main.__package__) | |
name = os.path.splitext(os.path.basename(path))[0] | |
# A submodule like "example.cli". | |
if name != "__main__": | |
py_module = f"{py_module}.{name}" | |
return f"python -m {py_module.lstrip('.')}" | |
def _expand_args( | |
args: t.Iterable[str], | |
*, | |
user: bool = True, | |
env: bool = True, | |
glob_recursive: bool = True, | |
) -> t.List[str]: | |
"""Simulate Unix shell expansion with Python functions. | |
See :func:`glob.glob`, :func:`os.path.expanduser`, and | |
:func:`os.path.expandvars`. | |
This is intended for use on Windows, where the shell does not do any | |
expansion. It may not exactly match what a Unix shell would do. | |
:param args: List of command line arguments to expand. | |
:param user: Expand user home directory. | |
:param env: Expand environment variables. | |
:param glob_recursive: ``**`` matches directories recursively. | |
.. versionchanged:: 8.1 | |
Invalid glob patterns are treated as empty expansions rather | |
than raising an error. | |
.. versionadded:: 8.0 | |
:meta private: | |
""" | |
from glob import glob | |
out = [] | |
for arg in args: | |
if user: | |
arg = os.path.expanduser(arg) | |
if env: | |
arg = os.path.expandvars(arg) | |
try: | |
matches = glob(arg, recursive=glob_recursive) | |
except re.error: | |
matches = [] | |
if not matches: | |
out.append(arg) | |
else: | |
out.extend(matches) | |
return out | |