gh-108973: Fix asyncio test_subprocess_consistent_callbacks() (#109431)

SubprocessProtocol process_exited() method can be called before
pipe_data_received() and pipe_connection_lost() methods. Document it
and adapt the test for that.

Revert commit 282edd7b2a.
_child_watcher_callback() calls immediately _process_exited(): don't
add an additional delay with call_soon(). The reverted change didn't
make _process_exited() more determistic: it can still be called
before pipe_connection_lost() for example.

Co-authored-by: Davide Rizzo <sorcio@gmail.com>
This commit is contained in:
Victor Stinner 2023-09-20 15:54:19 +02:00 committed by GitHub
parent 850cc8d0b1
commit ced6924630
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 19 deletions

View File

@ -484,19 +484,19 @@ Protocol classes can implement the following **callback methods**:
:widths: 50 50
:class: full-width-table
* - ``callback`` :meth:`pipe_data_received()
<SubprocessProtocol.pipe_data_received>`
* - ``callback`` :meth:`~SubprocessProtocol.pipe_data_received`
- Called when the child process writes data into its
*stdout* or *stderr* pipe.
* - ``callback`` :meth:`pipe_connection_lost()
<SubprocessProtocol.pipe_connection_lost>`
* - ``callback`` :meth:`~SubprocessProtocol.pipe_connection_lost`
- Called when one of the pipes communicating with
the child process is closed.
* - ``callback`` :meth:`process_exited()
<SubprocessProtocol.process_exited>`
- Called when the child process has exited.
- Called when the child process has exited. It can be called before
:meth:`~SubprocessProtocol.pipe_data_received` and
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.
Event Loop Policies

View File

@ -708,6 +708,9 @@ factories passed to the :meth:`loop.subprocess_exec` and
Called when the child process has exited.
It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.
Examples
========
@ -1003,12 +1006,26 @@ The subprocess is created by the :meth:`loop.subprocess_exec` method::
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
self.pipe_closed = False
self.exited = False
def pipe_connection_lost(self, fd, exc):
self.pipe_closed = True
self.check_for_exit()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
self.exited = True
# process_exited() method can be called before
# pipe_connection_lost() method: wait until both methods are
# called.
self.check_for_exit()
def check_for_exit(self):
if self.pipe_closed and self.exited:
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use

View File

@ -226,8 +226,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
return transp
def _child_watcher_callback(self, pid, returncode, transp):
# Skip one iteration for callbacks to be executed
self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
self.call_soon_threadsafe(transp._process_exited, returncode)
async def create_unix_connection(
self, protocol_factory, path=None, *,

View File

@ -753,21 +753,44 @@ class SubprocessMixin:
self.loop.run_until_complete(main())
def test_subprocess_consistent_callbacks(self):
def test_subprocess_protocol_events(self):
# gh-108973: Test that all subprocess protocol methods are called.
# The protocol methods are not called in a determistic order.
# The order depends on the event loop and the operating system.
events = []
fds = [1, 2]
expected = [
('pipe_data_received', 1, b'stdout'),
('pipe_data_received', 2, b'stderr'),
('pipe_connection_lost', 1),
('pipe_connection_lost', 2),
'process_exited',
]
per_fd_expected = [
'pipe_data_received',
'pipe_connection_lost',
]
class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future: asyncio.Future) -> None:
self.exit_future = exit_future
def pipe_data_received(self, fd, data) -> None:
events.append(('pipe_data_received', fd, data))
self.exit_maybe()
def pipe_connection_lost(self, fd, exc) -> None:
events.append('pipe_connection_lost')
events.append(('pipe_connection_lost', fd))
self.exit_maybe()
def process_exited(self) -> None:
events.append('process_exited')
self.exit_future.set_result(True)
self.exit_maybe()
def exit_maybe(self):
# Only exit when we got all expected events
if len(events) >= len(expected):
self.exit_future.set_result(True)
async def main() -> None:
loop = asyncio.get_running_loop()
@ -777,15 +800,24 @@ class SubprocessMixin:
sys.executable, '-c', code, stdin=None)
await exit_future
transport.close()
self.assertEqual(events, [
('pipe_data_received', 1, b'stdout'),
('pipe_data_received', 2, b'stderr'),
'pipe_connection_lost',
'pipe_connection_lost',
'process_exited',
])
self.loop.run_until_complete(main())
return events
events = self.loop.run_until_complete(main())
# First, make sure that we received all events
self.assertSetEqual(set(events), set(expected))
# Second, check order of pipe events per file descriptor
per_fd_events = {fd: [] for fd in fds}
for event in events:
if event == 'process_exited':
continue
name, fd = event[:2]
per_fd_events[fd].append(name)
for fd in fds:
self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))
def test_subprocess_communicate_stdout(self):
# See https://github.com/python/cpython/issues/100133