mirror of https://github.com/python/cpython
Issue #25441: asyncio: Raise error from drain() when socket is closed. (Merge 3.5->3.6)
This commit is contained in:
commit
b09f627f4f
|
@ -301,6 +301,15 @@ class StreamWriter:
|
|||
exc = self._reader.exception()
|
||||
if exc is not None:
|
||||
raise exc
|
||||
if self._transport is not None:
|
||||
if self._transport._closing:
|
||||
# Yield to the event loop so connection_lost() may be
|
||||
# called. Without this, _drain_helper() would return
|
||||
# immediately, and code that calls
|
||||
# write(...); yield from drain()
|
||||
# in a loop would never call connection_lost(), so it
|
||||
# would not see an error when the socket is closed.
|
||||
yield
|
||||
yield from self._protocol._drain_helper()
|
||||
|
||||
|
||||
|
|
|
@ -2,8 +2,10 @@
|
|||
|
||||
import gc
|
||||
import os
|
||||
import queue
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import unittest
|
||||
from unittest import mock
|
||||
try:
|
||||
|
@ -632,6 +634,47 @@ os.close(fd)
|
|||
protocol = asyncio.StreamReaderProtocol(reader)
|
||||
self.assertIs(protocol._loop, self.loop)
|
||||
|
||||
def test_drain_raises(self):
|
||||
# See http://bugs.python.org/issue25441
|
||||
|
||||
# This test should not use asyncio for the mock server; the
|
||||
# whole point of the test is to test for a bug in drain()
|
||||
# where it never gives up the event loop but the socket is
|
||||
# closed on the server side.
|
||||
|
||||
q = queue.Queue()
|
||||
|
||||
def server():
|
||||
# Runs in a separate thread.
|
||||
sock = socket.socket()
|
||||
sock.bind(('localhost', 0))
|
||||
sock.listen(1)
|
||||
addr = sock.getsockname()
|
||||
q.put(addr)
|
||||
clt, _ = sock.accept()
|
||||
clt.close()
|
||||
|
||||
@asyncio.coroutine
|
||||
def client(host, port):
|
||||
reader, writer = yield from asyncio.open_connection(host, port, loop=self.loop)
|
||||
while True:
|
||||
writer.write(b"foo\n")
|
||||
yield from writer.drain()
|
||||
|
||||
# Start the server thread and wait for it to be listening.
|
||||
thread = threading.Thread(target=server)
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
addr = q.get()
|
||||
|
||||
# Should not be stuck in an infinite loop.
|
||||
with self.assertRaises((ConnectionResetError, BrokenPipeError)):
|
||||
self.loop.run_until_complete(client(*addr))
|
||||
|
||||
# Clean up the thread. (Only on success; on failure, it may
|
||||
# be stuck in accept().)
|
||||
thread.join()
|
||||
|
||||
def test___repr__(self):
|
||||
stream = asyncio.StreamReader(loop=self.loop)
|
||||
self.assertEqual("<StreamReader>", repr(stream))
|
||||
|
|
Loading…
Reference in New Issue