Spaces:
Running
Running
from __future__ import annotations | |
from collections.abc import Callable, Sequence | |
from functools import partial | |
from inspect import getmro, isclass | |
from typing import TYPE_CHECKING, Generic, Type, TypeVar, cast, overload | |
_BaseExceptionT_co = TypeVar("_BaseExceptionT_co", bound=BaseException, covariant=True) | |
_BaseExceptionT = TypeVar("_BaseExceptionT", bound=BaseException) | |
_ExceptionT_co = TypeVar("_ExceptionT_co", bound=Exception, covariant=True) | |
_ExceptionT = TypeVar("_ExceptionT", bound=Exception) | |
# using typing.Self would require a typing_extensions dependency on py<3.11 | |
_ExceptionGroupSelf = TypeVar("_ExceptionGroupSelf", bound="ExceptionGroup") | |
_BaseExceptionGroupSelf = TypeVar("_BaseExceptionGroupSelf", bound="BaseExceptionGroup") | |
def check_direct_subclass( | |
exc: BaseException, parents: tuple[type[BaseException]] | |
) -> bool: | |
for cls in getmro(exc.__class__)[:-1]: | |
if cls in parents: | |
return True | |
return False | |
def get_condition_filter( | |
condition: type[_BaseExceptionT] | |
| tuple[type[_BaseExceptionT], ...] | |
| Callable[[_BaseExceptionT_co], bool], | |
) -> Callable[[_BaseExceptionT_co], bool]: | |
if isclass(condition) and issubclass( | |
cast(Type[BaseException], condition), BaseException | |
): | |
return partial(check_direct_subclass, parents=(condition,)) | |
elif isinstance(condition, tuple): | |
if all(isclass(x) and issubclass(x, BaseException) for x in condition): | |
return partial(check_direct_subclass, parents=condition) | |
elif callable(condition): | |
return cast("Callable[[BaseException], bool]", condition) | |
raise TypeError("expected a function, exception type or tuple of exception types") | |
def _derive_and_copy_attributes(self, excs): | |
eg = self.derive(excs) | |
eg.__cause__ = self.__cause__ | |
eg.__context__ = self.__context__ | |
eg.__traceback__ = self.__traceback__ | |
if hasattr(self, "__notes__"): | |
# Create a new list so that add_note() only affects one exceptiongroup | |
eg.__notes__ = list(self.__notes__) | |
return eg | |
class BaseExceptionGroup(BaseException, Generic[_BaseExceptionT_co]): | |
"""A combination of multiple unrelated exceptions.""" | |
def __new__( | |
cls: type[_BaseExceptionGroupSelf], | |
__message: str, | |
__exceptions: Sequence[_BaseExceptionT_co], | |
) -> _BaseExceptionGroupSelf: | |
if not isinstance(__message, str): | |
raise TypeError(f"argument 1 must be str, not {type(__message)}") | |
if not isinstance(__exceptions, Sequence): | |
raise TypeError("second argument (exceptions) must be a sequence") | |
if not __exceptions: | |
raise ValueError( | |
"second argument (exceptions) must be a non-empty sequence" | |
) | |
for i, exc in enumerate(__exceptions): | |
if not isinstance(exc, BaseException): | |
raise ValueError( | |
f"Item {i} of second argument (exceptions) is not an exception" | |
) | |
if cls is BaseExceptionGroup: | |
if all(isinstance(exc, Exception) for exc in __exceptions): | |
cls = ExceptionGroup | |
if issubclass(cls, Exception): | |
for exc in __exceptions: | |
if not isinstance(exc, Exception): | |
if cls is ExceptionGroup: | |
raise TypeError( | |
"Cannot nest BaseExceptions in an ExceptionGroup" | |
) | |
else: | |
raise TypeError( | |
f"Cannot nest BaseExceptions in {cls.__name__!r}" | |
) | |
instance = super().__new__(cls, __message, __exceptions) | |
instance._message = __message | |
instance._exceptions = __exceptions | |
return instance | |
def add_note(self, note: str) -> None: | |
if not isinstance(note, str): | |
raise TypeError( | |
f"Expected a string, got note={note!r} (type {type(note).__name__})" | |
) | |
if not hasattr(self, "__notes__"): | |
self.__notes__: list[str] = [] | |
self.__notes__.append(note) | |
def message(self) -> str: | |
return self._message | |
def exceptions( | |
self, | |
) -> tuple[_BaseExceptionT_co | BaseExceptionGroup[_BaseExceptionT_co], ...]: | |
return tuple(self._exceptions) | |
def subgroup( | |
self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...] | |
) -> ExceptionGroup[_ExceptionT] | None: ... | |
def subgroup( | |
self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...] | |
) -> BaseExceptionGroup[_BaseExceptionT] | None: ... | |
def subgroup( | |
self, | |
__condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool], | |
) -> BaseExceptionGroup[_BaseExceptionT_co] | None: ... | |
def subgroup( | |
self, | |
__condition: type[_BaseExceptionT] | |
| tuple[type[_BaseExceptionT], ...] | |
| Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool], | |
) -> BaseExceptionGroup[_BaseExceptionT] | None: | |
condition = get_condition_filter(__condition) | |
modified = False | |
if condition(self): | |
return self | |
exceptions: list[BaseException] = [] | |
for exc in self.exceptions: | |
if isinstance(exc, BaseExceptionGroup): | |
subgroup = exc.subgroup(__condition) | |
if subgroup is not None: | |
exceptions.append(subgroup) | |
if subgroup is not exc: | |
modified = True | |
elif condition(exc): | |
exceptions.append(exc) | |
else: | |
modified = True | |
if not modified: | |
return self | |
elif exceptions: | |
group = _derive_and_copy_attributes(self, exceptions) | |
return group | |
else: | |
return None | |
def split( | |
self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...] | |
) -> tuple[ | |
ExceptionGroup[_ExceptionT] | None, | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
]: ... | |
def split( | |
self, __condition: type[_BaseExceptionT] | tuple[type[_BaseExceptionT], ...] | |
) -> tuple[ | |
BaseExceptionGroup[_BaseExceptionT] | None, | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
]: ... | |
def split( | |
self, | |
__condition: Callable[[_BaseExceptionT_co | _BaseExceptionGroupSelf], bool], | |
) -> tuple[ | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
]: ... | |
def split( | |
self, | |
__condition: type[_BaseExceptionT] | |
| tuple[type[_BaseExceptionT], ...] | |
| Callable[[_BaseExceptionT_co], bool], | |
) -> ( | |
tuple[ | |
ExceptionGroup[_ExceptionT] | None, | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
] | |
| tuple[ | |
BaseExceptionGroup[_BaseExceptionT] | None, | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
] | |
| tuple[ | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
BaseExceptionGroup[_BaseExceptionT_co] | None, | |
] | |
): | |
condition = get_condition_filter(__condition) | |
if condition(self): | |
return self, None | |
matching_exceptions: list[BaseException] = [] | |
nonmatching_exceptions: list[BaseException] = [] | |
for exc in self.exceptions: | |
if isinstance(exc, BaseExceptionGroup): | |
matching, nonmatching = exc.split(condition) | |
if matching is not None: | |
matching_exceptions.append(matching) | |
if nonmatching is not None: | |
nonmatching_exceptions.append(nonmatching) | |
elif condition(exc): | |
matching_exceptions.append(exc) | |
else: | |
nonmatching_exceptions.append(exc) | |
matching_group: _BaseExceptionGroupSelf | None = None | |
if matching_exceptions: | |
matching_group = _derive_and_copy_attributes(self, matching_exceptions) | |
nonmatching_group: _BaseExceptionGroupSelf | None = None | |
if nonmatching_exceptions: | |
nonmatching_group = _derive_and_copy_attributes( | |
self, nonmatching_exceptions | |
) | |
return matching_group, nonmatching_group | |
def derive(self, __excs: Sequence[_ExceptionT]) -> ExceptionGroup[_ExceptionT]: ... | |
def derive( | |
self, __excs: Sequence[_BaseExceptionT] | |
) -> BaseExceptionGroup[_BaseExceptionT]: ... | |
def derive( | |
self, __excs: Sequence[_BaseExceptionT] | |
) -> BaseExceptionGroup[_BaseExceptionT]: | |
return BaseExceptionGroup(self.message, __excs) | |
def __str__(self) -> str: | |
suffix = "" if len(self._exceptions) == 1 else "s" | |
return f"{self.message} ({len(self._exceptions)} sub-exception{suffix})" | |
def __repr__(self) -> str: | |
return f"{self.__class__.__name__}({self.message!r}, {self._exceptions!r})" | |
class ExceptionGroup(BaseExceptionGroup[_ExceptionT_co], Exception): | |
def __new__( | |
cls: type[_ExceptionGroupSelf], | |
__message: str, | |
__exceptions: Sequence[_ExceptionT_co], | |
) -> _ExceptionGroupSelf: | |
return super().__new__(cls, __message, __exceptions) | |
if TYPE_CHECKING: | |
def exceptions( | |
self, | |
) -> tuple[_ExceptionT_co | ExceptionGroup[_ExceptionT_co], ...]: ... | |
# type: ignore[override] | |
def subgroup( | |
self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...] | |
) -> ExceptionGroup[_ExceptionT] | None: ... | |
def subgroup( | |
self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool] | |
) -> ExceptionGroup[_ExceptionT_co] | None: ... | |
def subgroup( | |
self, | |
__condition: type[_ExceptionT] | |
| tuple[type[_ExceptionT], ...] | |
| Callable[[_ExceptionT_co], bool], | |
) -> ExceptionGroup[_ExceptionT] | None: | |
return super().subgroup(__condition) | |
def split( | |
self, __condition: type[_ExceptionT] | tuple[type[_ExceptionT], ...] | |
) -> tuple[ | |
ExceptionGroup[_ExceptionT] | None, ExceptionGroup[_ExceptionT_co] | None | |
]: ... | |
def split( | |
self, __condition: Callable[[_ExceptionT_co | _ExceptionGroupSelf], bool] | |
) -> tuple[ | |
ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None | |
]: ... | |
def split( | |
self: _ExceptionGroupSelf, | |
__condition: type[_ExceptionT] | |
| tuple[type[_ExceptionT], ...] | |
| Callable[[_ExceptionT_co], bool], | |
) -> tuple[ | |
ExceptionGroup[_ExceptionT_co] | None, ExceptionGroup[_ExceptionT_co] | None | |
]: | |
return super().split(__condition) | |