bpo-26819: Prevent proactor double read on resume (GH-6921)

The proactor event loop has a race condition when reading with
pausing/resuming. `resume_reading()` unconditionally schedules the read
function to read from the current future. If `resume_reading()` was
called before the previously scheduled done callback fires, this results
in two attempts to get the data from the most recent read and an
assertion failure. This commit tracks whether or not `resume_reading`
needs to reschedule the callback to restart the loop, preventing a
second attempt to read the data.
(cherry picked from commit 4151061855)

Co-authored-by: CtrlZvi <viz+github@flippedperspective.com>
This commit is contained in:
Miss Islington (bot) 2018-05-20 03:57:32 -07:00 committed by GitHub
parent f0af69faee
commit 28ea38b97b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 2 deletions

View File

@ -161,6 +161,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server)
self._paused = False
self._reschedule_on_resume = False
if protocols._is_buffered_protocol(protocol):
self._loop_reading = self._loop_reading__get_buffer
@ -180,6 +181,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._read_fut is not None and not self._read_fut.done():
self._read_fut.cancel()
self._read_fut = None
self._reschedule_on_resume = True
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
@ -188,7 +190,9 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if self._closing or not self._paused:
return
self._paused = False
self._loop.call_soon(self._loop_reading, self._read_fut)
if self._reschedule_on_resume:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@ -208,6 +212,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
def _loop_reading__data_received(self, fut=None):
if self._paused:
self._reschedule_on_resume = True
return
data = None
@ -257,6 +262,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
def _loop_reading__get_buffer(self, fut=None):
if self._paused:
self._reschedule_on_resume = True
return
nbytes = None

View File

@ -334,7 +334,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
def test_pause_resume_reading(self):
tr = self.socket_transport()
futures = []
for msg in [b'data1', b'data2', b'data3', b'data4', b'']:
for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
f = asyncio.Future(loop=self.loop)
f.set_result(msg)
futures.append(f)
@ -364,6 +364,13 @@ class ProactorSocketTransportTests(test_utils.TestCase):
self.protocol.data_received.assert_called_with(b'data3')
self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data4')
tr.pause_reading()
tr.resume_reading()
self.loop.call_exception_handler = mock.Mock()
self.loop._run_once()
self.loop.call_exception_handler.assert_not_called()
self.protocol.data_received.assert_called_with(b'data5')
tr.close()
self.assertFalse(tr.is_reading())

View File

@ -0,0 +1,2 @@
Fix race condition with `ReadTransport.resume_reading` in Windows proactor
event loop.