Spaces:
Sleeping
Sleeping
import contextlib | |
import itertools | |
import logging | |
import sys | |
import time | |
from typing import IO, Generator, Optional | |
from pip._internal.utils.compat import WINDOWS | |
from pip._internal.utils.logging import get_indentation | |
logger = logging.getLogger(__name__) | |
class SpinnerInterface: | |
def spin(self) -> None: | |
raise NotImplementedError() | |
def finish(self, final_status: str) -> None: | |
raise NotImplementedError() | |
class InteractiveSpinner(SpinnerInterface): | |
def __init__( | |
self, | |
message: str, | |
file: Optional[IO[str]] = None, | |
spin_chars: str = "-\\|/", | |
# Empirically, 8 updates/second looks nice | |
min_update_interval_seconds: float = 0.125, | |
): | |
self._message = message | |
if file is None: | |
file = sys.stdout | |
self._file = file | |
self._rate_limiter = RateLimiter(min_update_interval_seconds) | |
self._finished = False | |
self._spin_cycle = itertools.cycle(spin_chars) | |
self._file.write(" " * get_indentation() + self._message + " ... ") | |
self._width = 0 | |
def _write(self, status: str) -> None: | |
assert not self._finished | |
# Erase what we wrote before by backspacing to the beginning, writing | |
# spaces to overwrite the old text, and then backspacing again | |
backup = "\b" * self._width | |
self._file.write(backup + " " * self._width + backup) | |
# Now we have a blank slate to add our status | |
self._file.write(status) | |
self._width = len(status) | |
self._file.flush() | |
self._rate_limiter.reset() | |
def spin(self) -> None: | |
if self._finished: | |
return | |
if not self._rate_limiter.ready(): | |
return | |
self._write(next(self._spin_cycle)) | |
def finish(self, final_status: str) -> None: | |
if self._finished: | |
return | |
self._write(final_status) | |
self._file.write("\n") | |
self._file.flush() | |
self._finished = True | |
# Used for dumb terminals, non-interactive installs (no tty), etc. | |
# We still print updates occasionally (once every 60 seconds by default) to | |
# act as a keep-alive for systems like Travis-CI that take lack-of-output as | |
# an indication that a task has frozen. | |
class NonInteractiveSpinner(SpinnerInterface): | |
def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None: | |
self._message = message | |
self._finished = False | |
self._rate_limiter = RateLimiter(min_update_interval_seconds) | |
self._update("started") | |
def _update(self, status: str) -> None: | |
assert not self._finished | |
self._rate_limiter.reset() | |
logger.info("%s: %s", self._message, status) | |
def spin(self) -> None: | |
if self._finished: | |
return | |
if not self._rate_limiter.ready(): | |
return | |
self._update("still running...") | |
def finish(self, final_status: str) -> None: | |
if self._finished: | |
return | |
self._update(f"finished with status '{final_status}'") | |
self._finished = True | |
class RateLimiter: | |
def __init__(self, min_update_interval_seconds: float) -> None: | |
self._min_update_interval_seconds = min_update_interval_seconds | |
self._last_update: float = 0 | |
def ready(self) -> bool: | |
now = time.time() | |
delta = now - self._last_update | |
return delta >= self._min_update_interval_seconds | |
def reset(self) -> None: | |
self._last_update = time.time() | |
def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]: | |
# Interactive spinner goes directly to sys.stdout rather than being routed | |
# through the logging system, but it acts like it has level INFO, | |
# i.e. it's only displayed if we're at level INFO or better. | |
# Non-interactive spinner goes through the logging system, so it is always | |
# in sync with logging configuration. | |
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO: | |
spinner: SpinnerInterface = InteractiveSpinner(message) | |
else: | |
spinner = NonInteractiveSpinner(message) | |
try: | |
with hidden_cursor(sys.stdout): | |
yield spinner | |
except KeyboardInterrupt: | |
spinner.finish("canceled") | |
raise | |
except Exception: | |
spinner.finish("error") | |
raise | |
else: | |
spinner.finish("done") | |
HIDE_CURSOR = "\x1b[?25l" | |
SHOW_CURSOR = "\x1b[?25h" | |
def hidden_cursor(file: IO[str]) -> Generator[None, None, None]: | |
# The Windows terminal does not support the hide/show cursor ANSI codes, | |
# even via colorama. So don't even try. | |
if WINDOWS: | |
yield | |
# We don't want to clutter the output with control characters if we're | |
# writing to a file, or if the user is running with --quiet. | |
# See https://github.com/pypa/pip/issues/3418 | |
elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO: | |
yield | |
else: | |
file.write(HIDE_CURSOR) | |
try: | |
yield | |
finally: | |
file.write(SHOW_CURSOR) | |