Restore dummy_threading and _dummy_thread, but deprecate them (bpo-31370) (#3648)

This commit is contained in:
Antoine Pitrou 2017-09-18 22:04:20 +02:00 committed by GitHub
parent a8e7d903d7
commit b43c4caf81
9 changed files with 575 additions and 31 deletions

View File

@ -6,18 +6,15 @@
**Source code:** :source:`Lib/_dummy_thread.py`
.. deprecated:: 3.7
Python now always has threading enabled. Please use :mod:`_thread`
(or, better, :mod:`threading`) instead.
--------------
This module provides a duplicate interface to the :mod:`_thread` module. It is
meant to be imported when the :mod:`_thread` module is not provided on a
platform.
Suggested usage is::
try:
import _thread
except ImportError:
import _dummy_thread as _thread
This module provides a duplicate interface to the :mod:`_thread` module.
It was meant to be imported when the :mod:`_thread` module was not provided
on a platform.
Be careful to not use this module where deadlock might occur from a thread being
created that blocks waiting for another thread to be created. This often occurs

View File

@ -23,14 +23,10 @@ threading API built on top of this module.
single: pthreads
pair: threads; POSIX
The module is optional. It is supported on Windows, Linux, SGI IRIX, Solaris
2.x, as well as on systems that have a POSIX thread (a.k.a. "pthread")
implementation. For systems lacking the :mod:`_thread` module, the
:mod:`_dummy_thread` module is available. It duplicates this module's interface
and can be used as a drop-in replacement.
It defines the following constants and functions:
.. versionchanged:: 3.7
This module used to be optional, it is now always available.
This module defines the following constants and functions:
.. exception:: error

View File

@ -26,6 +26,6 @@ The following are support modules for some of the above services:
.. toctree::
dummy_threading.rst
_thread.rst
_dummy_thread.rst
dummy_threading.rst

View File

@ -6,20 +6,15 @@
**Source code:** :source:`Lib/dummy_threading.py`
.. deprecated:: 3.7
Python now always has threading enabled. Please use :mod:`threading` instead.
--------------
This module provides a duplicate interface to the :mod:`threading` module. It
is meant to be imported when the :mod:`_thread` module is not provided on a
platform.
Suggested usage is::
try:
import threading
except ImportError:
import dummy_threading as threading
This module provides a duplicate interface to the :mod:`threading` module.
It was meant to be imported when the :mod:`_thread` module was not provided
on a platform.
Be careful to not use this module where deadlock might occur from a thread being
created that blocks waiting for another thread to be created. This often occurs
with blocking I/O.

View File

@ -11,8 +11,8 @@
This module constructs higher-level threading interfaces on top of the lower
level :mod:`_thread` module. See also the :mod:`queue` module.
The :mod:`dummy_threading` module is provided for situations where
:mod:`threading` cannot be used because :mod:`_thread` is missing.
.. versionchanged:: 3.7
This module used to be optional, it is now always available.
.. note::

163
Lib/_dummy_thread.py Normal file
View File

@ -0,0 +1,163 @@
"""Drop-in replacement for the thread module.
Meant to be used as a brain-dead substitute so that threaded code does
not need to be rewritten for when the thread module is not present.
Suggested usage is::
try:
import _thread
except ImportError:
import _dummy_thread as _thread
"""
# Exports only things specified by thread documentation;
# skipping obsolete synonyms allocate(), start_new(), exit_thread().
__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
'interrupt_main', 'LockType']
# A dummy value
TIMEOUT_MAX = 2**31
# NOTE: this module can be imported early in the extension building process,
# and so top level imports of other modules should be avoided. Instead, all
# imports are done when needed on a function-by-function basis. Since threads
# are disabled, the import lock should not be an issue anyway (??).
error = RuntimeError
def start_new_thread(function, args, kwargs={}):
"""Dummy implementation of _thread.start_new_thread().
Compatibility is maintained by making sure that ``args`` is a
tuple and ``kwargs`` is a dictionary. If an exception is raised
and it is SystemExit (which can be done by _thread.exit()) it is
caught and nothing is done; all other exceptions are printed out
by using traceback.print_exc().
If the executed function calls interrupt_main the KeyboardInterrupt will be
raised when the function returns.
"""
if type(args) != type(tuple()):
raise TypeError("2nd arg must be a tuple")
if type(kwargs) != type(dict()):
raise TypeError("3rd arg must be a dict")
global _main
_main = False
try:
function(*args, **kwargs)
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
_main = True
global _interrupt
if _interrupt:
_interrupt = False
raise KeyboardInterrupt
def exit():
"""Dummy implementation of _thread.exit()."""
raise SystemExit
def get_ident():
"""Dummy implementation of _thread.get_ident().
Since this module should only be used when _threadmodule is not
available, it is safe to assume that the current process is the
only thread. Thus a constant can be safely returned.
"""
return 1
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def stack_size(size=None):
"""Dummy implementation of _thread.stack_size()."""
if size is not None:
raise error("setting thread stack size not supported")
return 0
def _set_sentinel():
"""Dummy implementation of _thread._set_sentinel()."""
return LockType()
class LockType(object):
"""Class implementing dummy implementation of _thread.LockType.
Compatibility is maintained by maintaining self.locked_status
which is a boolean that stores the state of the lock. Pickling of
the lock, though, should not be done since if the _thread module is
then used with an unpickled ``lock()`` from here problems could
occur from this class not having atomic methods.
"""
def __init__(self):
self.locked_status = False
def acquire(self, waitflag=None, timeout=-1):
"""Dummy implementation of acquire().
For blocking calls, self.locked_status is automatically set to
True and returned appropriately based on value of
``waitflag``. If it is non-blocking, then the value is
actually checked and not set if it is already acquired. This
is all done so that threading.Condition's assert statements
aren't triggered and throw a little fit.
"""
if waitflag is None or waitflag:
self.locked_status = True
return True
else:
if not self.locked_status:
self.locked_status = True
return True
else:
if timeout > 0:
import time
time.sleep(timeout)
return False
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
"""Release the dummy lock."""
# XXX Perhaps shouldn't actually bother to test? Could lead
# to problems for complex, threaded code.
if not self.locked_status:
raise error
self.locked_status = False
return True
def locked(self):
return self.locked_status
def __repr__(self):
return "<%s %s.%s object at %s>" % (
"locked" if self.locked_status else "unlocked",
self.__class__.__module__,
self.__class__.__qualname__,
hex(id(self))
)
# Used to signal that interrupt_main was called in a "thread"
_interrupt = False
# True when not executing in a "thread"
_main = True
def interrupt_main():
"""Set _interrupt flag to True to have start_new_thread raise
KeyboardInterrupt upon exiting."""
if _main:
raise KeyboardInterrupt
else:
global _interrupt
_interrupt = True

