|
"""Contains the base class :class:`.BaseSimplePrompt`.""" |
|
import os |
|
import re |
|
from abc import ABC, abstractmethod |
|
from typing import ( |
|
TYPE_CHECKING, |
|
Any, |
|
Callable, |
|
Dict, |
|
List, |
|
Optional, |
|
Tuple, |
|
Union, |
|
cast, |
|
) |
|
|
|
from prompt_toolkit.enums import EditingMode |
|
from prompt_toolkit.filters.base import Condition, FilterOrBool |
|
from prompt_toolkit.key_binding.key_bindings import KeyBindings, KeyHandlerCallable |
|
from prompt_toolkit.keys import Keys |
|
from prompt_toolkit.styles.style import Style |
|
from prompt_toolkit.validation import Validator |
|
|
|
from InquirerPy.enum import INQUIRERPY_KEYBOARD_INTERRUPT |
|
from InquirerPy.exceptions import RequiredKeyNotFound |
|
from InquirerPy.utils import ( |
|
InquirerPyMessage, |
|
InquirerPySessionResult, |
|
InquirerPyStyle, |
|
InquirerPyValidate, |
|
get_style, |
|
) |
|
|
|
if TYPE_CHECKING: |
|
from prompt_toolkit.key_binding.key_processor import KeyPressEvent |
|
|
|
|
|
class BaseSimplePrompt(ABC): |
|
"""The base class to create a simple terminal input prompt. |
|
|
|
Note: |
|
No actual :class:`~prompt_toolkit.application.Application` is created by this class. |
|
This class only creates some common interface and attributes that can be easily used |
|
by `prompt_toolkit`. |
|
|
|
To have a functional prompt, you'll at least have to implement the :meth:`.BaseSimplePrompt._run` |
|
and :meth:`.BaseSimplePrompt._get_prompt_message`. |
|
|
|
See Also: |
|
:class:`~InquirerPy.prompts.input.InputPrompt` |
|
""" |
|
|
|
def __init__( |
|
self, |
|
message: InquirerPyMessage, |
|
style: Optional[InquirerPyStyle] = None, |
|
vi_mode: bool = False, |
|
qmark: str = "?", |
|
amark: str = "?", |
|
instruction: str = "", |
|
validate: Optional[InquirerPyValidate] = None, |
|
invalid_message: str = "Invalid input", |
|
transformer: Optional[Callable[[Any], Any]] = None, |
|
filter: Optional[Callable[[Any], Any]] = None, |
|
default: Any = "", |
|
wrap_lines: bool = True, |
|
raise_keyboard_interrupt: bool = True, |
|
mandatory: bool = True, |
|
mandatory_message: str = "Mandatory prompt", |
|
session_result: Optional[InquirerPySessionResult] = None, |
|
) -> None: |
|
self._mandatory = mandatory |
|
self._mandatory_message = mandatory_message |
|
self._result = session_result or {} |
|
self._message = ( |
|
message |
|
if not isinstance(message, Callable) |
|
else cast(Callable, message)(self._result) |
|
) |
|
self._instruction = instruction |
|
self._default = ( |
|
default if not isinstance(default, Callable) else default(self._result) |
|
) |
|
self._style = Style.from_dict(style.dict if style else get_style().dict) |
|
self._qmark = qmark |
|
self._amark = amark |
|
self._status = {"answered": False, "result": None, "skipped": False} |
|
self._kb = KeyBindings() |
|
self._lexer = "class:input" |
|
self._transformer = transformer |
|
self._filter = filter |
|
self._wrap_lines = wrap_lines |
|
self._editing_mode = ( |
|
EditingMode.VI |
|
if vi_mode or bool(os.getenv("INQUIRERPY_VI_MODE", False)) |
|
else EditingMode.EMACS |
|
) |
|
if isinstance(validate, Validator): |
|
self._validator = validate |
|
else: |
|
self._validator = Validator.from_callable( |
|
validate if validate else lambda _: True, |
|
invalid_message, |
|
move_cursor_to_end=True, |
|
) |
|
self._raise_kbi = not os.getenv( |
|
"INQUIRERPY_NO_RAISE_KBI", not raise_keyboard_interrupt |
|
) |
|
self._is_rasing_kbi = Condition(lambda: self._raise_kbi) |
|
|
|
self._kb_maps = { |
|
"answer": [{"key": Keys.Enter}], |
|
"interrupt": [ |
|
{"key": "c-c", "filter": self._is_rasing_kbi}, |
|
{"key": "c-d", "filter": ~self._is_rasing_kbi}, |
|
], |
|
"skip": [{"key": "c-z"}, {"key": "c-c", "filter": ~self._is_rasing_kbi}], |
|
} |
|
self._kb_func_lookup = { |
|
"answer": [{"func": self._handle_enter}], |
|
"interrupt": [{"func": self._handle_interrupt}], |
|
"skip": [{"func": self._handle_skip}], |
|
} |
|
|
|
def _keybinding_factory(self): |
|
"""Register all keybindings in `self._kb_maps`. |
|
|
|
It's required to call this function at the end of prompt constructor if |
|
it inherits from :class:`~InquirerPy.base.simple.BaseSimplePrompt` or |
|
:class:`~InquirerPy.base.complex.BaseComplexPrompt`. |
|
""" |
|
|
|
def _factory(keys, filter, action): |
|
if action not in self.kb_func_lookup: |
|
raise RequiredKeyNotFound(f"keybinding action {action} not found") |
|
if not isinstance(keys, list): |
|
keys = [keys] |
|
|
|
@self.register_kb(*keys, filter=filter) |
|
def _(event): |
|
for method in self.kb_func_lookup[action]: |
|
method["func"](event, *method.get("args", [])) |
|
|
|
for key, item in self.kb_maps.items(): |
|
if not isinstance(item, list): |
|
item = [item] |
|
for kb in item: |
|
_factory(kb["key"], kb.get("filter", Condition(lambda: True)), key) |
|
|
|
@abstractmethod |
|
def _set_error(self, message: str) -> None: |
|
"""Set the error message for the prompt. |
|
|
|
Args: |
|
message: Error message to set. |
|
""" |
|
pass |
|
|
|
def _handle_skip(self, event: Optional["KeyPressEvent"]) -> None: |
|
"""Handle the event when attempting to skip a prompt. |
|
|
|
Skip the prompt if the `_mandatory` field is False, otherwise |
|
show an error message that the prompt cannot be skipped. |
|
""" |
|
if not self._mandatory: |
|
self.status["answered"] = True |
|
self.status["skipped"] = True |
|
self.status["result"] = None |
|
if event: |
|
event.app.exit(result=None) |
|
else: |
|
self._set_error(message=self._mandatory_message) |
|
|
|
def _handle_interrupt(self, event: Optional["KeyPressEvent"]) -> None: |
|
"""Handle the event when a KeyboardInterrupt signal is sent.""" |
|
self.status["answered"] = True |
|
self.status["result"] = INQUIRERPY_KEYBOARD_INTERRUPT |
|
self.status["skipped"] = True |
|
if event: |
|
event.app.exit(result=INQUIRERPY_KEYBOARD_INTERRUPT) |
|
|
|
@abstractmethod |
|
def _handle_enter(self, event: Optional["KeyPressEvent"]) -> None: |
|
"""Handle the event when user attempt to answer the question.""" |
|
pass |
|
|
|
@property |
|
def status(self) -> Dict[str, Any]: |
|
"""Dict[str, Any]: Get current prompt status. |
|
|
|
The status contains 3 keys: "answered" and "result". |
|
answered: If the current prompt is answered. |
|
result: The result of the user answer. |
|
skipped: If the prompt is skipped. |
|
""" |
|
return self._status |
|
|
|
@status.setter |
|
def status(self, value) -> None: |
|
self._status = value |
|
|
|
def register_kb( |
|
self, *keys: Union[Keys, str], filter: FilterOrBool = True, **kwargs |
|
) -> Callable[[KeyHandlerCallable], KeyHandlerCallable]: |
|
"""Keybinding registration decorator. |
|
|
|
This decorator wraps around the :meth:`prompt_toolkit.key_binding.KeyBindings.add` with |
|
added feature to process `alt` realted keybindings. |
|
|
|
By default, `prompt_toolkit` doesn't process `alt` related keybindings, |
|
it requires `alt-ANY` to `escape` + `ANY`. |
|
|
|
Args: |
|
keys: The keys to bind that can trigger the function. |
|
filter: :class:`~prompt_toolkit.filter.Condition` to indicate if this keybinding should be active. |
|
|
|
Returns: |
|
A decorator that should be applied to the function thats intended to be active when the keys |
|
are pressed. |
|
|
|
Examples: |
|
>>> @self.register_kb("alt-j") |
|
... def test(event): |
|
... pass |
|
""" |
|
alt_pattern = re.compile(r"^alt-(.*)") |
|
|
|
def decorator(func: KeyHandlerCallable) -> KeyHandlerCallable: |
|
formatted_keys = [] |
|
for key in keys: |
|
match = alt_pattern.match(key) |
|
if match: |
|
formatted_keys.append("escape") |
|
formatted_keys.append(match.group(1)) |
|
else: |
|
formatted_keys.append(key) |
|
|
|
@self._kb.add(*formatted_keys, filter=filter, **kwargs) |
|
def executable(event) -> None: |
|
func(event) |
|
|
|
return executable |
|
|
|
return decorator |
|
|
|
@abstractmethod |
|
def _get_prompt_message( |
|
self, pre_answer: Tuple[str, str], post_answer: Tuple[str, str] |
|
) -> List[Tuple[str, str]]: |
|
"""Get the question message in formatted text form to display in the prompt. |
|
|
|
This function is mainly used to render the question message dynamically based |
|
on the current status (answered or not answered) of the prompt. |
|
|
|
Note: |
|
The function requires implementation when inheriting :class:`.BaseSimplePrompt`. |
|
You should call `super()._get_prompt_message(pre_answer, post_answer)` in |
|
the implemented `_get_prompt_message`. |
|
|
|
Args: |
|
pre_answer: The message to display before the question is answered. |
|
post_answer: The information to display after the question is answered. |
|
|
|
Returns: |
|
Formatted text in list of tuple format. |
|
""" |
|
display_message = [] |
|
if self.status["skipped"]: |
|
display_message.append(("class:skipped", self._qmark)) |
|
display_message.append( |
|
("class:skipped", "%s%s " % (" " if self._qmark else "", self._message)) |
|
) |
|
elif self.status["answered"]: |
|
display_message.append(("class:answermark", self._amark)) |
|
display_message.append( |
|
( |
|
"class:answered_question", |
|
"%s%s" % (" " if self._amark else "", self._message), |
|
) |
|
) |
|
display_message.append( |
|
post_answer |
|
if not self._transformer |
|
else ( |
|
"class:answer", |
|
" %s" % self._transformer(self.status["result"]), |
|
) |
|
) |
|
else: |
|
display_message.append(("class:questionmark", self._qmark)) |
|
display_message.append( |
|
( |
|
"class:question", |
|
"%s%s" % (" " if self._qmark else "", self._message), |
|
) |
|
) |
|
display_message.append(pre_answer) |
|
return display_message |
|
|
|
@abstractmethod |
|
def _run(self) -> Any: |
|
"""Abstractmethod to enforce a run function is implemented. |
|
|
|
All prompt instance requires a `_run` call to initialise and run an instance of |
|
`PromptSession` or `Application`. |
|
""" |
|
pass |
|
|
|
@abstractmethod |
|
async def _run_async(self) -> Any: |
|
"""Abstractmethod to enforce a run function is implemented. |
|
|
|
All prompt instance requires a `_run_async` call to initialise and run an instance of |
|
`PromptSession` or `Application`. |
|
""" |
|
pass |
|
|
|
def execute(self, raise_keyboard_interrupt: Optional[bool] = None) -> Any: |
|
"""Run the prompt and get the result. |
|
|
|
Args: |
|
raise_keyboard_interrupt: **Deprecated**. Set this parameter on the prompt initialisation instead. |
|
|
|
Returns: |
|
Value of the user answer. Types varies depending on the prompt. |
|
|
|
Raises: |
|
KeyboardInterrupt: When `ctrl-c` is pressed and `raise_keyboard_interrupt` is True. |
|
""" |
|
result = self._run() |
|
if raise_keyboard_interrupt is not None: |
|
self._raise_kbi = not os.getenv( |
|
"INQUIRERPY_NO_RAISE_KBI", not raise_keyboard_interrupt |
|
) |
|
if result == INQUIRERPY_KEYBOARD_INTERRUPT: |
|
raise KeyboardInterrupt |
|
if not self._filter: |
|
return result |
|
return self._filter(result) |
|
|
|
async def execute_async(self) -> None: |
|
"""Run the prompt asynchronously and get the result. |
|
|
|
Returns: |
|
Value of the user answer. Types varies depending on the prompt. |
|
|
|
Raises: |
|
KeyboardInterrupt: When `ctrl-c` is pressed and `raise_keyboard_interrupt` is True. |
|
""" |
|
result = await self._run_async() |
|
if result == INQUIRERPY_KEYBOARD_INTERRUPT: |
|
raise KeyboardInterrupt |
|
if not self._filter: |
|
return result |
|
return self._filter(result) |
|
|
|
@property |
|
def instruction(self) -> str: |
|
"""str: Instruction to display next to question.""" |
|
return self._instruction |
|
|
|
@property |
|
def kb_maps(self) -> Dict[str, Any]: |
|
"""Dict[str, Any]: Keybinding mappings.""" |
|
return self._kb_maps |
|
|
|
@kb_maps.setter |
|
def kb_maps(self, value: Dict[str, Any]) -> None: |
|
self._kb_maps = {**self._kb_maps, **value} |
|
|
|
@property |
|
def kb_func_lookup(self) -> Dict[str, Any]: |
|
"""Dict[str, Any]: Keybinding function lookup mappings..""" |
|
return self._kb_func_lookup |
|
|
|
@kb_func_lookup.setter |
|
def kb_func_lookup(self, value: Dict[str, Any]) -> None: |
|
self._kb_func_lookup = {**self._kb_func_lookup, **value} |
|
|