Spaces:
Running
Running
import contextlib | |
import io | |
import os | |
import shlex | |
import shutil | |
import sys | |
import tempfile | |
import typing as t | |
from types import TracebackType | |
from . import formatting | |
from . import termui | |
from . import utils | |
from ._compat import _find_binary_reader | |
if t.TYPE_CHECKING: | |
from .core import BaseCommand | |
class EchoingStdin: | |
def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None: | |
self._input = input | |
self._output = output | |
self._paused = False | |
def __getattr__(self, x: str) -> t.Any: | |
return getattr(self._input, x) | |
def _echo(self, rv: bytes) -> bytes: | |
if not self._paused: | |
self._output.write(rv) | |
return rv | |
def read(self, n: int = -1) -> bytes: | |
return self._echo(self._input.read(n)) | |
def read1(self, n: int = -1) -> bytes: | |
return self._echo(self._input.read1(n)) # type: ignore | |
def readline(self, n: int = -1) -> bytes: | |
return self._echo(self._input.readline(n)) | |
def readlines(self) -> t.List[bytes]: | |
return [self._echo(x) for x in self._input.readlines()] | |
def __iter__(self) -> t.Iterator[bytes]: | |
return iter(self._echo(x) for x in self._input) | |
def __repr__(self) -> str: | |
return repr(self._input) | |
def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]: | |
if stream is None: | |
yield | |
else: | |
stream._paused = True | |
yield | |
stream._paused = False | |
class _NamedTextIOWrapper(io.TextIOWrapper): | |
def __init__( | |
self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any | |
) -> None: | |
super().__init__(buffer, **kwargs) | |
self._name = name | |
self._mode = mode | |
def name(self) -> str: | |
return self._name | |
def mode(self) -> str: | |
return self._mode | |
def make_input_stream( | |
input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str | |
) -> t.BinaryIO: | |
# Is already an input stream. | |
if hasattr(input, "read"): | |
rv = _find_binary_reader(t.cast(t.IO[t.Any], input)) | |
if rv is not None: | |
return rv | |
raise TypeError("Could not find binary reader for input stream.") | |
if input is None: | |
input = b"" | |
elif isinstance(input, str): | |
input = input.encode(charset) | |
return io.BytesIO(input) | |
class Result: | |
"""Holds the captured result of an invoked CLI script.""" | |
def __init__( | |
self, | |
runner: "CliRunner", | |
stdout_bytes: bytes, | |
stderr_bytes: t.Optional[bytes], | |
return_value: t.Any, | |
exit_code: int, | |
exception: t.Optional[BaseException], | |
exc_info: t.Optional[ | |
t.Tuple[t.Type[BaseException], BaseException, TracebackType] | |
] = None, | |
): | |
#: The runner that created the result | |
self.runner = runner | |
#: The standard output as bytes. | |
self.stdout_bytes = stdout_bytes | |
#: The standard error as bytes, or None if not available | |
self.stderr_bytes = stderr_bytes | |
#: The value returned from the invoked command. | |
#: | |
#: .. versionadded:: 8.0 | |
self.return_value = return_value | |
#: The exit code as integer. | |
self.exit_code = exit_code | |
#: The exception that happened if one did. | |
self.exception = exception | |
#: The traceback | |
self.exc_info = exc_info | |
def output(self) -> str: | |
"""The (standard) output as unicode string.""" | |
return self.stdout | |
def stdout(self) -> str: | |
"""The standard output as unicode string.""" | |
return self.stdout_bytes.decode(self.runner.charset, "replace").replace( | |
"\r\n", "\n" | |
) | |
def stderr(self) -> str: | |
"""The standard error as unicode string.""" | |
if self.stderr_bytes is None: | |
raise ValueError("stderr not separately captured") | |
return self.stderr_bytes.decode(self.runner.charset, "replace").replace( | |
"\r\n", "\n" | |
) | |
def __repr__(self) -> str: | |
exc_str = repr(self.exception) if self.exception else "okay" | |
return f"<{type(self).__name__} {exc_str}>" | |
class CliRunner: | |
"""The CLI runner provides functionality to invoke a Click command line | |
script for unittesting purposes in a isolated environment. This only | |
works in single-threaded systems without any concurrency as it changes the | |
global interpreter state. | |
:param charset: the character set for the input and output data. | |
:param env: a dictionary with environment variables for overriding. | |
:param echo_stdin: if this is set to `True`, then reading from stdin writes | |
to stdout. This is useful for showing examples in | |
some circumstances. Note that regular prompts | |
will automatically echo the input. | |
:param mix_stderr: if this is set to `False`, then stdout and stderr are | |
preserved as independent streams. This is useful for | |
Unix-philosophy apps that have predictable stdout and | |
noisy stderr, such that each may be measured | |
independently | |
""" | |
def __init__( | |
self, | |
charset: str = "utf-8", | |
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None, | |
echo_stdin: bool = False, | |
mix_stderr: bool = True, | |
) -> None: | |
self.charset = charset | |
self.env: t.Mapping[str, t.Optional[str]] = env or {} | |
self.echo_stdin = echo_stdin | |
self.mix_stderr = mix_stderr | |
def get_default_prog_name(self, cli: "BaseCommand") -> str: | |
"""Given a command object it will return the default program name | |
for it. The default is the `name` attribute or ``"root"`` if not | |
set. | |
""" | |
return cli.name or "root" | |
def make_env( | |
self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None | |
) -> t.Mapping[str, t.Optional[str]]: | |
"""Returns the environment overrides for invoking a script.""" | |
rv = dict(self.env) | |
if overrides: | |
rv.update(overrides) | |
return rv | |
def isolation( | |
self, | |
input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None, | |
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None, | |
color: bool = False, | |
) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]: | |
"""A context manager that sets up the isolation for invoking of a | |
command line tool. This sets up stdin with the given input data | |
and `os.environ` with the overrides from the given dictionary. | |
This also rebinds some internals in Click to be mocked (like the | |
prompt functionality). | |
This is automatically done in the :meth:`invoke` method. | |
:param input: the input stream to put into sys.stdin. | |
:param env: the environment overrides as dictionary. | |
:param color: whether the output should contain color codes. The | |
application can still override this explicitly. | |
.. versionchanged:: 8.0 | |
``stderr`` is opened with ``errors="backslashreplace"`` | |
instead of the default ``"strict"``. | |
.. versionchanged:: 4.0 | |
Added the ``color`` parameter. | |
""" | |
bytes_input = make_input_stream(input, self.charset) | |
echo_input = None | |
old_stdin = sys.stdin | |
old_stdout = sys.stdout | |
old_stderr = sys.stderr | |
old_forced_width = formatting.FORCED_WIDTH | |
formatting.FORCED_WIDTH = 80 | |
env = self.make_env(env) | |
bytes_output = io.BytesIO() | |
if self.echo_stdin: | |
bytes_input = echo_input = t.cast( | |
t.BinaryIO, EchoingStdin(bytes_input, bytes_output) | |
) | |
sys.stdin = text_input = _NamedTextIOWrapper( | |
bytes_input, encoding=self.charset, name="<stdin>", mode="r" | |
) | |
if self.echo_stdin: | |
# Force unbuffered reads, otherwise TextIOWrapper reads a | |
# large chunk which is echoed early. | |
text_input._CHUNK_SIZE = 1 # type: ignore | |
sys.stdout = _NamedTextIOWrapper( | |
bytes_output, encoding=self.charset, name="<stdout>", mode="w" | |
) | |
bytes_error = None | |
if self.mix_stderr: | |
sys.stderr = sys.stdout | |
else: | |
bytes_error = io.BytesIO() | |
sys.stderr = _NamedTextIOWrapper( | |
bytes_error, | |
encoding=self.charset, | |
name="<stderr>", | |
mode="w", | |
errors="backslashreplace", | |
) | |
# type: ignore | |
def visible_input(prompt: t.Optional[str] = None) -> str: | |
sys.stdout.write(prompt or "") | |
val = text_input.readline().rstrip("\r\n") | |
sys.stdout.write(f"{val}\n") | |
sys.stdout.flush() | |
return val | |
# type: ignore | |
def hidden_input(prompt: t.Optional[str] = None) -> str: | |
sys.stdout.write(f"{prompt or ''}\n") | |
sys.stdout.flush() | |
return text_input.readline().rstrip("\r\n") | |
# type: ignore | |
def _getchar(echo: bool) -> str: | |
char = sys.stdin.read(1) | |
if echo: | |
sys.stdout.write(char) | |
sys.stdout.flush() | |
return char | |
default_color = color | |
def should_strip_ansi( | |
stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None | |
) -> bool: | |
if color is None: | |
return not default_color | |
return not color | |
old_visible_prompt_func = termui.visible_prompt_func | |
old_hidden_prompt_func = termui.hidden_prompt_func | |
old__getchar_func = termui._getchar | |
old_should_strip_ansi = utils.should_strip_ansi # type: ignore | |
termui.visible_prompt_func = visible_input | |
termui.hidden_prompt_func = hidden_input | |
termui._getchar = _getchar | |
utils.should_strip_ansi = should_strip_ansi # type: ignore | |
old_env = {} | |
try: | |
for key, value in env.items(): | |
old_env[key] = os.environ.get(key) | |
if value is None: | |
try: | |
del os.environ[key] | |
except Exception: | |
pass | |
else: | |
os.environ[key] = value | |
yield (bytes_output, bytes_error) | |
finally: | |
for key, value in old_env.items(): | |
if value is None: | |
try: | |
del os.environ[key] | |
except Exception: | |
pass | |
else: | |
os.environ[key] = value | |
sys.stdout = old_stdout | |
sys.stderr = old_stderr | |
sys.stdin = old_stdin | |
termui.visible_prompt_func = old_visible_prompt_func | |
termui.hidden_prompt_func = old_hidden_prompt_func | |
termui._getchar = old__getchar_func | |
utils.should_strip_ansi = old_should_strip_ansi # type: ignore | |
formatting.FORCED_WIDTH = old_forced_width | |
def invoke( | |
self, | |
cli: "BaseCommand", | |
args: t.Optional[t.Union[str, t.Sequence[str]]] = None, | |
input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None, | |
env: t.Optional[t.Mapping[str, t.Optional[str]]] = None, | |
catch_exceptions: bool = True, | |
color: bool = False, | |
**extra: t.Any, | |
) -> Result: | |
"""Invokes a command in an isolated environment. The arguments are | |
forwarded directly to the command line script, the `extra` keyword | |
arguments are passed to the :meth:`~clickpkg.Command.main` function of | |
the command. | |
This returns a :class:`Result` object. | |
:param cli: the command to invoke | |
:param args: the arguments to invoke. It may be given as an iterable | |
or a string. When given as string it will be interpreted | |
as a Unix shell command. More details at | |
:func:`shlex.split`. | |
:param input: the input data for `sys.stdin`. | |
:param env: the environment overrides. | |
:param catch_exceptions: Whether to catch any other exceptions than | |
``SystemExit``. | |
:param extra: the keyword arguments to pass to :meth:`main`. | |
:param color: whether the output should contain color codes. The | |
application can still override this explicitly. | |
.. versionchanged:: 8.0 | |
The result object has the ``return_value`` attribute with | |
the value returned from the invoked command. | |
.. versionchanged:: 4.0 | |
Added the ``color`` parameter. | |
.. versionchanged:: 3.0 | |
Added the ``catch_exceptions`` parameter. | |
.. versionchanged:: 3.0 | |
The result object has the ``exc_info`` attribute with the | |
traceback if available. | |
""" | |
exc_info = None | |
with self.isolation(input=input, env=env, color=color) as outstreams: | |
return_value = None | |
exception: t.Optional[BaseException] = None | |
exit_code = 0 | |
if isinstance(args, str): | |
args = shlex.split(args) | |
try: | |
prog_name = extra.pop("prog_name") | |
except KeyError: | |
prog_name = self.get_default_prog_name(cli) | |
try: | |
return_value = cli.main(args=args or (), prog_name=prog_name, **extra) | |
except SystemExit as e: | |
exc_info = sys.exc_info() | |
e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code) | |
if e_code is None: | |
e_code = 0 | |
if e_code != 0: | |
exception = e | |
if not isinstance(e_code, int): | |
sys.stdout.write(str(e_code)) | |
sys.stdout.write("\n") | |
e_code = 1 | |
exit_code = e_code | |
except Exception as e: | |
if not catch_exceptions: | |
raise | |
exception = e | |
exit_code = 1 | |
exc_info = sys.exc_info() | |
finally: | |
sys.stdout.flush() | |
stdout = outstreams[0].getvalue() | |
if self.mix_stderr: | |
stderr = None | |
else: | |
stderr = outstreams[1].getvalue() # type: ignore | |
return Result( | |
runner=self, | |
stdout_bytes=stdout, | |
stderr_bytes=stderr, | |
return_value=return_value, | |
exit_code=exit_code, | |
exception=exception, | |
exc_info=exc_info, # type: ignore | |
) | |
def isolated_filesystem( | |
self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None | |
) -> t.Iterator[str]: | |
"""A context manager that creates a temporary directory and | |
changes the current working directory to it. This isolates tests | |
that affect the contents of the CWD to prevent them from | |
interfering with each other. | |
:param temp_dir: Create the temporary directory under this | |
directory. If given, the created directory is not removed | |
when exiting. | |
.. versionchanged:: 8.0 | |
Added the ``temp_dir`` parameter. | |
""" | |
cwd = os.getcwd() | |
dt = tempfile.mkdtemp(dir=temp_dir) | |
os.chdir(dt) | |
try: | |
yield dt | |
finally: | |
os.chdir(cwd) | |
if temp_dir is None: | |
try: | |
shutil.rmtree(dt) | |
except OSError: # noqa: B014 | |
pass | |