78
Lib/dummy_threading.py Normal file
View File

@ -0,0 +1,78 @@
"""Faux ``threading`` version using ``dummy_thread`` instead of ``thread``.
The module ``_dummy_threading`` is added to ``sys.modules`` in order
to not have ``threading`` considered imported. Had ``threading`` been
directly imported it would have made all subsequent imports succeed
regardless of whether ``_thread`` was available which is not desired.
"""
from sys import modules as sys_modules
import _dummy_thread
# Declaring now so as to not have to nest ``try``s to get proper clean-up.
holding_thread = False
holding_threading = False
holding__threading_local = False
try:
# Could have checked if ``_thread`` was not in sys.modules and gone
# a different route, but decided to mirror technique used with
# ``threading`` below.
if '_thread' in sys_modules:
held_thread = sys_modules['_thread']
holding_thread = True
# Must have some module named ``_thread`` that implements its API
# in order to initially import ``threading``.
sys_modules['_thread'] = sys_modules['_dummy_thread']
if 'threading' in sys_modules:
# If ``threading`` is already imported, might as well prevent
# trying to import it more than needed by saving it if it is
# already imported before deleting it.
held_threading = sys_modules['threading']
holding_threading = True
del sys_modules['threading']
if '_threading_local' in sys_modules:
# If ``_threading_local`` is already imported, might as well prevent
# trying to import it more than needed by saving it if it is
# already imported before deleting it.
held__threading_local = sys_modules['_threading_local']
holding__threading_local = True
del sys_modules['_threading_local']
import threading
# Need a copy of the code kept somewhere...
sys_modules['_dummy_threading'] = sys_modules['threading']
del sys_modules['threading']
sys_modules['_dummy__threading_local'] = sys_modules['_threading_local']
del sys_modules['_threading_local']
from _dummy_threading import *
from _dummy_threading import __all__
finally:
# Put back ``threading`` if we overwrote earlier
if holding_threading:
sys_modules['threading'] = held_threading
del held_threading
del holding_threading
# Put back ``_threading_local`` if we overwrote earlier
if holding__threading_local:
sys_modules['_threading_local'] = held__threading_local
del held__threading_local
del holding__threading_local
# Put back ``thread`` if we overwrote, else del the entry we made
if holding_thread:
sys_modules['_thread'] = held_thread
del held_thread
else:
del sys_modules['_thread']
del holding_thread
del _dummy_thread
del sys_modules

