Spaces:
Running
Running
File size: 10,230 Bytes
b72ab63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
import re
from typing import AnyStr, cast, List, overload, Sequence, Tuple, TYPE_CHECKING, Union
from ._abnf import field_name, field_value
from ._util import bytesify, LocalProtocolError, validate
if TYPE_CHECKING:
from ._events import Request
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal # type: ignore
# Facts
# -----
#
# Headers are:
# keys: case-insensitive ascii
# values: mixture of ascii and raw bytes
#
# "Historically, HTTP has allowed field content with text in the ISO-8859-1
# charset [ISO-8859-1], supporting other charsets only through use of
# [RFC2047] encoding. In practice, most HTTP header field values use only a
# subset of the US-ASCII charset [USASCII]. Newly defined header fields SHOULD
# limit their field values to US-ASCII octets. A recipient SHOULD treat other
# octets in field content (obs-text) as opaque data."
# And it deprecates all non-ascii values
#
# Leading/trailing whitespace in header names is forbidden
#
# Values get leading/trailing whitespace stripped
#
# Content-Disposition actually needs to contain unicode semantically; to
# accomplish this it has a terrifically weird way of encoding the filename
# itself as ascii (and even this still has lots of cross-browser
# incompatibilities)
#
# Order is important:
# "a proxy MUST NOT change the order of these field values when forwarding a
# message"
# (and there are several headers where the order indicates a preference)
#
# Multiple occurrences of the same header:
# "A sender MUST NOT generate multiple header fields with the same field name
# in a message unless either the entire field value for that header field is
# defined as a comma-separated list [or the header is Set-Cookie which gets a
# special exception]" - RFC 7230. (cookies are in RFC 6265)
#
# So every header aside from Set-Cookie can be merged by b", ".join if it
# occurs repeatedly. But, of course, they can't necessarily be split by
# .split(b","), because quoting.
#
# Given all this mess (case insensitive, duplicates allowed, order is
# important, ...), there doesn't appear to be any standard way to handle
# headers in Python -- they're almost like dicts, but... actually just
# aren't. For now we punt and just use a super simple representation: headers
# are a list of pairs
#
# [(name1, value1), (name2, value2), ...]
#
# where all entries are bytestrings, names are lowercase and have no
# leading/trailing whitespace, and values are bytestrings with no
# leading/trailing whitespace. Searching and updating are done via naive O(n)
# methods.
#
# Maybe a dict-of-lists would be better?
# Pre-compiled bytes patterns that normalize_and_validate() hands to
# `validate` (imported from ._util).
#
# Pattern for a run of one or more ASCII digits, used for Content-Length.
_content_length_re = re.compile(rb"[0-9]+")
# Patterns for header names and values; the pattern sources are the
# `field_name` / `field_value` strings imported from ._abnf, encoded to bytes
# so they can be applied to bytes input.
_field_name_re = re.compile(field_name.encode("ascii"))
_field_value_re = re.compile(field_value.encode("ascii"))
class Headers(Sequence[Tuple[bytes, bytes]]):
    """
    A list-like interface that allows iterating over headers as byte-pairs
    of (lowercased-name, value).

    Internally we actually store the representation as three-tuples,
    including both the raw original casing, in order to preserve casing
    over-the-wire, and the lowercased name, for case-insensitive comparisons.

        r = Request(
            method="GET",
            target="/",
            headers=[("Host", "example.org"), ("Connection", "keep-alive")],
            http_version="1.1",
        )
        assert r.headers == [
            (b"host", b"example.org"),
            (b"connection", b"keep-alive")
        ]
        assert r.headers.raw_items() == [
            (b"Host", b"example.org"),
            (b"Connection", b"keep-alive")
        ]
    """

    __slots__ = "_full_items"

    def __init__(self, full_items: List[Tuple[bytes, bytes, bytes]]) -> None:
        # Each entry is (raw_name, lowercased_name, value). The list is kept
        # by reference; it is not copied.
        self._full_items = full_items

    def __bool__(self) -> bool:
        """Return True when at least one header is stored."""
        return bool(self._full_items)

    def __eq__(self, other: object) -> bool:
        """Compare as a list of (lowercased-name, value) pairs.

        Any iterable is accepted on the right-hand side. For an operand that
        cannot be iterated (e.g. ``None`` or an int) NotImplemented is
        returned, so ``==`` evaluates to False and ``!=`` to True instead of
        raising TypeError.
        """
        try:
            other_items = list(other)  # type: ignore
        except TypeError:
            return NotImplemented
        return list(self) == other_items

    def __len__(self) -> int:
        """Return the number of stored header entries."""
        return len(self._full_items)

    def __repr__(self) -> str:
        return "<Headers(%s)>" % repr(list(self))

    def __getitem__(self, idx: int) -> Tuple[bytes, bytes]:  # type: ignore[override]
        """Return the (lowercased-name, value) pair stored at index ``idx``."""
        # Drop the raw-cased name; only the normalized view is exposed here.
        _, name, value = self._full_items[idx]
        return (name, value)

    def raw_items(self) -> List[Tuple[bytes, bytes]]:
        """Return (name, value) pairs using the original header-name casing."""
        return [(raw_name, value) for raw_name, _, value in self._full_items]
