Issue #23293, asyncio: Rewrite IocpProactor.connect_pipe() as a coroutine

Use a coroutine with asyncio.sleep() instead of call_later() to ensure that the
schedule call is cancelled.

Add also a unit test cancelling connect_pipe().
This commit is contained in:
Victor Stinner 2015-01-26 15:04:03 +01:00
parent 2a3f38fd29
commit e0fd157ba0
2 changed files with 31 additions and 21 deletions

View File

@ -518,28 +518,25 @@ class IocpProactor:
return self._register(ov, pipe, finish_accept_pipe)
def _connect_pipe(self, fut, address, delay):
@coroutine
def connect_pipe(self, address):
delay = CONNECT_PIPE_INIT_DELAY
while True:
# Unfortunately there is no way to do an overlapped connect to a pipe.
# Call CreateFile() in a loop until it doesn't fail with
# ERROR_PIPE_BUSY
try:
handle = _overlapped.ConnectPipe(address)
break
except OSError as exc:
if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
# Polling: retry later
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
self._loop.call_later(delay,
self._connect_pipe, fut, address, delay)
else:
fut.set_exception(exc)
else:
pipe = windows_utils.PipeHandle(handle)
fut.set_result(pipe)
if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
raise
def connect_pipe(self, address):
fut = futures.Future(loop=self._loop)
self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
return fut
# ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
yield from tasks.sleep(delay, loop=self._loop)
return windows_utils.PipeHandle(handle)
def wait_for_handle(self, handle, timeout=None):
"""Wait for a handle.

View File

@ -1,6 +1,7 @@
import os
import sys
import unittest
from unittest import mock
if sys.platform != 'win32':
raise unittest.SkipTest('Windows only')
@ -91,6 +92,18 @@ class ProactorTests(test_utils.TestCase):
return 'done'
def test_connect_pipe_cancel(self):
exc = OSError()
exc.winerror = _overlapped.ERROR_PIPE_BUSY
with mock.patch.object(_overlapped, 'ConnectPipe', side_effect=exc) as connect:
coro = self.loop._proactor.connect_pipe('pipe_address')
task = self.loop.create_task(coro)
# check that it's possible to cancel connect_pipe()
task.cancel()
with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(task)
def test_wait_for_handle(self):
event = _overlapped.CreateEvent(None, True, False, None)
self.addCleanup(_winapi.CloseHandle, event)