Spaces:
Sleeping
Sleeping
# | |
# Copyright 2011 Facebook | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you may | |
# not use this file except in compliance with the License. You may obtain | |
# a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
# License for the specific language governing permissions and limitations | |
# under the License. | |
"""Miscellaneous network utility code.""" | |
import asyncio | |
import concurrent.futures | |
import errno | |
import os | |
import sys | |
import socket | |
import ssl | |
import stat | |
from tornado.concurrent import dummy_executor, run_on_executor | |
from tornado.ioloop import IOLoop | |
from tornado.util import Configurable, errno_from_exception | |
from typing import List, Callable, Any, Type, Dict, Union, Tuple, Awaitable, Optional | |
# Note that the naming of ssl.Purpose is confusing; the purpose | |
# of a context is to authenticate the opposite side of the connection. | |
_client_ssl_defaults = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) | |
_server_ssl_defaults = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) | |
if hasattr(ssl, "OP_NO_COMPRESSION"): | |
# See netutil.ssl_options_to_context | |
_client_ssl_defaults.options |= ssl.OP_NO_COMPRESSION | |
_server_ssl_defaults.options |= ssl.OP_NO_COMPRESSION | |
# ThreadedResolver runs getaddrinfo on a thread. If the hostname is unicode, | |
# getaddrinfo attempts to import encodings.idna. If this is done at | |
# module-import time, the import lock is already held by the main thread, | |
# leading to deadlock. Avoid it by caching the idna encoder on the main | |
# thread now. | |
"foo".encode("idna") | |
# For undiagnosed reasons, 'latin1' codec may also need to be preloaded. | |
"foo".encode("latin1") | |
# Default backlog used when calling sock.listen() | |
_DEFAULT_BACKLOG = 128 | |
def bind_sockets( | |
port: int, | |
address: Optional[str] = None, | |
family: socket.AddressFamily = socket.AF_UNSPEC, | |
backlog: int = _DEFAULT_BACKLOG, | |
flags: Optional[int] = None, | |
reuse_port: bool = False, | |
) -> List[socket.socket]: | |
"""Creates listening sockets bound to the given port and address. | |
Returns a list of socket objects (multiple sockets are returned if | |
the given address maps to multiple IP addresses, which is most common | |
for mixed IPv4 and IPv6 use). | |
Address may be either an IP address or hostname. If it's a hostname, | |
the server will listen on all IP addresses associated with the | |
name. Address may be an empty string or None to listen on all | |
available interfaces. Family may be set to either `socket.AF_INET` | |
or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise | |
both will be used if available. | |
The ``backlog`` argument has the same meaning as for | |
`socket.listen() <socket.socket.listen>`. | |
``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like | |
``socket.AI_PASSIVE | socket.AI_NUMERICHOST``. | |
``reuse_port`` option sets ``SO_REUSEPORT`` option for every socket | |
in the list. If your platform doesn't support this option ValueError will | |
be raised. | |
""" | |
if reuse_port and not hasattr(socket, "SO_REUSEPORT"): | |
raise ValueError("the platform doesn't support SO_REUSEPORT") | |
sockets = [] | |
if address == "": | |
address = None | |
if not socket.has_ipv6 and family == socket.AF_UNSPEC: | |
# Python can be compiled with --disable-ipv6, which causes | |
# operations on AF_INET6 sockets to fail, but does not | |
# automatically exclude those results from getaddrinfo | |
# results. | |
# http://bugs.python.org/issue16208 | |
family = socket.AF_INET | |
if flags is None: | |
flags = socket.AI_PASSIVE | |
bound_port = None | |
unique_addresses = set() # type: set | |
for res in sorted( | |
socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags), | |
key=lambda x: x[0], | |
): | |
if res in unique_addresses: | |
continue | |
unique_addresses.add(res) | |
af, socktype, proto, canonname, sockaddr = res | |
if ( | |
sys.platform == "darwin" | |
and address == "localhost" | |
and af == socket.AF_INET6 | |
and sockaddr[3] != 0 # type: ignore | |
): | |
# Mac OS X includes a link-local address fe80::1%lo0 in the | |
# getaddrinfo results for 'localhost'. However, the firewall | |
# doesn't understand that this is a local address and will | |
# prompt for access (often repeatedly, due to an apparent | |
# bug in its ability to remember granting access to an | |
# application). Skip these addresses. | |
continue | |
try: | |
sock = socket.socket(af, socktype, proto) | |
except socket.error as e: | |
if errno_from_exception(e) == errno.EAFNOSUPPORT: | |
continue | |
raise | |
if os.name != "nt": | |
try: | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
except socket.error as e: | |
if errno_from_exception(e) != errno.ENOPROTOOPT: | |
# Hurd doesn't support SO_REUSEADDR. | |
raise | |
if reuse_port: | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) | |
if af == socket.AF_INET6: | |
# On linux, ipv6 sockets accept ipv4 too by default, | |
# but this makes it impossible to bind to both | |
# 0.0.0.0 in ipv4 and :: in ipv6. On other systems, | |
# separate sockets *must* be used to listen for both ipv4 | |
# and ipv6. For consistency, always disable ipv4 on our | |
# ipv6 sockets and use a separate ipv4 socket when needed. | |
# | |
# Python 2.x on windows doesn't have IPPROTO_IPV6. | |
if hasattr(socket, "IPPROTO_IPV6"): | |
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1) | |
# automatic port allocation with port=None | |
# should bind on the same port on IPv4 and IPv6 | |
host, requested_port = sockaddr[:2] | |
if requested_port == 0 and bound_port is not None: | |
sockaddr = tuple([host, bound_port] + list(sockaddr[2:])) | |
sock.setblocking(False) | |
try: | |
sock.bind(sockaddr) | |
except OSError as e: | |
if ( | |
errno_from_exception(e) == errno.EADDRNOTAVAIL | |
and address == "localhost" | |
and sockaddr[0] == "::1" | |
): | |
# On some systems (most notably docker with default | |
# configurations), ipv6 is partially disabled: | |
# socket.has_ipv6 is true, we can create AF_INET6 | |
# sockets, and getaddrinfo("localhost", ..., | |
# AF_PASSIVE) resolves to ::1, but we get an error | |
# when binding. | |
# | |
# Swallow the error, but only for this specific case. | |
# If EADDRNOTAVAIL occurs in other situations, it | |
# might be a real problem like a typo in a | |
# configuration. | |
sock.close() | |
continue | |
else: | |
raise | |
bound_port = sock.getsockname()[1] | |
sock.listen(backlog) | |
sockets.append(sock) | |
return sockets | |
if hasattr(socket, "AF_UNIX"): | |
def bind_unix_socket( | |
file: str, mode: int = 0o600, backlog: int = _DEFAULT_BACKLOG | |
) -> socket.socket: | |
"""Creates a listening unix socket. | |
If a socket with the given name already exists, it will be deleted. | |
If any other file with that name exists, an exception will be | |
raised. | |
Returns a socket object (not a list of socket objects like | |
`bind_sockets`) | |
""" | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
try: | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
except socket.error as e: | |
if errno_from_exception(e) != errno.ENOPROTOOPT: | |
# Hurd doesn't support SO_REUSEADDR | |
raise | |
sock.setblocking(False) | |
try: | |
st = os.stat(file) | |
except FileNotFoundError: | |
pass | |
else: | |
if stat.S_ISSOCK(st.st_mode): | |
os.remove(file) | |
else: | |
raise ValueError("File %s exists and is not a socket", file) | |
sock.bind(file) | |
os.chmod(file, mode) | |
sock.listen(backlog) | |
return sock | |
def add_accept_handler( | |
sock: socket.socket, callback: Callable[[socket.socket, Any], None] | |
) -> Callable[[], None]: | |
"""Adds an `.IOLoop` event handler to accept new connections on ``sock``. | |
When a connection is accepted, ``callback(connection, address)`` will | |
be run (``connection`` is a socket object, and ``address`` is the | |
address of the other end of the connection). Note that this signature | |
is different from the ``callback(fd, events)`` signature used for | |
`.IOLoop` handlers. | |
A callable is returned which, when called, will remove the `.IOLoop` | |
event handler and stop processing further incoming connections. | |
.. versionchanged:: 5.0 | |
The ``io_loop`` argument (deprecated since version 4.1) has been removed. | |
.. versionchanged:: 5.0 | |
A callable is returned (``None`` was returned before). | |
""" | |
io_loop = IOLoop.current() | |
removed = [False] | |
def accept_handler(fd: socket.socket, events: int) -> None: | |
# More connections may come in while we're handling callbacks; | |
# to prevent starvation of other tasks we must limit the number | |
# of connections we accept at a time. Ideally we would accept | |
# up to the number of connections that were waiting when we | |
# entered this method, but this information is not available | |
# (and rearranging this method to call accept() as many times | |
# as possible before running any callbacks would have adverse | |
# effects on load balancing in multiprocess configurations). | |
# Instead, we use the (default) listen backlog as a rough | |
# heuristic for the number of connections we can reasonably | |
# accept at once. | |
for i in range(_DEFAULT_BACKLOG): | |
if removed[0]: | |
# The socket was probably closed | |
return | |
try: | |
connection, address = sock.accept() | |
except BlockingIOError: | |
# EWOULDBLOCK indicates we have accepted every | |
# connection that is available. | |
return | |
except ConnectionAbortedError: | |
# ECONNABORTED indicates that there was a connection | |
# but it was closed while still in the accept queue. | |
# (observed on FreeBSD). | |
continue | |
callback(connection, address) | |
def remove_handler() -> None: | |
io_loop.remove_handler(sock) | |
removed[0] = True | |
io_loop.add_handler(sock, accept_handler, IOLoop.READ) | |
return remove_handler | |
def is_valid_ip(ip: str) -> bool: | |
"""Returns ``True`` if the given string is a well-formed IP address. | |
Supports IPv4 and IPv6. | |
""" | |
if not ip or "\x00" in ip: | |
# getaddrinfo resolves empty strings to localhost, and truncates | |
# on zero bytes. | |
return False | |
try: | |
res = socket.getaddrinfo( | |
ip, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_NUMERICHOST | |
) | |
return bool(res) | |
except socket.gaierror as e: | |
if e.args[0] == socket.EAI_NONAME: | |
return False | |
raise | |
except UnicodeError: | |
# `socket.getaddrinfo` will raise a UnicodeError from the | |
# `idna` decoder if the input is longer than 63 characters, | |
# even for socket.AI_NUMERICHOST. See | |
# https://bugs.python.org/issue32958 for discussion | |
return False | |
return True | |
class Resolver(Configurable): | |
"""Configurable asynchronous DNS resolver interface. | |
By default, a blocking implementation is used (which simply calls | |
`socket.getaddrinfo`). An alternative implementation can be | |
chosen with the `Resolver.configure <.Configurable.configure>` | |
class method:: | |
Resolver.configure('tornado.netutil.ThreadedResolver') | |
The implementations of this interface included with Tornado are | |
* `tornado.netutil.DefaultLoopResolver` | |
* `tornado.netutil.DefaultExecutorResolver` (deprecated) | |
* `tornado.netutil.BlockingResolver` (deprecated) | |
* `tornado.netutil.ThreadedResolver` (deprecated) | |
* `tornado.netutil.OverrideResolver` | |
* `tornado.platform.twisted.TwistedResolver` (deprecated) | |
* `tornado.platform.caresresolver.CaresResolver` (deprecated) | |
.. versionchanged:: 5.0 | |
The default implementation has changed from `BlockingResolver` to | |
`DefaultExecutorResolver`. | |
.. versionchanged:: 6.2 | |
The default implementation has changed from `DefaultExecutorResolver` to | |
`DefaultLoopResolver`. | |
""" | |
def configurable_base(cls) -> Type["Resolver"]: | |
return Resolver | |
def configurable_default(cls) -> Type["Resolver"]: | |
return DefaultLoopResolver | |
def resolve( | |
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC | |
) -> Awaitable[List[Tuple[int, Any]]]: | |
"""Resolves an address. | |
The ``host`` argument is a string which may be a hostname or a | |
literal IP address. | |
Returns a `.Future` whose result is a list of (family, | |
address) pairs, where address is a tuple suitable to pass to | |
`socket.connect <socket.socket.connect>` (i.e. a ``(host, | |
port)`` pair for IPv4; additional fields may be present for | |
IPv6). If a ``callback`` is passed, it will be run with the | |
result as an argument when it is complete. | |
:raises IOError: if the address cannot be resolved. | |
.. versionchanged:: 4.4 | |
Standardized all implementations to raise `IOError`. | |
.. versionchanged:: 6.0 The ``callback`` argument was removed. | |
Use the returned awaitable object instead. | |
""" | |
raise NotImplementedError() | |
def close(self) -> None: | |
"""Closes the `Resolver`, freeing any resources used. | |
.. versionadded:: 3.1 | |
""" | |
pass | |
def _resolve_addr( | |
host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC | |
) -> List[Tuple[int, Any]]: | |
# On Solaris, getaddrinfo fails if the given port is not found | |
# in /etc/services and no socket type is given, so we must pass | |
# one here. The socket type used here doesn't seem to actually | |
# matter (we discard the one we get back in the results), | |
# so the addresses we return should still be usable with SOCK_DGRAM. | |
addrinfo = socket.getaddrinfo(host, port, family, socket.SOCK_STREAM) | |
results = [] | |
for fam, socktype, proto, canonname, address in addrinfo: | |
results.append((fam, address)) | |
return results # type: ignore | |
class DefaultExecutorResolver(Resolver): | |
"""Resolver implementation using `.IOLoop.run_in_executor`. | |
.. versionadded:: 5.0 | |
.. deprecated:: 6.2 | |
Use `DefaultLoopResolver` instead. | |
""" | |
async def resolve( | |
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC | |
) -> List[Tuple[int, Any]]: | |
result = await IOLoop.current().run_in_executor( | |
None, _resolve_addr, host, port, family | |
) | |
return result | |
class DefaultLoopResolver(Resolver): | |
"""Resolver implementation using `asyncio.loop.getaddrinfo`.""" | |
async def resolve( | |
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC | |
) -> List[Tuple[int, Any]]: | |
# On Solaris, getaddrinfo fails if the given port is not found | |
# in /etc/services and no socket type is given, so we must pass | |
# one here. The socket type used here doesn't seem to actually | |
# matter (we discard the one we get back in the results), | |
# so the addresses we return should still be usable with SOCK_DGRAM. | |
return [ | |
(fam, address) | |
for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo( | |
host, port, family=family, type=socket.SOCK_STREAM | |
) | |
] | |
class ExecutorResolver(Resolver): | |
"""Resolver implementation using a `concurrent.futures.Executor`. | |
Use this instead of `ThreadedResolver` when you require additional | |
control over the executor being used. | |
The executor will be shut down when the resolver is closed unless | |
``close_resolver=False``; use this if you want to reuse the same | |
executor elsewhere. | |
.. versionchanged:: 5.0 | |
The ``io_loop`` argument (deprecated since version 4.1) has been removed. | |
.. deprecated:: 5.0 | |
The default `Resolver` now uses `asyncio.loop.getaddrinfo`; | |
use that instead of this class. | |
""" | |
def initialize( | |
self, | |
executor: Optional[concurrent.futures.Executor] = None, | |
close_executor: bool = True, | |
) -> None: | |
if executor is not None: | |
self.executor = executor | |
self.close_executor = close_executor | |
else: | |
self.executor = dummy_executor | |
self.close_executor = False | |
def close(self) -> None: | |
if self.close_executor: | |
self.executor.shutdown() | |
self.executor = None # type: ignore | |
def resolve( | |
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC | |
) -> List[Tuple[int, Any]]: | |
return _resolve_addr(host, port, family) | |
class BlockingResolver(ExecutorResolver): | |
"""Default `Resolver` implementation, using `socket.getaddrinfo`. | |
The `.IOLoop` will be blocked during the resolution, although the | |
callback will not be run until the next `.IOLoop` iteration. | |
.. deprecated:: 5.0 | |
The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead | |
of this class. | |
""" | |
def initialize(self) -> None: # type: ignore | |
super().initialize() | |
class ThreadedResolver(ExecutorResolver): | |
"""Multithreaded non-blocking `Resolver` implementation. | |
Requires the `concurrent.futures` package to be installed | |
(available in the standard library since Python 3.2, | |
installable with ``pip install futures`` in older versions). | |
The thread pool size can be configured with:: | |
Resolver.configure('tornado.netutil.ThreadedResolver', | |
num_threads=10) | |
.. versionchanged:: 3.1 | |
All ``ThreadedResolvers`` share a single thread pool, whose | |
size is set by the first one to be created. | |
.. deprecated:: 5.0 | |
The default `Resolver` now uses `.IOLoop.run_in_executor`; use that instead | |
of this class. | |
""" | |
_threadpool = None # type: ignore | |
_threadpool_pid = None # type: int | |
def initialize(self, num_threads: int = 10) -> None: # type: ignore | |
threadpool = ThreadedResolver._create_threadpool(num_threads) | |
super().initialize(executor=threadpool, close_executor=False) | |
def _create_threadpool( | |
cls, num_threads: int | |
) -> concurrent.futures.ThreadPoolExecutor: | |
pid = os.getpid() | |
if cls._threadpool_pid != pid: | |
# Threads cannot survive after a fork, so if our pid isn't what it | |
# was when we created the pool then delete it. | |
cls._threadpool = None | |
if cls._threadpool is None: | |
cls._threadpool = concurrent.futures.ThreadPoolExecutor(num_threads) | |
cls._threadpool_pid = pid | |
return cls._threadpool | |
class OverrideResolver(Resolver): | |
"""Wraps a resolver with a mapping of overrides. | |
This can be used to make local DNS changes (e.g. for testing) | |
without modifying system-wide settings. | |
The mapping can be in three formats:: | |
{ | |
# Hostname to host or ip | |
"example.com": "127.0.1.1", | |
# Host+port to host+port | |
("login.example.com", 443): ("localhost", 1443), | |
# Host+port+address family to host+port | |
("login.example.com", 443, socket.AF_INET6): ("::1", 1443), | |
} | |
.. versionchanged:: 5.0 | |
Added support for host-port-family triplets. | |
""" | |
def initialize(self, resolver: Resolver, mapping: dict) -> None: | |
self.resolver = resolver | |
self.mapping = mapping | |
def close(self) -> None: | |
self.resolver.close() | |
def resolve( | |
self, host: str, port: int, family: socket.AddressFamily = socket.AF_UNSPEC | |
) -> Awaitable[List[Tuple[int, Any]]]: | |
if (host, port, family) in self.mapping: | |
host, port = self.mapping[(host, port, family)] | |
elif (host, port) in self.mapping: | |
host, port = self.mapping[(host, port)] | |
elif host in self.mapping: | |
host = self.mapping[host] | |
return self.resolver.resolve(host, port, family) | |
# These are the keyword arguments to ssl.wrap_socket that must be translated | |
# to their SSLContext equivalents (the other arguments are still passed | |
# to SSLContext.wrap_socket). | |
_SSL_CONTEXT_KEYWORDS = frozenset( | |
["ssl_version", "certfile", "keyfile", "cert_reqs", "ca_certs", "ciphers"] | |
) | |
def ssl_options_to_context( | |
ssl_options: Union[Dict[str, Any], ssl.SSLContext], | |
server_side: Optional[bool] = None, | |
) -> ssl.SSLContext: | |
"""Try to convert an ``ssl_options`` dictionary to an | |
`~ssl.SSLContext` object. | |
The ``ssl_options`` dictionary contains keywords to be passed to | |
``ssl.SSLContext.wrap_socket``. In Python 2.7.9+, `ssl.SSLContext` objects can | |
be used instead. This function converts the dict form to its | |
`~ssl.SSLContext` equivalent, and may be used when a component which | |
accepts both forms needs to upgrade to the `~ssl.SSLContext` version | |
to use features like SNI or NPN. | |
.. versionchanged:: 6.2 | |
Added server_side argument. Omitting this argument will | |
result in a DeprecationWarning on Python 3.10. | |
""" | |
if isinstance(ssl_options, ssl.SSLContext): | |
return ssl_options | |
assert isinstance(ssl_options, dict) | |
assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options | |
# TODO: Now that we have the server_side argument, can we switch to | |
# create_default_context or would that change behavior? | |
default_version = ssl.PROTOCOL_TLS | |
if server_side: | |
default_version = ssl.PROTOCOL_TLS_SERVER | |
elif server_side is not None: | |
default_version = ssl.PROTOCOL_TLS_CLIENT | |
context = ssl.SSLContext(ssl_options.get("ssl_version", default_version)) | |
if "certfile" in ssl_options: | |
context.load_cert_chain( | |
ssl_options["certfile"], ssl_options.get("keyfile", None) | |
) | |
if "cert_reqs" in ssl_options: | |
if ssl_options["cert_reqs"] == ssl.CERT_NONE: | |
# This may have been set automatically by PROTOCOL_TLS_CLIENT but is | |
# incompatible with CERT_NONE so we must manually clear it. | |
context.check_hostname = False | |
context.verify_mode = ssl_options["cert_reqs"] | |
if "ca_certs" in ssl_options: | |
context.load_verify_locations(ssl_options["ca_certs"]) | |
if "ciphers" in ssl_options: | |
context.set_ciphers(ssl_options["ciphers"]) | |
if hasattr(ssl, "OP_NO_COMPRESSION"): | |
# Disable TLS compression to avoid CRIME and related attacks. | |
# This constant depends on openssl version 1.0. | |
# TODO: Do we need to do this ourselves or can we trust | |
# the defaults? | |
context.options |= ssl.OP_NO_COMPRESSION | |
return context | |
def ssl_wrap_socket( | |
socket: socket.socket, | |
ssl_options: Union[Dict[str, Any], ssl.SSLContext], | |
server_hostname: Optional[str] = None, | |
server_side: Optional[bool] = None, | |
**kwargs: Any | |
) -> ssl.SSLSocket: | |
"""Returns an ``ssl.SSLSocket`` wrapping the given socket. | |
``ssl_options`` may be either an `ssl.SSLContext` object or a | |
dictionary (as accepted by `ssl_options_to_context`). Additional | |
keyword arguments are passed to `ssl.SSLContext.wrap_socket`. | |
.. versionchanged:: 6.2 | |
Added server_side argument. Omitting this argument will | |
result in a DeprecationWarning on Python 3.10. | |
""" | |
context = ssl_options_to_context(ssl_options, server_side=server_side) | |
if server_side is None: | |
server_side = False | |
assert ssl.HAS_SNI | |
# TODO: add a unittest for hostname validation (python added server-side SNI support in 3.4) | |
# In the meantime it can be manually tested with | |
# python3 -m tornado.httpclient https://sni.velox.ch | |
return context.wrap_socket( | |
socket, server_hostname=server_hostname, server_side=server_side, **kwargs | |
) | |