Merge 3.4 (asyncio)

This commit is contained in:
Victor Stinner 2015-01-14 02:13:51 +01:00
commit e54dd0b92b
3 changed files with 100 additions and 25 deletions

View File

@ -96,32 +96,61 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def kill(self):
self._proc.kill()
def _kill_wait(self):
"""Close pipes, kill the subprocess and read its return status.
Function called when an exception is raised during the creation
of a subprocess.
"""
if self._loop.get_debug():
logger.warning('Exception during subprocess creation, '
'kill the subprocess %r',
self,
exc_info=True)
proc = self._proc
if proc.stdout:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
if proc.stdin:
proc.stdin.close()
try:
proc.kill()
except ProcessLookupError:
pass
proc.wait()
@coroutine
def _post_init(self):
proc = self._proc
loop = self._loop
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
proc.stderr)
self._pipes[2] = pipe
try:
proc = self._proc
loop = self._loop
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
proc.stderr)
self._pipes[2] = pipe
assert self._pending_calls is not None
assert self._pending_calls is not None
self._loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
self._loop.call_soon(callback, *data)
self._pending_calls = None
self._loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
self._loop.call_soon(callback, *data)
self._pending_calls = None
except:
self._kill_wait()
raise
def _call(self, cb, *data):
if self._pending_calls is not None:

View File

@ -60,7 +60,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
protocol=self,
reader=None,
loop=self._loop)
self.waiter.set_result(None)
if not self.waiter.cancelled():
self.waiter.set_result(None)
def pipe_data_received(self, fd, data):
if fd == 1:
@ -216,7 +218,11 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
yield from protocol.waiter
try:
yield from protocol.waiter
except:
transport._kill_wait()
raise
return Process(transport, protocol, loop)
@coroutine
@ -232,5 +238,9 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
yield from protocol.waiter
try:
yield from protocol.waiter
except:
transport._kill_wait()
raise
return Process(transport, protocol, loop)

View File

@ -251,6 +251,42 @@ class SubprocessMixin:
self.loop.run_until_complete(cancel_wait())
def test_cancel_make_subprocess_transport_exec(self):
@asyncio.coroutine
def cancel_make_transport():
coro = asyncio.create_subprocess_exec(*PROGRAM_BLOCKED,
loop=self.loop)
task = self.loop.create_task(coro)
self.loop.call_soon(task.cancel)
try:
yield from task
except asyncio.CancelledError:
pass
# ignore the log:
# "Exception during subprocess creation, kill the subprocess"
with test_utils.disable_logger():
self.loop.run_until_complete(cancel_make_transport())
def test_cancel_post_init(self):
@asyncio.coroutine
def cancel_make_transport():
coro = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
*PROGRAM_BLOCKED)
task = self.loop.create_task(coro)
self.loop.call_soon(task.cancel)
try:
yield from task
except asyncio.CancelledError:
pass
# ignore the log:
# "Exception during subprocess creation, kill the subprocess"
with test_utils.disable_logger():
self.loop.run_until_complete(cancel_make_transport())
if sys.platform != 'win32':
# Unix