gh-124694: Add concurrent.futures.InterpreterPoolExecutor (gh-124548)

This is an implementation of InterpreterPoolExecutor that builds on ThreadPoolExecutor.

(Note that this is not tied to PEP 734, which is strictly about adding a new stdlib module.)

Possible future improvements:

* support passing a script for the initializer or to submit()
* support passing (most) arbitrary functions without pickling
* support passing closures
* optionally exec functions against __main__ instead of the their original module
This commit is contained in:
Eric Snow 2024-10-16 16:50:46 -06:00 committed by GitHub
parent a38fef4439
commit a5a7f5e16d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 828 additions and 40 deletions

View File

@ -103,7 +103,8 @@ To handle signals the event loop must be
run in the main thread. run in the main thread.
The :meth:`loop.run_in_executor` method can be used with a The :meth:`loop.run_in_executor` method can be used with a
:class:`concurrent.futures.ThreadPoolExecutor` to execute :class:`concurrent.futures.ThreadPoolExecutor` or
:class:`~concurrent.futures.InterpreterPoolExecutor` to execute
blocking code in a different OS thread without blocking the OS thread blocking code in a different OS thread without blocking the OS thread
that the event loop runs in. that the event loop runs in.
@ -128,7 +129,8 @@ if a function performs a CPU-intensive calculation for 1 second,
all concurrent asyncio Tasks and IO operations would be delayed all concurrent asyncio Tasks and IO operations would be delayed
by 1 second. by 1 second.
An executor can be used to run a task in a different thread or even in An executor can be used to run a task in a different thread,
including in a different interpreter, or even in
a different process to avoid blocking the OS thread with the a different process to avoid blocking the OS thread with the
event loop. See the :meth:`loop.run_in_executor` method for more event loop. See the :meth:`loop.run_in_executor` method for more
details. details.

View File

@ -1305,6 +1305,12 @@ Executing code in thread or process pools
pool, cpu_bound) pool, cpu_bound)
print('custom process pool', result) print('custom process pool', result)
# 4. Run in a custom interpreter pool:
with concurrent.futures.InterpreterPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom interpreter pool', result)
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(main()) asyncio.run(main())
@ -1329,7 +1335,8 @@ Executing code in thread or process pools
Set *executor* as the default executor used by :meth:`run_in_executor`. Set *executor* as the default executor used by :meth:`run_in_executor`.
*executor* must be an instance of *executor* must be an instance of
:class:`~concurrent.futures.ThreadPoolExecutor`. :class:`~concurrent.futures.ThreadPoolExecutor`, which includes
:class:`~concurrent.futures.InterpreterPoolExecutor`.
.. versionchanged:: 3.11 .. versionchanged:: 3.11
*executor* must be an instance of *executor* must be an instance of

View File

@ -96,7 +96,7 @@ See also the main documentation section about the
- Invoke a callback *at* the given time. - Invoke a callback *at* the given time.
.. rubric:: Thread/Process Pool .. rubric:: Thread/Interpreter/Process Pool
.. list-table:: .. list-table::
:widths: 50 50 :widths: 50 50
:class: full-width-table :class: full-width-table

View File

