bpo-35424: emit ResourceWarning at multiprocessing.Pool destruction (GH-10974)
multiprocessing.Pool destructor now emits ResourceWarning if the pool is still running.
This commit is contained in:
parent
c5d5dfdb22
commit
9a8d1d7562
|
@ -13,13 +13,14 @@ __all__ = ['Pool', 'ThreadPool']
|
||||||
# Imports
|
# Imports
|
||||||
#
|
#
|
||||||
|
|
||||||
import threading
|
|
||||||
import queue
|
|
||||||
import itertools
|
|
||||||
import collections
|
import collections
|
||||||
|
import itertools
|
||||||
import os
|
import os
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
|
import warnings
|
||||||
|
|
||||||
# If threading is available then ThreadPool should be provided. Therefore
|
# If threading is available then ThreadPool should be provided. Therefore
|
||||||
# we avoid top-level imports which are liable to fail on some systems.
|
# we avoid top-level imports which are liable to fail on some systems.
|
||||||
|
@ -30,6 +31,7 @@ from . import get_context, TimeoutError
|
||||||
# Constants representing the state of a pool
|
# Constants representing the state of a pool
|
||||||
#
|
#
|
||||||
|
|
||||||
|
INIT = "INIT"
|
||||||
RUN = "RUN"
|
RUN = "RUN"
|
||||||
CLOSE = "CLOSE"
|
CLOSE = "CLOSE"
|
||||||
TERMINATE = "TERMINATE"
|
TERMINATE = "TERMINATE"
|
||||||
|
@ -154,11 +156,15 @@ class Pool(object):
|
||||||
|
|
||||||
def __init__(self, processes=None, initializer=None, initargs=(),
|
def __init__(self, processes=None, initializer=None, initargs=(),
|
||||||
maxtasksperchild=None, context=None):
|
maxtasksperchild=None, context=None):
|
||||||
|
# Attributes initialized early to make sure that they exist in
|
||||||
|
# __del__() if __init__() raises an exception
|
||||||
|
self._pool = []
|
||||||
|
self._state = INIT
|
||||||
|
|
||||||
self._ctx = context or get_context()
|
self._ctx = context or get_context()
|
||||||
self._setup_queues()
|
self._setup_queues()
|
||||||
self._taskqueue = queue.SimpleQueue()
|
self._taskqueue = queue.SimpleQueue()
|
||||||
self._cache = {}
|
self._cache = {}
|
||||||
self._state = RUN
|
|
||||||
self._maxtasksperchild = maxtasksperchild
|
self._maxtasksperchild = maxtasksperchild
|
||||||
self._initializer = initializer
|
self._initializer = initializer
|
||||||
self._initargs = initargs
|
self._initargs = initargs
|
||||||
|
@ -172,7 +178,6 @@ class Pool(object):
|
||||||
raise TypeError('initializer must be a callable')
|
raise TypeError('initializer must be a callable')
|
||||||
|
|
||||||
self._processes = processes
|
self._processes = processes
|
||||||
self._pool = []
|
|
||||||
try:
|
try:
|
||||||
self._repopulate_pool()
|
self._repopulate_pool()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -216,6 +221,14 @@ class Pool(object):
|
||||||
self._result_handler, self._cache),
|
self._result_handler, self._cache),
|
||||||
exitpriority=15
|
exitpriority=15
|
||||||
)
|
)
|
||||||
|
self._state = RUN
|
||||||
|
|
||||||
|
# Copy globals as function locals to make sure that they are available
|
||||||
|
# during Python shutdown when the Pool is destroyed.
|
||||||
|
def __del__(self, _warn=warnings.warn, RUN=RUN):
|
||||||
|
if self._state == RUN:
|
||||||
|
_warn(f"unclosed running multiprocessing pool {self!r}",
|
||||||
|
ResourceWarning, source=self)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
cls = self.__class__
|
cls = self.__class__
|
||||||
|
|
|
@ -2577,6 +2577,22 @@ class _TestPool(BaseTestCase):
|
||||||
pass
|
pass
|
||||||
pool.join()
|
pool.join()
|
||||||
|
|
||||||
|
def test_resource_warning(self):
|
||||||
|
if self.TYPE == 'manager':
|
||||||
|
self.skipTest("test not applicable to manager")
|
||||||
|
|
||||||
|
pool = self.Pool(1)
|
||||||
|
pool.terminate()
|
||||||
|
pool.join()
|
||||||
|
|
||||||
|
# force state to RUN to emit ResourceWarning in __del__()
|
||||||
|
pool._state = multiprocessing.pool.RUN
|
||||||
|
|
||||||
|
with support.check_warnings(('unclosed running multiprocessing pool',
|
||||||
|
ResourceWarning)):
|
||||||
|
pool = None
|
||||||
|
support.gc_collect()
|
||||||
|
|
||||||
|
|
||||||
def raising():
|
def raising():
|
||||||
raise KeyError("key")
|
raise KeyError("key")
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
:class:`multiprocessing.Pool` destructor now emits :exc:`ResourceWarning`
|
||||||
|
if the pool is still running.
|
Loading…
Reference in New Issue