View File

@ -0,0 +1,255 @@
import _dummy_thread as _thread
import time
import queue
import random
import unittest
from test import support
from unittest import mock
DELAY = 0
class LockTests(unittest.TestCase):
"""Test lock objects."""
def setUp(self):
# Create a lock
self.lock = _thread.allocate_lock()
def test_initlock(self):
#Make sure locks start locked
self.assertFalse(self.lock.locked(),
"Lock object is not initialized unlocked.")
def test_release(self):
# Test self.lock.release()
self.lock.acquire()
self.lock.release()
self.assertFalse(self.lock.locked(),
"Lock object did not release properly.")
def test_LockType_context_manager(self):
with _thread.LockType():
pass
self.assertFalse(self.lock.locked(),
"Acquired Lock was not released")
def test_improper_release(self):
#Make sure release of an unlocked thread raises RuntimeError
self.assertRaises(RuntimeError, self.lock.release)
def test_cond_acquire_success(self):
#Make sure the conditional acquiring of the lock works.
self.assertTrue(self.lock.acquire(0),
"Conditional acquiring of the lock failed.")
def test_cond_acquire_fail(self):
#Test acquiring locked lock returns False
self.lock.acquire(0)
self.assertFalse(self.lock.acquire(0),
"Conditional acquiring of a locked lock incorrectly "
"succeeded.")
def test_uncond_acquire_success(self):
#Make sure unconditional acquiring of a lock works.
self.lock.acquire()
self.assertTrue(self.lock.locked(),
"Uncondional locking failed.")
def test_uncond_acquire_return_val(self):
#Make sure that an unconditional locking returns True.
self.assertIs(self.lock.acquire(1), True,
"Unconditional locking did not return True.")
self.assertIs(self.lock.acquire(), True)
def test_uncond_acquire_blocking(self):
#Make sure that unconditional acquiring of a locked lock blocks.
def delay_unlock(to_unlock, delay):
"""Hold on to lock for a set amount of time before unlocking."""
time.sleep(delay)
to_unlock.release()
self.lock.acquire()
start_time = int(time.time())
_thread.start_new_thread(delay_unlock,(self.lock, DELAY))
if support.verbose:
print()
print("*** Waiting for thread to release the lock "\
"(approx. %s sec.) ***" % DELAY)
self.lock.acquire()
end_time = int(time.time())
if support.verbose:
print("done")
self.assertGreaterEqual(end_time - start_time, DELAY,
"Blocking by unconditional acquiring failed.")
@mock.patch('time.sleep')
def test_acquire_timeout(self, mock_sleep):
"""Test invoking acquire() with a positive timeout when the lock is
already acquired. Ensure that time.sleep() is invoked with the given
timeout and that False is returned."""
self.lock.acquire()
retval = self.lock.acquire(waitflag=0, timeout=1)
self.assertTrue(mock_sleep.called)
mock_sleep.assert_called_once_with(1)
self.assertEqual(retval, False)
def test_lock_representation(self):
self.lock.acquire()
self.assertIn("locked", repr(self.lock))
self.lock.release()
self.assertIn("unlocked", repr(self.lock))
class MiscTests(unittest.TestCase):
"""Miscellaneous tests."""
def test_exit(self):
self.assertRaises(SystemExit, _thread.exit)
def test_ident(self):
self.assertIsInstance(_thread.get_ident(), int,
"_thread.get_ident() returned a non-integer")
self.assertGreater(_thread.get_ident(), 0)
def test_LockType(self):
self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
"_thread.LockType is not an instance of what "
"is returned by _thread.allocate_lock()")
def test_set_sentinel(self):
self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
"_thread._set_sentinel() did not return a "
"LockType instance.")
def test_interrupt_main(self):
#Calling start_new_thread with a function that executes interrupt_main
# should raise KeyboardInterrupt upon completion.
def call_interrupt():
_thread.interrupt_main()
self.assertRaises(KeyboardInterrupt,
_thread.start_new_thread,
call_interrupt,
tuple())
def test_interrupt_in_main(self):
self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
def test_stack_size_None(self):
retval = _thread.stack_size(None)
self.assertEqual(retval, 0)
def test_stack_size_not_None(self):
with self.assertRaises(_thread.error) as cm:
_thread.stack_size("")
self.assertEqual(cm.exception.args[0],
"setting thread stack size not supported")
class ThreadTests(unittest.TestCase):
"""Test thread creation."""
def test_arg_passing(self):
#Make sure that parameter passing works.
def arg_tester(queue, arg1=False, arg2=False):
"""Use to test _thread.start_new_thread() passes args properly."""
queue.put((arg1, arg2))
testing_queue = queue.Queue(1)
_thread.start_new_thread(arg_tester, (testing_queue, True, True))
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation "
"using tuple failed")
_thread.start_new_thread(
arg_tester,
tuple(),
{'queue':testing_queue, 'arg1':True, 'arg2':True})
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation "
"using kwargs failed")
_thread.start_new_thread(
arg_tester,
(testing_queue, True),
{'arg2':True})
result = testing_queue.get()
self.assertTrue(result[0] and result[1],
"Argument passing for thread creation using both tuple"
" and kwargs failed")
def test_multi_thread_creation(self):
def queue_mark(queue, delay):
time.sleep(delay)
queue.put(_thread.get_ident())
thread_count = 5
testing_queue = queue.Queue(thread_count)
if support.verbose:
print()
print("*** Testing multiple thread creation "
"(will take approx. %s to %s sec.) ***" % (
DELAY, thread_count))
for count in range(thread_count):
if DELAY:
local_delay = round(random.random(), 1)
else:
local_delay = 0
_thread.start_new_thread(queue_mark,
(testing_queue, local_delay))
time.sleep(DELAY)
if support.verbose:
print('done')
self.assertEqual(testing_queue.qsize(), thread_count,
"Not all %s threads executed properly "
"after %s sec." % (thread_count, DELAY))
def test_args_not_tuple(self):
"""
Test invoking start_new_thread() with a non-tuple value for "args".
Expect TypeError with a meaningful error message to be raised.
"""
with self.assertRaises(TypeError) as cm:
_thread.start_new_thread(mock.Mock(), [])
self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
def test_kwargs_not_dict(self):
"""
Test invoking start_new_thread() with a non-dict value for "kwargs".
Expect TypeError with a meaningful error message to be raised.
"""
with self.assertRaises(TypeError) as cm:
_thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
def test_SystemExit(self):
"""
Test invoking start_new_thread() with a function that raises
SystemExit.
The exception should be discarded.
"""
func = mock.Mock(side_effect=SystemExit())
try:
_thread.start_new_thread(func, tuple())
except SystemExit:
self.fail("start_new_thread raised SystemExit.")
@mock.patch('traceback.print_exc')
def test_RaiseException(self, mock_print_exc):
"""
Test invoking start_new_thread() with a function that raises exception.
The exception should be discarded and the traceback should be printed
via traceback.print_exc()
"""
func = mock.Mock(side_effect=Exception)
_thread.start_new_thread(func, tuple())
self.assertTrue(mock_print_exc.called)

