Spaces:
Sleeping
Sleeping
from __future__ import annotations | |
import select | |
import socket | |
from functools import partial | |
__all__ = ["wait_for_read", "wait_for_write"] | |
# How should we wait on sockets? | |
# | |
# There are two types of APIs you can use for waiting on sockets: the fancy | |
# modern stateful APIs like epoll/kqueue, and the older stateless APIs like | |
# select/poll. The stateful APIs are more efficient when you have lots of
# sockets to keep track of, because you can set them up once and then use them | |
# lots of times. But we only ever want to wait on a single socket at a time | |
# and don't want to keep track of state, so the stateless APIs are actually | |
# more efficient. So we want to use select() or poll(). | |
# | |
# Now, how do we choose between select() and poll()? On traditional Unixes, | |
# select() has a strange calling convention that makes it slow, or fail | |
# altogether, for high-numbered file descriptors. The point of poll() is to fix | |
# that, so on Unixes, we prefer poll(). | |
# | |
# On Windows, there is no poll() (or at least Python doesn't provide a wrapper | |
# for it), but that's OK, because on Windows, select() doesn't have this | |
# strange calling convention; plain select() works fine. | |
# | |
# So: on Windows we use select(), and everywhere else we use poll(). We also | |
# fall back to select() in case poll() is somehow broken or missing. | |
def select_wait_for_socket(
    sock: socket.socket,
    read: bool = False,
    write: bool = False,
    timeout: float | None = None,
) -> bool:
    """Wait on a single socket using ``select.select()``.

    :param sock: The socket to wait on.
    :param read: Wait for the socket to become readable.
    :param write: Wait for the socket to become writable.
    :param timeout: Passed straight through to ``select.select()`` (seconds);
        ``None`` means no timeout is given to ``select()``.
    :return: ``True`` if ``select()`` reported the socket in any of its
        result lists, ``False`` if all result lists came back empty.
    :raises RuntimeError: If neither ``read`` nor ``write`` is requested.
    """
    if not (read or write):
        raise RuntimeError("must specify at least one of read=True, write=True")

    readers = [sock] if read else []
    writers = [sock] if write else []

    # A non-blocking connect that succeeds is reported as "writable" on most
    # systems, but Windows reports it by marking the socket "exceptional".
    # To hide that difference, the write list is also passed as the
    # exceptional list. (The stdlib selectors module does the same thing.)
    ready_r, ready_w, ready_x = select.select(readers, writers, writers, timeout)
    return bool(ready_r or ready_w or ready_x)
def poll_wait_for_socket(
    sock: socket.socket,
    read: bool = False,
    write: bool = False,
    timeout: float | None = None,
) -> bool:
    """Wait on a single socket using ``select.poll()``.

    :param sock: The socket to wait on.
    :param read: Wait for the socket to become readable (``POLLIN``).
    :param write: Wait for the socket to become writable (``POLLOUT``).
    :param timeout: Timeout in seconds, or ``None`` to pass no timeout to
        ``poll()``.
    :return: ``True`` if ``poll()`` reported at least one event for the
        socket, ``False`` if it returned an empty list.
    :raises RuntimeError: If neither ``read`` nor ``write`` is requested.
    """
    if not (read or write):
        raise RuntimeError("must specify at least one of read=True, write=True")

    events = (select.POLLIN if read else 0) | (select.POLLOUT if write else 0)

    poller = select.poll()
    poller.register(sock, events)

    # poll() takes its timeout in milliseconds, while callers pass seconds.
    timeout_ms = None if timeout is None else timeout * 1000
    return bool(poller.poll(timeout_ms))
def _have_working_poll() -> bool: | |
# Apparently some systems have a select.poll that fails as soon as you try | |
# to use it, either due to strange configuration or broken monkeypatching | |
# from libraries like eventlet/greenlet. | |
try: | |
poll_obj = select.poll() | |
poll_obj.poll(0) | |
except (AttributeError, OSError): | |
return False | |
else: | |
return True | |
def wait_for_socket(
    sock: socket.socket,
    read: bool = False,
    write: bool = False,
    timeout: float | None = None,
) -> bool:
    """Wait on a socket, picking the implementation on first use.

    On its first call this function probes for a usable backend, rebinds the
    module-level name ``wait_for_socket`` to the chosen implementation, and
    then delegates to it.

    :param sock: The socket to wait on.
    :param read: Wait for the socket to become readable.
    :param write: Wait for the socket to become writable.
    :param timeout: Timeout in seconds, or ``None``.
    :return: Whatever the selected implementation returns.
    :raises RuntimeError: If neither a working ``select.poll`` nor
        ``select.select`` is available.
    """
    # We delay choosing which implementation to use until the first time we're
    # called. We could do it at import time, but then we might make the wrong
    # decision if someone goes wild with monkeypatching select.poll after
    # we're imported.
    global wait_for_socket
    if _have_working_poll():
        wait_for_socket = poll_wait_for_socket
    elif hasattr(select, "select"):
        wait_for_socket = select_wait_for_socket
    else:
        # Without this branch the name is never rebound, so the call below
        # would re-enter this function until RecursionError.
        raise RuntimeError("no select-equivalent available")
    return wait_for_socket(sock, read, write, timeout)
def wait_for_read(sock: socket.socket, timeout: float | None = None) -> bool:
    """Block until ``sock`` can be read from, or until ``timeout`` elapses.

    :param sock: The socket to wait on.
    :param timeout: Timeout in seconds, or ``None``.
    :return: ``True`` if the socket is readable, ``False`` if the timeout
        expired.
    """
    return wait_for_socket(sock, read=True, write=False, timeout=timeout)
def wait_for_write(sock: socket.socket, timeout: float | None = None) -> bool:
    """Waits for writing to be available on a given socket.

    :param sock: The socket to wait on.
    :param timeout: Timeout in seconds, or ``None``.
    :return: ``True`` if the socket is writable, or ``False`` if the timeout
        expired.
    """
    return wait_for_socket(sock, write=True, timeout=timeout)