@ -15,9 +15,10 @@ The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables. asynchronously executing callables.
The asynchronous execution can be performed with threads, using The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using :class:`ThreadPoolExecutor` or :class:`InterpreterPoolExecutor`,
:class:`ProcessPoolExecutor`. Both implement the same interface, which is or separate processes, using :class:`ProcessPoolExecutor`.
defined by the abstract :class:`Executor` class. Each implements the same interface, which is defined
by the abstract :class:`Executor` class.
.. include:: ../includes/wasm-notavail.rst .. include:: ../includes/wasm-notavail.rst
@ -63,7 +64,8 @@ Executor Objects
setting *chunksize* to a positive integer. For very long iterables, setting *chunksize* to a positive integer. For very long iterables,
using a large value for *chunksize* can significantly improve using a large value for *chunksize* can significantly improve
performance compared to the default size of 1. With performance compared to the default size of 1. With
:class:`ThreadPoolExecutor`, *chunksize* has no effect. :class:`ThreadPoolExecutor` and :class:`InterpreterPoolExecutor`,
*chunksize* has no effect.
.. versionchanged:: 3.5 .. versionchanged:: 3.5
Added the *chunksize* argument. Added the *chunksize* argument.
@ -227,6 +229,111 @@ ThreadPoolExecutor Example
print('%r page is %d bytes' % (url, len(data))) print('%r page is %d bytes' % (url, len(data)))
InterpreterPoolExecutor
-----------------------
The :class:`InterpreterPoolExecutor` class uses a pool of interpreters
to execute calls asynchronously. It is a :class:`ThreadPoolExecutor`
subclass, which means each worker is running in its own thread.
The difference here is that each worker has its own interpreter,
and runs each task using that interpreter.
The biggest benefit to using interpreters instead of only threads
is true multi-core parallelism. Each interpreter has its own
:term:`Global Interpreter Lock <global interpreter lock>`, so code
running in one interpreter can run on one CPU core, while code in
another interpreter runs unblocked on a different core.
The tradeoff is that writing concurrent code for use with multiple
interpreters can take extra effort. However, this is because it
forces you to be deliberate about how and when interpreters interact,
and to be explicit about what data is shared between interpreters.
This results in several benefits that help balance the extra effort,
including true multi-core parallelism, For example, code written
this way can make it easier to reason about concurrency. Another
major benefit is that you don't have to deal with several of the
big pain points of using threads, like nrace conditions.
Each worker's interpreter is isolated from all the other interpreters.
"Isolated" means each interpreter has its own runtime state and
operates completely independently. For example, if you redirect
:data:`sys.stdout` in one interpreter, it will not be automatically
redirected any other interpreter. If you import a module in one
interpreter, it is not automatically imported in any other. You
would need to import the module separately in interpreter where
you need it. In fact, each module imported in an interpreter is
a completely separate object from the same module in a different
interpreter, including :mod:`sys`, :mod:`builtins`,
and even ``__main__``.
Isolation means a mutable object, or other data, cannot be used
by more than one interpreter at the same time. That effectively means
interpreters cannot actually share such objects or data. Instead,
each interpreter must have its own copy, and you will have to
synchronize any changes between the copies manually. Immutable
objects and data, like the builtin singletons, strings, and tuples
of immutable objects, don't have these limitations.
Communicating and synchronizing between interpreters is most effectively
done using dedicated tools, like those proposed in :pep:`734`. One less
efficient alternative is to serialize with :mod:`pickle` and then send
the bytes over a shared :mod:`socket <socket>` or
:func:`pipe <os.pipe>`.
.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
using a pool of at most *max_workers* threads. Each thread runs
tasks in its own interpreter. The worker interpreters are isolated
from each other, which means each has its own runtime state and that
they can't share any mutable objects or other data. Each interpreter
has its own :term:`Global Interpreter Lock <global interpreter lock>`,
which means code run with this executor has true multi-core parallelism.
The optional *initializer* and *initargs* arguments have the same
meaning as for :class:`!ThreadPoolExecutor`: the initializer is run
when each worker is created, though in this case it is run.in
the worker's interpreter. The executor serializes the *initializer*
and *initargs* using :mod:`pickle` when sending them to the worker's
interpreter.
.. note::
Functions defined in the ``__main__`` module cannot be pickled
and thus cannot be used.
.. note::
The executor may replace uncaught exceptions from *initializer*
with :class:`~concurrent.futures.interpreter.ExecutionFailed`.
The optional *shared* argument is a :class:`dict` of objects that all
interpreters in the pool share. The *shared* items are added to each
interpreter's ``__main__`` module. Not all objects are shareable.
Shareable objects include the builtin singletons, :class:`str`
and :class:`bytes`, and :class:`memoryview`. See :pep:`734`
for more info.
Other caveats from parent :class:`ThreadPoolExecutor` apply here.
:meth:`~Executor.submit` and :meth:`~Executor.map` work like normal,
except the worker serializes the callable and arguments using
:mod:`pickle` when sending them to its interpreter. The worker
likewise serializes the return value when sending it back.
.. note::
Functions defined in the ``__main__`` module cannot be pickled
and thus cannot be used.
When a worker's current task raises an uncaught exception, the worker
always tries to preserve the exception as-is. If that is successful
then it also sets the ``__cause__`` to a corresponding
:class:`~concurrent.futures.interpreter.ExecutionFailed`
instance, which contains a summary of the original exception.
In the uncommon case that the worker is not able to preserve the
original as-is then it directly preserves the corresponding
:class:`~concurrent.futures.interpreter.ExecutionFailed`
instance instead.
ProcessPoolExecutor ProcessPoolExecutor
------------------- -------------------
@ -574,6 +681,26 @@ Exception classes
.. versionadded:: 3.7 .. versionadded:: 3.7
.. currentmodule:: concurrent.futures.interpreter
.. exception:: BrokenInterpreterPool
Derived from :exc:`~concurrent.futures.thread.BrokenThreadPool`,
this exception class is raised when one of the workers
of a :class:`~concurrent.futures.InterpreterPoolExecutor`
has failed initializing.
.. versionadded:: next
.. exception:: ExecutionFailed
Raised from :class:`~concurrent.futures.InterpreterPoolExecutor` when
the given initializer fails or from
:meth:`~concurrent.futures.Executor.submit` when there's an uncaught
exception from the submitted task.
.. versionadded:: next
.. currentmodule:: concurrent.futures.process .. currentmodule:: concurrent.futures.process
.. exception:: BrokenProcessPool .. exception:: BrokenProcessPool

