bpo-36719: Fix regrtest MultiprocessThread (GH-13301)
MultiprocessThread.kill() now closes stdout and stderr to prevent popen.communicate() to hang.
This commit is contained in:
parent
1a10a6b980
commit
c923c3449f
|
@ -21,6 +21,9 @@ from test.libregrtest.utils import format_duration
|
|||
# Display the running tests if nothing happened last N seconds
|
||||
PROGRESS_UPDATE = 30.0 # seconds
|
||||
|
||||
# Time to wait until a worker completes: should be immediate
|
||||
JOIN_TIMEOUT = 30.0 # seconds
|
||||
|
||||
|
||||
def must_stop(result, ns):
|
||||
if result.result == INTERRUPTED:
|
||||
|
@ -91,6 +94,10 @@ class MultiprocessIterator:
|
|||
MultiprocessResult = collections.namedtuple('MultiprocessResult',
|
||||
'result stdout stderr error_msg')
|
||||
|
||||
class ExitThread(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class MultiprocessThread(threading.Thread):
|
||||
def __init__(self, pending, output, ns):
|
||||
super().__init__()
|
||||
|
@ -100,13 +107,31 @@ class MultiprocessThread(threading.Thread):
|
|||
self.current_test_name = None
|
||||
self.start_time = None
|
||||
self._popen = None
|
||||
self._killed = False
|
||||
|
||||
def __repr__(self):
|
||||
info = ['MultiprocessThread']
|
||||
test = self.current_test_name
|
||||
if self.is_alive():
|
||||
info.append('alive')
|
||||
if test:
|
||||
info.append(f'test={test}')
|
||||
popen = self._popen
|
||||
if popen:
|
||||
info.append(f'pid={popen.pid}')
|
||||
return '<%s>' % ' '.join(info)
|
||||
|
||||
def kill(self):
|
||||
self._killed = True
|
||||
|
||||
popen = self._popen
|
||||
if popen is None:
|
||||
return
|
||||
print("Kill regrtest worker process %s" % popen.pid)
|
||||
popen.kill()
|
||||
# stdout and stderr must be closed to ensure that communicate()
|
||||
# does not hang
|
||||
popen.stdout.close()
|
||||
popen.stderr.close()
|
||||
|
||||
def _runtest(self, test_name):
|
||||
try:
|
||||
|
@ -117,7 +142,21 @@ class MultiprocessThread(threading.Thread):
|
|||
popen = self._popen
|
||||
with popen:
|
||||
try:
|
||||
stdout, stderr = popen.communicate()
|
||||
if self._killed:
|
||||
# If kill() has been called before self._popen is set,
|
||||
# self._popen is still running. Call again kill()
|
||||
# to ensure that the process is killed.
|
||||
self.kill()
|
||||
raise ExitThread
|
||||
|
||||
try:
|
||||
stdout, stderr = popen.communicate()
|
||||
except OSError:
|
||||
if self._killed:
|
||||
# kill() has been called: communicate() fails
|
||||
# on reading closed stdout/stderr
|
||||
raise ExitThread
|
||||
raise
|
||||
except:
|
||||
self.kill()
|
||||
popen.wait()
|
||||
|
@ -154,7 +193,7 @@ class MultiprocessThread(threading.Thread):
|
|||
return MultiprocessResult(result, stdout, stderr, err_msg)
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
while not self._killed:
|
||||
try:
|
||||
try:
|
||||
test_name = next(self.pending)
|
||||
|
@ -166,6 +205,8 @@ class MultiprocessThread(threading.Thread):
|
|||
|
||||
if must_stop(mp_result.result, self.ns):
|
||||
break
|
||||
except ExitThread:
|
||||
break
|
||||
except BaseException:
|
||||
self.output.put((True, traceback.format_exc()))
|
||||
break
|
||||
|
@ -205,10 +246,20 @@ class MultiprocessRunner:
|
|||
worker.start()
|
||||
|
||||
def wait_workers(self):
|
||||
start_time = time.monotonic()
|
||||
for worker in self.workers:
|
||||
worker.kill()
|
||||
for worker in self.workers:
|
||||
worker.join()
|
||||
while True:
|
||||
worker.join(1.0)
|
||||
if not worker.is_alive():
|
||||
break
|
||||
dt = time.monotonic() - start_time
|
||||
print("Wait for regrtest worker %r for %.1f sec" % (worker, dt))
|
||||
if dt > JOIN_TIMEOUT:
|
||||
print("Warning -- failed to join a regrtest worker %s"
|
||||
% worker)
|
||||
break
|
||||
|
||||
def _get_result(self):
|
||||
if not any(worker.is_alive() for worker in self.workers):
|
||||
|
|
Loading…
Reference in New Issue