# Shapes accepted for caller-supplied headers: a list of (name, value) pairs
# in which the name and the value may each be bytes or str. The four
# combinations are spelled out as separate list types.
HeaderTypes = Union[
    List[Tuple[bytes, bytes]],
    List[Tuple[bytes, str]],
    List[Tuple[str, bytes]],
    List[Tuple[str, str]],
]
@overload
def normalize_and_validate(headers: Headers, _parsed: Literal[True]) -> Headers:
    ...


@overload
def normalize_and_validate(headers: HeaderTypes, _parsed: Literal[False]) -> Headers:
    ...


@overload
def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    ...


def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    """Build a ``Headers`` object from an iterable of (name, value) pairs.

    Unless ``_parsed`` is true, every name and value is passed through
    ``bytesify`` and checked against the field-name / field-value patterns.
    Each header is stored as ``(raw_name, lowercased_name, value)``.

    Special cases:

    * Content-Length: a comma-separated value must collapse to one distinct
      entry, which is checked against the Content-Length pattern. A repeated
      header carrying the same value is dropped; a different value is an
      error.
    * Transfer-Encoding: may appear only once; its value is lowercased and
      must equal ``chunked``.

    Raises:
        LocalProtocolError: for conflicting Content-Length values, for more
            than one Transfer-Encoding header, or for a Transfer-Encoding
            other than ``chunked`` (the latter two carry
            ``error_status_hint=501``).
    """
    normalized: List[Tuple[bytes, bytes, bytes]] = []
    content_length = None
    transfer_encoding_seen = False

    for name, value in headers:
        # Headers coming out of the parser are already bytes and have already
        # been run through these regexes, so the conversion and validation
        # steps can be skipped for them.
        if not _parsed:
            name = bytesify(name)
            value = bytesify(value)
            validate(_field_name_re, name, "Illegal header name {!r}", name)
            validate(_field_value_re, value, "Illegal header value {!r}", value)
        assert isinstance(name, bytes)
        assert isinstance(value, bytes)

        # Keep the caller's casing next to the lowercased lookup key.
        raw_name, name = name, name.lower()

        if name == b"content-length":
            distinct = {piece.strip() for piece in value.split(b",")}
            if len(distinct) != 1:
                raise LocalProtocolError("conflicting Content-Length headers")
            (value,) = distinct
            validate(_content_length_re, value, "bad Content-Length")
            if content_length is not None:
                if content_length != value:
                    raise LocalProtocolError("conflicting Content-Length headers")
                # Identical repeat of an earlier Content-Length: keep only
                # the first occurrence.
                continue
            content_length = value
        elif name == b"transfer-encoding":
            # "A server that receives a request message with a transfer coding
            # it does not understand SHOULD respond with 501 (Not
            # Implemented)."
            # https://tools.ietf.org/html/rfc7230#section-3.3.1
            if transfer_encoding_seen:
                raise LocalProtocolError(
                    "multiple Transfer-Encoding headers", error_status_hint=501
                )
            # "All transfer-coding names are case-insensitive"
            # -- https://tools.ietf.org/html/rfc7230#section-4
            value = value.lower()
            if value != b"chunked":
                raise LocalProtocolError(
                    "Only Transfer-Encoding: chunked is supported",
                    error_status_hint=501,
                )
            transfer_encoding_seen = True

        normalized.append((raw_name, name, value))

    return Headers(normalized)
