mirror of https://github.com/python/cpython
gh-91227: Ignore ERROR_PORT_UNREACHABLE in proactor recvfrom() (#32011)
This commit is contained in:
parent
9967b568ed
commit
f11d0d8be8
|
@ -8,6 +8,7 @@ if sys.platform != 'win32': # pragma: no cover
|
|||
import _overlapped
|
||||
import _winapi
|
||||
import errno
|
||||
from functools import partial
|
||||
import math
|
||||
import msvcrt
|
||||
import socket
|
||||
|
@ -467,6 +468,18 @@ class IocpProactor:
|
|||
else:
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def _finish_recvfrom(cls, trans, key, ov, *, empty_result):
|
||||
try:
|
||||
return cls.finish_socket_func(trans, key, ov)
|
||||
except OSError as exc:
|
||||
# WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same
|
||||
# socket is used to send to an address that is not listening.
|
||||
if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE:
|
||||
return empty_result, None
|
||||
else:
|
||||
raise
|
||||
|
||||
def recv(self, conn, nbytes, flags=0):
|
||||
self._register_with_iocp(conn)
|
||||
ov = _overlapped.Overlapped(NULL)
|
||||
|
@ -501,7 +514,8 @@ class IocpProactor:
|
|||
except BrokenPipeError:
|
||||
return self._result((b'', None))
|
||||
|
||||
return self._register(ov, conn, self.finish_socket_func)
|
||||
return self._register(ov, conn, partial(self._finish_recvfrom,
|
||||
empty_result=b''))
|
||||
|
||||
def recvfrom_into(self, conn, buf, flags=0):
|
||||
self._register_with_iocp(conn)
|
||||
|
@ -511,17 +525,8 @@ class IocpProactor:
|
|||
except BrokenPipeError:
|
||||
return self._result((0, None))
|
||||
|
||||
def finish_recv(trans, key, ov):
|
||||
try:
|
||||
return ov.getresult()
|
||||
except OSError as exc:
|
||||
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
|
||||
_overlapped.ERROR_OPERATION_ABORTED):
|
||||
raise ConnectionResetError(*exc.args)
|
||||
else:
|
||||
raise
|
||||
|
||||
return self._register(ov, conn, finish_recv)
|
||||
return self._register(ov, conn, partial(self._finish_recvfrom,
|
||||
empty_result=0))
|
||||
|
||||
def sendto(self, conn, buf, flags=0, addr=None):
|
||||
self._register_with_iocp(conn)
|
||||
|
|
|
@ -1378,6 +1378,80 @@ class EventLoopTestsMixin:
|
|||
tr.close()
|
||||
self.loop.run_until_complete(pr.done)
|
||||
|
||||
def test_datagram_send_to_non_listening_address(self):
|
||||
# see:
|
||||
# https://github.com/python/cpython/issues/91227
|
||||
# https://github.com/python/cpython/issues/88906
|
||||
# https://bugs.python.org/issue47071
|
||||
# https://bugs.python.org/issue44743
|
||||
# The Proactor event loop would fail to receive datagram messages after
|
||||
# sending a message to an address that wasn't listening.
|
||||
loop = self.loop
|
||||
|
||||
class Protocol(asyncio.DatagramProtocol):
|
||||
|
||||
_received_datagram = None
|
||||
|
||||
def datagram_received(self, data, addr):
|
||||
self._received_datagram.set_result(data)
|
||||
|
||||
async def wait_for_datagram_received(self):
|
||||
self._received_datagram = loop.create_future()
|
||||
result = await asyncio.wait_for(self._received_datagram, 10)
|
||||
self._received_datagram = None
|
||||
return result
|
||||
|
||||
def create_socket():
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setblocking(False)
|
||||
sock.bind(('127.0.0.1', 0))
|
||||
return sock
|
||||
|
||||
socket_1 = create_socket()
|
||||
transport_1, protocol_1 = loop.run_until_complete(
|
||||
loop.create_datagram_endpoint(Protocol, sock=socket_1)
|
||||
)
|
||||
addr_1 = socket_1.getsockname()
|
||||
|
||||
socket_2 = create_socket()
|
||||
transport_2, protocol_2 = loop.run_until_complete(
|
||||
loop.create_datagram_endpoint(Protocol, sock=socket_2)
|
||||
)
|
||||
addr_2 = socket_2.getsockname()
|
||||
|
||||
# creating and immediately closing this to try to get an address that
|
||||
# is not listening
|
||||
socket_3 = create_socket()
|
||||
transport_3, protocol_3 = loop.run_until_complete(
|
||||
loop.create_datagram_endpoint(Protocol, sock=socket_3)
|
||||
)
|
||||
addr_3 = socket_3.getsockname()
|
||||
transport_3.abort()
|
||||
|
||||
transport_1.sendto(b'a', addr=addr_2)
|
||||
self.assertEqual(loop.run_until_complete(
|
||||
protocol_2.wait_for_datagram_received()
|
||||
), b'a')
|
||||
|
||||
transport_2.sendto(b'b', addr=addr_1)
|
||||
self.assertEqual(loop.run_until_complete(
|
||||
protocol_1.wait_for_datagram_received()
|
||||
), b'b')
|
||||
|
||||
# this should send to an address that isn't listening
|
||||
transport_1.sendto(b'c', addr=addr_3)
|
||||
loop.run_until_complete(asyncio.sleep(0))
|
||||
|
||||
# transport 1 should still be able to receive messages after sending to
|
||||
# an address that wasn't listening
|
||||
transport_2.sendto(b'd', addr=addr_1)
|
||||
self.assertEqual(loop.run_until_complete(
|
||||
protocol_1.wait_for_datagram_received()
|
||||
), b'd')
|
||||
|
||||
transport_1.close()
|
||||
transport_2.close()
|
||||
|
||||
def test_internal_fds(self):
|
||||
loop = self.create_event_loop()
|
||||
if not isinstance(loop, selector_events.BaseSelectorEventLoop):
|
||||
|
|
|
@ -555,12 +555,93 @@ if sys.platform == 'win32':
|
|||
def create_event_loop(self):
|
||||
return asyncio.SelectorEventLoop()
|
||||
|
||||
|
||||
class ProactorEventLoopTests(BaseSockTestsMixin,
|
||||
test_utils.TestCase):
|
||||
|
||||
def create_event_loop(self):
|
||||
return asyncio.ProactorEventLoop()
|
||||
|
||||
|
||||
async def _basetest_datagram_send_to_non_listening_address(self,
|
||||
recvfrom):
|
||||
# see:
|
||||
# https://github.com/python/cpython/issues/91227
|
||||
# https://github.com/python/cpython/issues/88906
|
||||
# https://bugs.python.org/issue47071
|
||||
# https://bugs.python.org/issue44743
|
||||
# The Proactor event loop would fail to receive datagram messages
|
||||
# after sending a message to an address that wasn't listening.
|
||||
|
||||
def create_socket():
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
sock.setblocking(False)
|
||||
sock.bind(('127.0.0.1', 0))
|
||||
return sock
|
||||
|
||||
socket_1 = create_socket()
|
||||
addr_1 = socket_1.getsockname()
|
||||
|
||||
socket_2 = create_socket()
|
||||
addr_2 = socket_2.getsockname()
|
||||
|
||||
# creating and immediately closing this to try to get an address
|
||||
# that is not listening
|
||||
socket_3 = create_socket()
|
||||
addr_3 = socket_3.getsockname()
|
||||
socket_3.shutdown(socket.SHUT_RDWR)
|
||||
socket_3.close()
|
||||
|
||||
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
|
||||
socket_2_recv_task = self.loop.create_task(recvfrom(socket_2))
|
||||
await asyncio.sleep(0)
|
||||
|
||||
await self.loop.sock_sendto(socket_1, b'a', addr_2)
|
||||
self.assertEqual(await socket_2_recv_task, b'a')
|
||||
|
||||
await self.loop.sock_sendto(socket_2, b'b', addr_1)
|
||||
self.assertEqual(await socket_1_recv_task, b'b')
|
||||
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
|
||||
await asyncio.sleep(0)
|
||||
|
||||
# this should send to an address that isn't listening
|
||||
await self.loop.sock_sendto(socket_1, b'c', addr_3)
|
||||
self.assertEqual(await socket_1_recv_task, b'')
|
||||
socket_1_recv_task = self.loop.create_task(recvfrom(socket_1))
|
||||
await asyncio.sleep(0)
|
||||
|
||||
# socket 1 should still be able to receive messages after sending
|
||||
# to an address that wasn't listening
|
||||
socket_2.sendto(b'd', addr_1)
|
||||
self.assertEqual(await socket_1_recv_task, b'd')
|
||||
|
||||
socket_1.shutdown(socket.SHUT_RDWR)
|
||||
socket_1.close()
|
||||
socket_2.shutdown(socket.SHUT_RDWR)
|
||||
socket_2.close()
|
||||
|
||||
|
||||
def test_datagram_send_to_non_listening_address_recvfrom(self):
|
||||
async def recvfrom(socket):
|
||||
data, _ = await self.loop.sock_recvfrom(socket, 4096)
|
||||
return data
|
||||
|
||||
self.loop.run_until_complete(
|
||||
self._basetest_datagram_send_to_non_listening_address(
|
||||
recvfrom))
|
||||
|
||||
|
||||
def test_datagram_send_to_non_listening_address_recvfrom_into(self):
|
||||
async def recvfrom_into(socket):
|
||||
buf = bytearray(4096)
|
||||
length, _ = await self.loop.sock_recvfrom_into(socket, buf,
|
||||
4096)
|
||||
return buf[:length]
|
||||
|
||||
self.loop.run_until_complete(
|
||||
self._basetest_datagram_send_to_non_listening_address(
|
||||
recvfrom_into))
|
||||
|
||||
else:
|
||||
import selectors
|
||||
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Fix the asyncio ProactorEventLoop implementation so that sending a datagram to an address that is not listening does not prevent receiving any more datagrams.
|
|
@ -2056,6 +2056,7 @@ overlapped_exec(PyObject *module)
|
|||
WINAPI_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED);
|
||||
WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
|
||||
WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
|
||||
WINAPI_CONSTANT(F_DWORD, ERROR_PORT_UNREACHABLE);
|
||||
WINAPI_CONSTANT(F_DWORD, INFINITE);
|
||||
WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE);
|
||||
WINAPI_CONSTANT(F_HANDLE, NULL);
|
||||
|
|
Loading…
Reference in New Issue