View File

@ -0,0 +1,60 @@
from test import support
import unittest
import dummy_threading as _threading
import time
class DummyThreadingTestCase(unittest.TestCase):
class TestThread(_threading.Thread):
def run(self):
global running
global sema
global mutex
# Uncomment if testing another module, such as the real 'threading'
# module.
#delay = random.random() * 2
delay = 0
if support.verbose:
print('task', self.name, 'will run for', delay, 'sec')
sema.acquire()
mutex.acquire()
running += 1
if support.verbose:
print(running, 'tasks are running')
mutex.release()
time.sleep(delay)
if support.verbose:
print('task', self.name, 'done')
mutex.acquire()
running -= 1
if support.verbose:
print(self.name, 'is finished.', running, 'tasks are running')
mutex.release()
sema.release()
def setUp(self):
self.numtasks = 10
global sema
sema = _threading.BoundedSemaphore(value=3)
global mutex
mutex = _threading.RLock()
global running
running = 0
self.threads = []
def test_tasks(self):
for i in range(self.numtasks):
t = self.TestThread(name="<thread %d>"%i)
self.threads.append(t)
t.start()
if support.verbose:
print('waiting for all tasks to complete')
for t in self.threads:
t.join()
if support.verbose:
print('all tasks done')
if __name__ == '__main__':
unittest.main()