def get_comma_header(headers: Headers, name: bytes) -> List[bytes]:
    """Collect the lowercased, comma-split values of every ``name`` header.

    ``name`` is expected to be lower-case bytes. Values from repeated headers
    are concatenated in order; each piece is stripped of surrounding
    whitespace and empty pieces are discarded.
    """
    # Only appropriate for headers whose value is a comma-separated list of
    # case-insensitive tokens:
    #
    # Connection: fits exactly (including case insensitivity).
    #
    # Content-Length: strictly a single value (1*DIGIT), but the standard
    # mentions implementations that send several, and splitting does no
    # harm. Lowercasing makes no difference for digits either.
    #
    # Transfer-Encoding: more complex, because quoted strings are allowed and
    # so splitting on "," is technically wrong. For example, this is legal:
    #
    #    Transfer-Encoding: foo; options="1,2", chunked
    #
    # and should be parsed as
    #
    #    foo; options="1,2"
    #    chunked
    #
    # whereas this naive function yields
    #
    #    foo; options="1
    #    2"
    #    chunked
    #
    # That is acceptable because the only thing done with Transfer-Encoding
    # is to reject anything that is not exactly "chunked", so both parses
    # are treated the same.
    #
    # Expect: the only legal value is the literal "100-continue". Splitting
    # on commas is harmless. Case insensitive.
    values: List[bytes] = []
    for _, candidate, raw_value in headers._full_items:
        if candidate != name:
            continue
        pieces = (piece.strip() for piece in raw_value.lower().split(b","))
        values.extend(piece for piece in pieces if piece)
    return values
def set_comma_header(headers: Headers, name: bytes, new_values: List[bytes]) -> Headers:
    """Return new headers with every ``name`` entry replaced by ``new_values``.

    ``name`` is expected to be lower-case bytes. All other headers keep their
    original order and raw casing; one ``name`` header per new value is
    appended after them, and the combined list is passed through
    ``normalize_and_validate``.
    """
    # Replacement headers are stored with title-cased names to match the
    # conventional HTTP header style. `.title()` is a blunt tool, but it is
    # correct for the headers this function is used with:
    #
    #    Connection, Content-Length, Transfer-Encoding.
    kept: List[Tuple[bytes, bytes]] = [
        (raw_name, value)
        for raw_name, lowered, value in headers._full_items
        if lowered != name
    ]
    replacements: List[Tuple[bytes, bytes]] = [
        (name.title(), value) for value in new_values
    ]
    return normalize_and_validate(kept + replacements)
def has_expect_100_continue(request: "Request") -> bool:
    """Return True if ``request`` carries an ``Expect: 100-continue`` to honour.

    Requests whose ``http_version`` compares below ``b"1.1"`` always give
    False, without looking at the headers.
    """
    # https://tools.ietf.org/html/rfc7231#section-5.1.1
    # "A server that receives a 100-continue expectation in an HTTP/1.0 request
    # MUST ignore that expectation."
    return not request.http_version < b"1.1" and b"100-continue" in get_comma_header(
        request.headers, b"expect"
    )
|