View File

@ -225,6 +225,14 @@ ast
* The ``repr()`` output for AST nodes now includes more information. * The ``repr()`` output for AST nodes now includes more information.
(Contributed by Tomas R in :gh:`116022`.) (Contributed by Tomas R in :gh:`116022`.)
concurrent.futures
------------------
* Add :class:`~concurrent.futures.InterpreterPoolExecutor`,
which exposes "subinterpreters (multiple Python interpreters in the
same process) to Python code. This is separate from the proposed API
in :pep:`734`.
(Contributed by Eric Snow in :gh:`124548`.)
ctypes ctypes
------ ------

View File

@ -29,6 +29,7 @@ __all__ = (
'Executor', 'Executor',
'wait', 'wait',
'as_completed', 'as_completed',
'InterpreterPoolExecutor',
'ProcessPoolExecutor', 'ProcessPoolExecutor',
'ThreadPoolExecutor', 'ThreadPoolExecutor',
) )
@ -39,7 +40,7 @@ def __dir__():
def __getattr__(name): def __getattr__(name):
global ProcessPoolExecutor, ThreadPoolExecutor global ProcessPoolExecutor, ThreadPoolExecutor, InterpreterPoolExecutor
if name == 'ProcessPoolExecutor': if name == 'ProcessPoolExecutor':
from .process import ProcessPoolExecutor as pe from .process import ProcessPoolExecutor as pe
@ -51,4 +52,13 @@ def __getattr__(name):
ThreadPoolExecutor = te ThreadPoolExecutor = te
return te return te
if name == 'InterpreterPoolExecutor':
try:
from .interpreter import InterpreterPoolExecutor as ie
except ModuleNotFoundError:
ie = InterpreterPoolExecutor = None
else:
InterpreterPoolExecutor = ie
return ie
raise AttributeError(f"module {__name__!r} has no attribute {name!r}") raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@ -0,0 +1,241 @@
"""Implements InterpreterPoolExecutor."""
import contextlib
import pickle
import textwrap
from . import thread as _thread
import _interpreters
import _interpqueues
class ExecutionFailed(_interpreters.InterpreterError):
"""An unhandled exception happened during execution."""
def __init__(self, excinfo):
msg = excinfo.formatted
if not msg:
if excinfo.type and excinfo.msg:
msg = f'{excinfo.type.__name__}: {excinfo.msg}'
else:
msg = excinfo.type.__name__ or excinfo.msg
super().__init__(msg)
self.excinfo = excinfo
def __str__(self):
try:
formatted = self.excinfo.errdisplay
except Exception:
return super().__str__()
else:
return textwrap.dedent(f"""
{super().__str__()}
Uncaught in the interpreter:
{formatted}
""".strip())
UNBOUND = 2 # error; this should not happen.
class WorkerContext(_thread.WorkerContext):
@classmethod
def prepare(cls, initializer, initargs, shared):
def resolve_task(fn, args, kwargs):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
if args or kwargs:
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
data = textwrap.dedent(fn)
kind = 'script'
# Make sure the script compiles.
# Ideally we wouldn't throw away the resulting code
# object. However, there isn't much to be done until
# code objects are shareable and/or we do a better job
# of supporting code objects in _interpreters.exec().
compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
data = pickle.dumps((fn, args, kwargs))
kind = 'function'
return (data, kind)
if initializer is not None:
try:
initdata = resolve_task(initializer, initargs, {})
except ValueError:
if isinstance(initializer, str) and initargs:
raise ValueError(f'an initializer script does not take args, got {initargs!r}')
raise # re-raise
else:
initdata = None
def create_context():
return cls(initdata, shared)
return create_context, resolve_task
@classmethod
@contextlib.contextmanager
def _capture_exc(cls, resultsid):
try:
yield
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
err = pickle.dumps(exc)
_interpqueues.put(resultsid, (None, err), 1, UNBOUND)
raise # re-raise
@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None), 0, UNBOUND)
@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
try:
_interpqueues.put(resultsid, (res, None), 0, UNBOUND)
except _interpreters.NotShareableError:
res = pickle.dumps(res)
_interpqueues.put(resultsid, (res, None), 1, UNBOUND)
@classmethod
def _call_pickled(cls, pickled, resultsid):
fn, args, kwargs = pickle.loads(pickled)
cls._call(fn, args, kwargs, resultsid)
def __init__(self, initdata, shared=None):
self.initdata = initdata
self.shared = dict(shared) if shared else None
self.interpid = None
self.resultsid = None
def __del__(self):
if self.interpid is not None:
self.finalize()
def _exec(self, script):
assert self.interpid is not None
excinfo = _interpreters.exec(self.interpid, script, restrict=True)
if excinfo is not None:
raise ExecutionFailed(excinfo)
def initialize(self):
assert self.interpid is None, self.interpid
self.interpid = _interpreters.create(reqrefs=True)
try:
_interpreters.incref(self.interpid)
maxsize = 0
fmt = 0
self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
self._exec(f'from {__name__} import WorkerContext')
if self.shared:
_interpreters.set___main___attrs(
self.interpid, self.shared, restrict=True)
if self.initdata:
self.run(self.initdata)
except BaseException:
self.finalize()
raise # re-raise
def finalize(self):
interpid = self.interpid
resultsid = self.resultsid
self.resultsid = None
self.interpid = None
if resultsid is not None:
try:
_interpqueues.destroy(resultsid)
except _interpqueues.QueueNotFoundError:
pass
if interpid is not None:
try:
_interpreters.decref(interpid)
except _interpreters.InterpreterNotFoundError:
pass
def run(self, task):
data, kind = task
if kind == 'script':
raise NotImplementedError('script kind disabled')
script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
WorkerContext._send_script_result({self.resultsid})"""
elif kind == 'function':
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
else:
raise NotImplementedError(kind)
try:
self._exec(script)
except ExecutionFailed as exc:
exc_wrapper = exc
else:
exc_wrapper = None
# Return the result, or raise the exception.
while True:
try:
obj = _interpqueues.get(self.resultsid)
except _interpqueues.QueueNotFoundError:
raise # re-raise
except _interpqueues.QueueError:
continue
except ModuleNotFoundError:
# interpreters.queues doesn't exist, which means
# QueueEmpty doesn't. Act as though it does.
continue
else:
break
(res, excdata), pickled, unboundop = obj
assert unboundop is None, unboundop
if excdata is not None:
assert res is None, res
assert pickled
assert exc_wrapper is not None
exc = pickle.loads(excdata)
raise exc from exc_wrapper
return pickle.loads(res) if pickled else res
class BrokenInterpreterPool(_thread.BrokenThreadPool):
"""
Raised when a worker thread in an InterpreterPoolExecutor failed initializing.
"""
class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
BROKEN = BrokenInterpreterPool
@classmethod
def prepare_context(cls, initializer, initargs, shared):
return WorkerContext.prepare(initializer, initargs, shared)
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=(), shared=None):
"""Initializes a new InterpreterPoolExecutor instance.
Args:
max_workers: The maximum number of interpreters that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable or script used to initialize
each worker interpreter.
initargs: A tuple of arguments to pass to the initializer.
shared: A mapping of shareabled objects to be inserted into
each worker interpreter.
"""
super().__init__(max_workers, thread_name_prefix,
initializer, initargs, shared=shared)

View File

@ -43,19 +43,46 @@ if hasattr(os, 'register_at_fork'):
after_in_parent=_global_shutdown_lock.release) after_in_parent=_global_shutdown_lock.release)
class _WorkItem: class WorkerContext:
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self): @classmethod
def prepare(cls, initializer, initargs):
if initializer is not None:
if not callable(initializer):
raise TypeError("initializer must be a callable")
def create_context():
return cls(initializer, initargs)
def resolve_task(fn, args, kwargs):
return (fn, args, kwargs)
return create_context, resolve_task
def __init__(self, initializer, initargs):
self.initializer = initializer
self.initargs = initargs
def initialize(self):
if self.initializer is not None:
self.initializer(*self.initargs)
def finalize(self):
pass
def run(self, task):
fn, args, kwargs = task
return fn(*args, **kwargs)
class _WorkItem:
def __init__(self, future, task):
self.future = future
self.task = task
def run(self, ctx):
if not self.future.set_running_or_notify_cancel(): if not self.future.set_running_or_notify_cancel():
return return
try: try:
result = self.fn(*self.args, **self.kwargs) result = ctx.run(self.task)
except BaseException as exc: except BaseException as exc:
self.future.set_exception(exc) self.future.set_exception(exc)
# Break a reference cycle with the exception 'exc' # Break a reference cycle with the exception 'exc'
@ -66,16 +93,15 @@ class _WorkItem:
__class_getitem__ = classmethod(types.GenericAlias) __class_getitem__ = classmethod(types.GenericAlias)
def _worker(executor_reference, work_queue, initializer, initargs): def _worker(executor_reference, ctx, work_queue):
if initializer is not None: try:
try: ctx.initialize()
initializer(*initargs) except BaseException:
except BaseException: _base.LOGGER.critical('Exception in initializer:', exc_info=True)
_base.LOGGER.critical('Exception in initializer:', exc_info=True) executor = executor_reference()
executor = executor_reference() if executor is not None:
if executor is not None: executor._initializer_failed()
executor._initializer_failed() return
return
try: try:
while True: while True:
try: try:
@ -89,7 +115,7 @@ def _worker(executor_reference, work_queue, initializer, initargs):
work_item = work_queue.get(block=True) work_item = work_queue.get(block=True)
if work_item is not None: if work_item is not None:
work_item.run() work_item.run(ctx)
# Delete references to object. See GH-60488 # Delete references to object. See GH-60488
del work_item del work_item
continue continue
@ -110,6 +136,8 @@ def _worker(executor_reference, work_queue, initializer, initargs):
del executor del executor
except BaseException: except BaseException:
_base.LOGGER.critical('Exception in worker', exc_info=True) _base.LOGGER.critical('Exception in worker', exc_info=True)
finally:
ctx.finalize()
class BrokenThreadPool(_base.BrokenExecutor): class BrokenThreadPool(_base.BrokenExecutor):
@ -120,11 +148,17 @@ class BrokenThreadPool(_base.BrokenExecutor):
class ThreadPoolExecutor(_base.Executor): class ThreadPoolExecutor(_base.Executor):
BROKEN = BrokenThreadPool
# Used to assign unique thread names when thread_name_prefix is not supplied. # Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__ _counter = itertools.count().__next__
@classmethod
def prepare_context(cls, initializer, initargs):
return WorkerContext.prepare(initializer, initargs)
def __init__(self, max_workers=None, thread_name_prefix='', def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()): initializer=None, initargs=(), **ctxkwargs):
"""Initializes a new ThreadPoolExecutor instance. """Initializes a new ThreadPoolExecutor instance.
Args: Args:
@ -133,6 +167,7 @@ class ThreadPoolExecutor(_base.Executor):
thread_name_prefix: An optional name prefix to give our threads. thread_name_prefix: An optional name prefix to give our threads.
initializer: A callable used to initialize worker threads. initializer: A callable used to initialize worker threads.
initargs: A tuple of arguments to pass to the initializer. initargs: A tuple of arguments to pass to the initializer.
ctxkwargs: Additional arguments to cls.prepare_context().
""" """
if max_workers is None: if max_workers is None:
# ThreadPoolExecutor is often used to: # ThreadPoolExecutor is often used to:
@ -146,8 +181,9 @@ class ThreadPoolExecutor(_base.Executor):
if max_workers <= 0: if max_workers <= 0:
raise ValueError("max_workers must be greater than 0") raise ValueError("max_workers must be greater than 0")
if initializer is not None and not callable(initializer): (self._create_worker_context,
raise TypeError("initializer must be a callable") self._resolve_work_item_task,
) = type(self).prepare_context(initializer, initargs, **ctxkwargs)
self._max_workers = max_workers self._max_workers = max_workers
self._work_queue = queue.SimpleQueue() self._work_queue = queue.SimpleQueue()
@ -158,13 +194,11 @@ class ThreadPoolExecutor(_base.Executor):
self._shutdown_lock = threading.Lock() self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter())) ("ThreadPoolExecutor-%d" % self._counter()))
self._initializer = initializer
self._initargs = initargs
def submit(self, fn, /, *args, **kwargs): def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock, _global_shutdown_lock: with self._shutdown_lock, _global_shutdown_lock:
if self._broken: if self._broken:
raise BrokenThreadPool(self._broken) raise self.BROKEN(self._broken)
if self._shutdown: if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown') raise RuntimeError('cannot schedule new futures after shutdown')
@ -173,7 +207,8 @@ class ThreadPoolExecutor(_base.Executor):
'interpreter shutdown') 'interpreter shutdown')
f = _base.Future() f = _base.Future()
w = _WorkItem(f, fn, args, kwargs) task = self._resolve_work_item_task(fn, args, kwargs)
w = _WorkItem(f, task)
self._work_queue.put(w) self._work_queue.put(w)
self._adjust_thread_count() self._adjust_thread_count()
@ -196,9 +231,8 @@ class ThreadPoolExecutor(_base.Executor):
num_threads) num_threads)
t = threading.Thread(name=thread_name, target=_worker, t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb), args=(weakref.ref(self, weakref_cb),
self._work_queue, self._create_worker_context(),
self._initializer, self._work_queue))
self._initargs))
t.start() t.start()
self._threads.add(t) self._threads.add(t)
_threads_queues[t] = self._work_queue _threads_queues[t] = self._work_queue
@ -214,7 +248,7 @@ class ThreadPoolExecutor(_base.Executor):
except queue.Empty: except queue.Empty:
break break
if work_item is not None: if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken)) work_item.future.set_exception(self.BROKEN(self._broken))
def shutdown(self, wait=True, *, cancel_futures=False): def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock: with self._shutdown_lock:

View File

@ -23,6 +23,7 @@ def make_dummy_object(_):
class ExecutorTest: class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by # Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest. # ExecutorShutdownTest.
def test_submit(self): def test_submit(self):
@ -52,7 +53,8 @@ class ExecutorTest:
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1)) self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1)) self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__) with self.assertRaises(ZeroDivisionError):
i.__next__()
@support.requires_resource('walltime') @support.requires_resource('walltime')
def test_map_timeout(self): def test_map_timeout(self):

View File

@ -0,0 +1,346 @@
import asyncio
import contextlib
import io
import os
import pickle
import sys
import time
import unittest
from concurrent.futures.interpreter import (
ExecutionFailed, BrokenInterpreterPool,
)
import _interpreters
from test import support
import test.test_asyncio.utils as testasyncio_utils
from test.support.interpreters import queues
from .executor import ExecutorTest, mul
from .util import BaseTestCase, InterpreterPoolMixin, setup_module
def noop():
pass
def write_msg(fd, msg):
os.write(fd, msg + b'\0')
def read_msg(fd):
msg = b''
while ch := os.read(fd, 1):
if ch == b'\0':
return msg
msg += ch
def get_current_name():
return __name__
def fail(exctype, msg=None):
raise exctype(msg)
def get_current_interpid(*extra):
interpid, _ = _interpreters.get_current()
return (interpid, *extra)
class InterpretersMixin(InterpreterPoolMixin):
def pipe(self):
r, w = os.pipe()
self.addCleanup(lambda: os.close(r))
self.addCleanup(lambda: os.close(w))
return r, w
class InterpreterPoolExecutorTest(
InterpretersMixin, ExecutorTest, BaseTestCase):
@unittest.expectedFailure
def test_init_script(self):
msg1 = b'step: init'
msg2 = b'step: run'
r, w = self.pipe()
initscript = f"""
import os
msg = {msg2!r}
os.write({w}, {msg1!r} + b'\\0')
"""
script = f"""
os.write({w}, msg + b'\\0')
"""
os.write(w, b'\0')
executor = self.executor_type(initializer=initscript)
before_init = os.read(r, 100)
fut = executor.submit(script)
after_init = read_msg(r)
fut.result()
after_run = read_msg(r)
self.assertEqual(before_init, b'\0')
self.assertEqual(after_init, msg1)
self.assertEqual(after_run, msg2)
@unittest.expectedFailure
def test_init_script_args(self):
with self.assertRaises(ValueError):
self.executor_type(initializer='pass', initargs=('spam',))
def test_init_func(self):
msg = b'step: init'
r, w = self.pipe()
os.write(w, b'\0')
executor = self.executor_type(
initializer=write_msg, initargs=(w, msg))
before = os.read(r, 100)
executor.submit(mul, 10, 10)
after = read_msg(r)
self.assertEqual(before, b'\0')
self.assertEqual(after, msg)
def test_init_closure(self):
count = 0
def init1():
assert count == 0, count
def init2():
nonlocal count
count += 1
with self.assertRaises(pickle.PicklingError):
self.executor_type(initializer=init1)
with self.assertRaises(pickle.PicklingError):
self.executor_type(initializer=init2)
def test_init_instance_method(self):
class Spam:
def initializer(self):
raise NotImplementedError
spam = Spam()
with self.assertRaises(pickle.PicklingError):
self.executor_type(initializer=spam.initializer)
def test_init_shared(self):
msg = b'eggs'
r, w = self.pipe()
script = f"""if True:
import os
if __name__ != '__main__':
import __main__
spam = __main__.spam
os.write({w}, spam + b'\\0')
"""
executor = self.executor_type(shared={'spam': msg})
fut = executor.submit(exec, script)
fut.result()
after = read_msg(r)
self.assertEqual(after, msg)
@unittest.expectedFailure
def test_init_exception_in_script(self):
executor = self.executor_type(initializer='raise Exception("spam")')
with executor:
with contextlib.redirect_stderr(io.StringIO()) as stderr:
fut = executor.submit('pass')
with self.assertRaises(BrokenInterpreterPool):
fut.result()
stderr = stderr.getvalue()
self.assertIn('ExecutionFailed: Exception: spam', stderr)
self.assertIn('Uncaught in the interpreter:', stderr)
self.assertIn('The above exception was the direct cause of the following exception:',
stderr)
def test_init_exception_in_func(self):
executor = self.executor_type(initializer=fail,
initargs=(Exception, 'spam'))
with executor:
with contextlib.redirect_stderr(io.StringIO()) as stderr:
fut = executor.submit(noop)
with self.assertRaises(BrokenInterpreterPool):
fut.result()
stderr = stderr.getvalue()
self.assertIn('ExecutionFailed: Exception: spam', stderr)
self.assertIn('Uncaught in the interpreter:', stderr)
self.assertIn('The above exception was the direct cause of the following exception:',
stderr)
@unittest.expectedFailure
def test_submit_script(self):
msg = b'spam'
r, w = self.pipe()
script = f"""
import os
os.write({w}, __name__.encode('utf-8') + b'\\0')
"""
executor = self.executor_type()
fut = executor.submit(script)
res = fut.result()
after = read_msg(r)
self.assertEqual(after, b'__main__')
self.assertIs(res, None)
def test_submit_closure(self):
spam = True
def task1():
return spam
def task2():
nonlocal spam
spam += 1
return spam
executor = self.executor_type()
with self.assertRaises(pickle.PicklingError):
executor.submit(task1)
with self.assertRaises(pickle.PicklingError):
executor.submit(task2)
def test_submit_local_instance(self):
class Spam:
def __init__(self):
self.value = True
executor = self.executor_type()
with self.assertRaises(pickle.PicklingError):
executor.submit(Spam)
def test_submit_instance_method(self):
class Spam:
def run(self):
return True
spam = Spam()
executor = self.executor_type()
with self.assertRaises(pickle.PicklingError):
executor.submit(spam.run)
def test_submit_func_globals(self):
executor = self.executor_type()
fut = executor.submit(get_current_name)
name = fut.result()
self.assertEqual(name, __name__)
self.assertNotEqual(name, '__main__')
@unittest.expectedFailure
def test_submit_exception_in_script(self):
fut = self.executor.submit('raise Exception("spam")')
with self.assertRaises(Exception) as captured:
fut.result()
self.assertIs(type(captured.exception), Exception)
self.assertEqual(str(captured.exception), 'spam')
cause = captured.exception.__cause__
self.assertIs(type(cause), ExecutionFailed)
for attr in ('__name__', '__qualname__', '__module__'):
self.assertEqual(getattr(cause.excinfo.type, attr),
getattr(Exception, attr))
self.assertEqual(cause.excinfo.msg, 'spam')
def test_submit_exception_in_func(self):
fut = self.executor.submit(fail, Exception, 'spam')
with self.assertRaises(Exception) as captured:
fut.result()
self.assertIs(type(captured.exception), Exception)
self.assertEqual(str(captured.exception), 'spam')
cause = captured.exception.__cause__
self.assertIs(type(cause), ExecutionFailed)
for attr in ('__name__', '__qualname__', '__module__'):
self.assertEqual(getattr(cause.excinfo.type, attr),
getattr(Exception, attr))
self.assertEqual(cause.excinfo.msg, 'spam')
def test_saturation(self):
blocker = queues.create()
executor = self.executor_type(4, shared=dict(blocker=blocker))
for i in range(15 * executor._max_workers):
executor.submit(exec, 'import __main__; __main__.blocker.get()')
#executor.submit('blocker.get()')
self.assertEqual(len(executor._threads), executor._max_workers)
for i in range(15 * executor._max_workers):
blocker.put_nowait(None)
executor.shutdown(wait=True)
@support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
def test_idle_thread_reuse(self):
executor = self.executor_type()
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._threads), 1)
executor.shutdown(wait=True)
class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
self.executor = self.executor_type()
self.addCleanup(lambda: self.executor.shutdown())
def tearDown(self):
if not self.loop.is_closed():
testasyncio_utils.run_briefly(self.loop)
self.doCleanups()
support.gc_collect()
super().tearDown()
def test_run_in_executor(self):
unexpected, _ = _interpreters.get_current()
func = get_current_interpid
fut = self.loop.run_in_executor(self.executor, func, 'yo')
interpid, res = self.loop.run_until_complete(fut)
self.assertEqual(res, 'yo')
self.assertNotEqual(interpid, unexpected)
def test_run_in_executor_cancel(self):
executor = self.executor_type()
called = False
def patched_call_soon(*args):
nonlocal called
called = True
func = time.sleep
fut = self.loop.run_in_executor(self.executor, func, 0.05)
fut.cancel()
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
self.loop.close()
self.loop.call_soon = patched_call_soon
self.loop.call_soon_threadsafe = patched_call_soon
time.sleep(0.4)
self.assertFalse(called)
def test_default_executor(self):
unexpected, _ = _interpreters.get_current()
self.loop.set_default_executor(self.executor)
fut = self.loop.run_in_executor(None, get_current_interpid)
interpid, = self.loop.run_until_complete(fut)
self.assertNotEqual(interpid, unexpected)
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()

View File

@ -74,6 +74,10 @@ class ThreadPoolMixin(ExecutorMixin):
executor_type = futures.ThreadPoolExecutor executor_type = futures.ThreadPoolExecutor
class InterpreterPoolMixin(ExecutorMixin):
executor_type = futures.InterpreterPoolExecutor
class ProcessPoolForkMixin(ExecutorMixin): class ProcessPoolForkMixin(ExecutorMixin):
executor_type = futures.ProcessPoolExecutor executor_type = futures.ProcessPoolExecutor
ctx = "fork" ctx = "fork"
@ -120,6 +124,7 @@ class ProcessPoolForkserverMixin(ExecutorMixin):
def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,), def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
executor_mixins=(ThreadPoolMixin, executor_mixins=(ThreadPoolMixin,
InterpreterPoolMixin,
ProcessPoolForkMixin, ProcessPoolForkMixin,
ProcessPoolForkserverMixin, ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin)): ProcessPoolSpawnMixin)):

View File

@ -0,0 +1,6 @@
We've added :class:`concurrent.futures.InterpreterPoolExecutor`, which
allows you to run code in multiple isolated interpreters. This allows you
to circumvent the limitations of CPU-bound threads (due to the GIL). Patch
by Eric Snow.
This addition is unrelated to :pep:`734`.