import re
import sys
from typing import List, Optional, Union
__all__ = ["ReceiveBuffer"]
# Operations we want to support:
# - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable),
# or wait until there is one
# - read at-most-N bytes
# Goals:
# - on average, do this fast
# - worst case, do this in O(n) where n is the number of bytes processed
# Plan:
# - store bytearray, offset, how far we've searched for a separator token
# - use the how-far-we've-searched data to avoid rescanning
# - while doing a stream of uninterrupted processing, advance offset instead
# of constantly copying
# WARNING:
# - I haven't benchmarked or profiled any of this yet.
#
# Note that starting in Python 3.4, deleting the initial n bytes from a
# bytearray is amortized O(n), thanks to some excellent work by Antoine
# Martin:
#
# https://bugs.python.org/issue19087
#
# This means that if we only supported 3.4+, we could get rid of the code here
# involving self._start and self.compress, because it's doing exactly the same
# thing that bytearray now does internally.
#
# BUT unfortunately, we still support 2.7, and reading short segments out of a
# long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually
# delete this code. Yet:
#
# https://pythonclock.org/
#
# (Two things to double-check first though: make sure PyPy also has the
# optimization, and benchmark to make sure it's a win, since we do have a
# slightly clever thing where we delay calling compress() until we've
# processed a whole event, which could in theory be slightly more efficient
# than the internal bytearray support.)
# End-of-headers separator: a line break (b"\n") followed immediately by an
# empty line terminated by either b"\n" or b"\r\n" — i.e. matches b"\n\n" or
# b"\n\r\n".  (re.MULTILINE only changes the meaning of the ^ and $ anchors,
# which this pattern does not contain, so the flag has no effect here.)
blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE)
class ReceiveBuffer:
    """An incrementally-fed byte buffer with cheap front extraction.

    Bytes arrive via ``+=``; the ``maybe_extract_*`` methods remove
    completed pieces from the front, returning ``None`` when the buffer
    does not yet hold a complete piece.  Two bookmarks remember how far
    previous (failed) separator scans got, so repeated scans over a
    slowly-filling buffer never rescan the same bytes.
    """

    def __init__(self) -> None:
        self._data = bytearray()
        # Scan bookmarks: how far we've already searched for b"\r\n" /
        # for a blank line.  Reset whenever the front of the buffer moves.
        self._next_line_search = 0
        self._multiple_lines_search = 0

    def __iadd__(self, byteslike: Union[bytes, bytearray]) -> "ReceiveBuffer":
        """Append incoming bytes to the end of the buffer."""
        self._data += byteslike
        return self

    def __bool__(self) -> bool:
        """True when the buffer holds any unprocessed bytes."""
        return len(self) > 0

    def __len__(self) -> int:
        """Number of unprocessed bytes currently buffered."""
        return len(self._data)

    # for @property unprocessed_data
    def __bytes__(self) -> bytes:
        return bytes(self._data)

    def _extract(self, count: int) -> bytearray:
        """Remove and return the first *count* bytes; reset scan bookmarks."""
        chunk = self._data[:count]
        del self._data[:count]
        self._next_line_search = 0
        self._multiple_lines_search = 0
        return chunk

    def maybe_extract_at_most(self, count: int) -> Optional[bytearray]:
        """
        Extract a fixed number of bytes from the buffer.
        """
        # Returns None exactly when the requested initial slice is empty
        # (buffer empty, or count is zero / too negative).
        if not self._data[:count]:
            return None
        return self._extract(count)

    def maybe_extract_next_line(self) -> Optional[bytearray]:
        """
        Extract the first line, if it is completed in the buffer.
        """
        # Resume one byte before where the last scan stopped, in case a
        # trailing b"\r" has since been completed by a fresh b"\n".
        start = max(0, self._next_line_search - 1)
        found = self._data.find(b"\r\n", start)
        if found < 0:
            self._next_line_search = len(self._data)
            return None
        # Keep the two separator bytes as part of the returned line.
        return self._extract(found + 2)

    def maybe_extract_lines(self) -> Optional[List[bytearray]]:
        """
        Extract everything up to the first blank line, and return a list of lines.
        """
        # An immediately-empty first line means "no headers at all".
        if self._data.startswith(b"\n"):
            self._extract(1)
            return []
        if self._data.startswith(b"\r\n"):
            self._extract(2)
            return []
        # Resume the blank-line scan where the previous failed scan
        # stopped.  (re.compile caches, so this recompiles nothing.)
        match = re.compile(b"\n\r?\n", re.MULTILINE).search(
            self._data, self._multiple_lines_search
        )
        if match is None:
            # The separator is at most 3 bytes long, so only the final 2
            # bytes could still start a match once more data arrives.
            self._multiple_lines_search = max(0, len(self._data) - 2)
            return None
        # Take everything through the end of the blank-line separator.
        raw = self._extract(match.end())
        lines = raw.split(b"\n")
        # Drop a trailing b"\r" from each line (tolerates bare-\n endings).
        for ln in lines:
            if ln.endswith(b"\r"):
                del ln[-1]
        # The separator match guarantees two empty trailing pieces.
        assert lines[-1] == b"" and lines[-2] == b""
        del lines[-2:]
        return lines

    # In theory we should wait until `\r\n` before starting to validate
    # incoming data. However it's interesting to detect (very) invalid data
    # early given they might not even contain `\r\n` at all (hence only
    # timeout will get rid of them).
    # This is not a 100% effective detection but more of a cheap sanity check
    # allowing for early abort in some useful cases.
    # This is especially interesting when peer is messing up with HTTPS and
    # sent us a TLS stream where we were expecting plain HTTP given all
    # versions of TLS so far start handshake with a 0x16 message type code.
    def is_next_line_obviously_invalid_request_line(self) -> bool:
        # HTTP header line must not contain non-printable characters
        # and should not start with a space; an empty buffer is not
        # (yet) evidence of anything, so report False.
        return bool(self._data) and self._data[0] < 0x21