diff --git a/Lib/test/test_os.py b/Lib/test/test_os.py index 6a6371746cf..4b7ded2aabd 100644 --- a/Lib/test/test_os.py +++ b/Lib/test/test_os.py @@ -933,20 +933,64 @@ else: @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32KillTests(unittest.TestCase): - def _kill(self, sig, *args): - # Send a subprocess a signal (or in some cases, just an int to be - # the return value) - proc = subprocess.Popen(*args) + def _kill(self, sig): + # Start sys.executable as a subprocess and communicate from the + # subprocess to the parent that the interpreter is ready. When it + # becomes ready, send *sig* via os.kill to the subprocess and check + # that the return code is equal to *sig*. + import ctypes + from ctypes import wintypes + import msvcrt + + # Since we can't access the contents of the process' stdout until the + # process has exited, use PeekNamedPipe to see what's inside stdout + # without waiting. This is done so we can tell that the interpreter + # is started and running at a point where it could handle a signal. + PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe + PeekNamedPipe.restype = wintypes.BOOL + PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle + ctypes.POINTER(ctypes.c_char), # stdout buf + wintypes.DWORD, # Buffer size + ctypes.POINTER(wintypes.DWORD), # bytes read + ctypes.POINTER(wintypes.DWORD), # bytes avail + ctypes.POINTER(wintypes.DWORD)) # bytes left + msg = "running" + proc = subprocess.Popen([sys.executable, "-c", + "import sys;" + "sys.stdout.write('{}');" + "sys.stdout.flush();" + "input()".format(msg)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE) + + count, max = 0, 100 + while count < max and proc.poll() is None: + # Create a string buffer to store the result of stdout from the pipe + buf = ctypes.create_string_buffer(len(msg)) + # Obtain the text currently in proc.stdout + # Bytes read/avail/left are left as NULL and unused + rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), + buf, ctypes.sizeof(buf), None, None, None) + self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") + if buf.value: + self.assertEqual(msg, buf.value.decode()) + break + time.sleep(0.1) + count += 1 + else: + self.fail("Did not receive communication from the subprocess") + os.kill(proc.pid, sig) self.assertEqual(proc.wait(), sig) def test_kill_sigterm(self): # SIGTERM doesn't mean anything special, but make sure it works - self._kill(signal.SIGTERM, [sys.executable]) + self._kill(signal.SIGTERM) def test_kill_int(self): # os.kill on Windows can take an int which gets set as the exit code - self._kill(100, [sys.executable]) + self._kill(100) def _kill_with_event(self, event, name): # Run a script which has console control handling enabled.