[3.6] bpo-26819: Prevent proactor double read on resume (GH-6921) (#7110)

The proactor event loop has a race condition when reading with
pausing/resuming. `resume_reading()` unconditionally schedules the read
function to read from the current future. If `resume_reading()` was
called before the previously scheduled done callback fires, this results
in two attempts to get the data from the most recent read and an
assertion failure. This commit tracks whether or not `resume_reading`
needs to reschedule the callback to restart the loop, preventing a
second attempt to read the data..
(cherry picked from commit 4151061855)

Co-authored-by: CtrlZvi <viz+github@flippedperspective.com>
This commit is contained in:
CtrlZvi 2018-05-25 01:03:25 -07:00 committed by Andrew Svetlov
parent 36f066a974
commit 749afe81ec
3 changed files with 15 additions and 2 deletions

View File

@ -156,6 +156,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
extra=None, server=None): extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server) super().__init__(loop, sock, protocol, waiter, extra, server)
self._paused = False self._paused = False
self._reschedule_on_resume = False
self._loop.call_soon(self._loop_reading) self._loop.call_soon(self._loop_reading)
def pause_reading(self): def pause_reading(self):
@ -173,12 +174,15 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
self._paused = False self._paused = False
if self._closing: if self._closing:
return return
self._loop.call_soon(self._loop_reading, self._read_fut) if self._reschedule_on_resume:
self._loop.call_soon(self._loop_reading, self._read_fut)
self._reschedule_on_resume = False
if self._loop.get_debug(): if self._loop.get_debug():
logger.debug("%r resumes reading", self) logger.debug("%r resumes reading", self)
def _loop_reading(self, fut=None): def _loop_reading(self, fut=None):
if self._paused: if self._paused:
self._reschedule_on_resume = True
return return
data = None data = None

View File

@ -330,7 +330,7 @@ class ProactorSocketTransportTests(test_utils.TestCase):
def test_pause_resume_reading(self): def test_pause_resume_reading(self):
tr = self.socket_transport() tr = self.socket_transport()
futures = [] futures = []
for msg in [b'data1', b'data2', b'data3', b'data4', b'']: for msg in [b'data1', b'data2', b'data3', b'data4', b'data5', b'']:
f = asyncio.Future(loop=self.loop) f = asyncio.Future(loop=self.loop)
f.set_result(msg) f.set_result(msg)
futures.append(f) futures.append(f)
@ -352,6 +352,13 @@ class ProactorSocketTransportTests(test_utils.TestCase):
self.protocol.data_received.assert_called_with(b'data3') self.protocol.data_received.assert_called_with(b'data3')
self.loop._run_once() self.loop._run_once()
self.protocol.data_received.assert_called_with(b'data4') self.protocol.data_received.assert_called_with(b'data4')
tr.pause_reading()
tr.resume_reading()
self.loop.call_exception_handler = mock.Mock()
self.loop._run_once()
self.loop.call_exception_handler.assert_not_called()
self.protocol.data_received.assert_called_with(b'data5')
tr.close() tr.close()

View File

@ -0,0 +1,2 @@
Fix race condition with `ReadTransport.resume_reading` in Windows proactor
event loop.