Spaces:
Runtime error
Runtime error
File size: 5,043 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
from __future__ import annotations
import codecs
from dataclasses import InitVar, dataclass, field
from typing import Any, Callable, Mapping
from ..abc import (
AnyByteReceiveStream,
AnyByteSendStream,
AnyByteStream,
ObjectReceiveStream,
ObjectSendStream,
ObjectStream,
)
@dataclass(eq=False)
class TextReceiveStream(ObjectReceiveStream[str]):
    """
    Stream wrapper that decodes bytes to strings using the given encoding.

    Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any completely
    received unicode characters as soon as they come in. Bytes that do not yet form a
    complete character stay buffered inside the decoder until a later chunk completes them.

    :param transport_stream: any bytes-based receive stream
    :param encoding: character encoding to use for decoding bytes to strings (defaults to
        ``utf-8``)
    :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
        `codecs module documentation`_ for a comprehensive list of options)

    .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
    """

    transport_stream: AnyByteReceiveStream
    # ``encoding`` and ``errors`` are init-only: they are consumed by ``__post_init__`` to
    # build the decoder and are not stored as instance attributes.
    encoding: InitVar[str] = "utf-8"
    errors: InitVar[str] = "strict"
    _decoder: codecs.IncrementalDecoder = field(init=False)

    def __post_init__(self, encoding: str, errors: str) -> None:
        """Create the incremental decoder for ``encoding`` using the ``errors`` scheme."""
        decoder_class = codecs.getincrementaldecoder(encoding)
        self._decoder = decoder_class(errors=errors)

    async def receive(self) -> str:
        """
        Receive bytes from the wrapped stream and return them decoded as text.

        Keeps reading from the transport stream until the decoder yields a non-empty
        string, so a chunk that only contains part of a multi-byte character never
        results in an empty string being returned.

        :return: the next non-empty piece of decoded text
        """
        while True:
            chunk = await self.transport_stream.receive()
            decoded = self._decoder.decode(chunk)
            if decoded:
                return decoded

    async def aclose(self) -> None:
        """Close the wrapped stream and discard any partially decoded input."""
        try:
            await self.transport_stream.aclose()
        finally:
            # Always drop buffered partial characters, even when closing the transport
            # raises, so the decoder never retains stale bytes from a dead stream.
            self._decoder.reset()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Expose the extra attributes of the wrapped transport stream unchanged."""
        return self.transport_stream.extra_attributes
@dataclass(eq=False)
class TextSendStream(ObjectSendStream[str]):
    """
    Stream wrapper that encodes strings to bytes before handing them to the wrapped stream.

    :param AnyByteSendStream transport_stream: any bytes-based send stream
    :param str encoding: character encoding to use for encoding strings to bytes (defaults to
        ``utf-8``)
    :param str errors: handling scheme for encoding errors (defaults to ``strict``; see the
        `codecs module documentation`_ for a comprehensive list of options)

    .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
    """

    transport_stream: AnyByteSendStream
    # Init-only: consumed by ``__post_init__`` to look up the encoder function.
    encoding: InitVar[str] = "utf-8"
    # Stored on the instance because it is passed to the encoder on every ``send()``.
    errors: str = "strict"
    _encoder: Callable[..., tuple[bytes, int]] = field(init=False)

    def __post_init__(self, encoding: str) -> None:
        """Look up the encoder function for ``encoding`` once, at construction time."""
        self._encoder = codecs.getencoder(encoding)

    async def send(self, item: str) -> None:
        """
        Encode ``item`` and send the resulting bytes to the wrapped stream.

        :param item: the text to send
        """
        # The encoder returns ``(encoded_bytes, length_consumed)``; only the bytes are needed.
        payload, _consumed = self._encoder(item, self.errors)
        await self.transport_stream.send(payload)

    async def aclose(self) -> None:
        """Close the wrapped stream."""
        await self.transport_stream.aclose()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Expose the extra attributes of the wrapped transport stream unchanged."""
        return self.transport_stream.extra_attributes
@dataclass(eq=False)
class TextStream(ObjectStream[str]):
    """
    A bidirectional stream that decodes bytes to strings on receive and encodes strings to
    bytes on send.

    Internally this wraps the same transport stream in a :class:`TextReceiveStream` and a
    :class:`TextSendStream` and delegates to them.

    Extra attributes will be provided from both streams, with the receive stream providing the
    values in case of a conflict.

    :param AnyByteStream transport_stream: any bytes-based stream
    :param str encoding: character encoding to use for encoding/decoding strings to/from bytes
        (defaults to ``utf-8``)
    :param str errors: handling scheme for encoding and decoding errors (defaults to
        ``strict``; see the `codecs module documentation`_ for a comprehensive list of options)

    .. _codecs module documentation: https://docs.python.org/3/library/codecs.html#codec-objects
    """

    transport_stream: AnyByteStream
    # Init-only: both values are forwarded to the two wrapper streams in ``__post_init__``.
    encoding: InitVar[str] = "utf-8"
    errors: InitVar[str] = "strict"
    _receive_stream: TextReceiveStream = field(init=False)
    _send_stream: TextSendStream = field(init=False)

    def __post_init__(self, encoding: str, errors: str) -> None:
        """Build the receive and send wrappers around the shared transport stream."""
        codec_options = {"encoding": encoding, "errors": errors}
        self._receive_stream = TextReceiveStream(self.transport_stream, **codec_options)
        self._send_stream = TextSendStream(self.transport_stream, **codec_options)

    async def receive(self) -> str:
        """Return the next piece of decoded text from the receive wrapper."""
        text = await self._receive_stream.receive()
        return text

    async def send(self, item: str) -> None:
        """
        Encode ``item`` and send it through the send wrapper.

        :param item: the text to send
        """
        await self._send_stream.send(item)

    async def send_eof(self) -> None:
        """Forward the end-of-file signal directly to the transport stream."""
        await self.transport_stream.send_eof()

    async def aclose(self) -> None:
        """Close the send wrapper first, then the receive wrapper."""
        await self._send_stream.aclose()
        await self._receive_stream.aclose()

    @property
    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
        """Merge both wrappers' extra attributes; receive-side entries win on conflict."""
        combined = dict(self._send_stream.extra_attributes)
        # Applied second so that receive-stream values override send-stream ones.
        combined.update(self._receive_stream.extra_attributes)
        return combined
|