Implemented thread-local data as proposed on python-dev:
http://mail.python.org/pipermail/python-dev/2004-June/045785.html
This commit is contained in:
parent
e827437f45
commit
d15dc06df0
|
@ -49,6 +49,25 @@ reset to false with the \method{clear()} method. The \method{wait()}
|
|||
method blocks until the flag is true.
|
||||
\end{funcdesc}
|
||||
|
||||
\begin{classdesc*}{local}{}
|
||||
A class that represents thread-local data. Thread-local data are data
|
||||
who's values are thread specific. To manage thread-local data, just
|
||||
create an instance of \class{local} (or a subclass) and store
|
||||
attributes on it:
|
||||
|
||||
\begin{verbatim}
|
||||
>>> mydata = threading.local()
|
||||
>>> mydata.x = 1
|
||||
\end{verbatim}
|
||||
|
||||
The instance's values will be different for separate threads.
|
||||
|
||||
For more details and extensive examples, see the documentation string
|
||||
of the _threading_local module.
|
||||
|
||||
\versionadded{2.4}
|
||||
\end{classdesc*}
|
||||
|
||||
\begin{funcdesc}{Lock}{}
|
||||
A factory function that returns a new primitive lock object. Once
|
||||
a thread has acquired it, subsequent attempts to acquire it block,
|
||||
|
|
|
@ -0,0 +1,237 @@
|
|||
"""Thread-local objects
|
||||
|
||||
(Note that this module provides a Python version of thread
|
||||
threading.local class. Deoending on the version of Python you're
|
||||
using, there may be a faster one available. You should always import
|
||||
the local class from threading.)
|
||||
|
||||
Thread-local objects support the management of thread-local data.
|
||||
If you have data that you want to be local to a thread, simply create
|
||||
a thread-local object and use it's attributes:
|
||||
|
||||
>>> mydata = local()
|
||||
>>> mydata.number = 42
|
||||
>>> mydata.number
|
||||
42
|
||||
|
||||
You can also access the local-object's dictionary:
|
||||
|
||||
>>> mydata.__dict__
|
||||
{'number': 42}
|
||||
>>> mydata.__dict__.setdefault('widgets', [])
|
||||
[]
|
||||
>>> mydata.widgets
|
||||
[]
|
||||
|
||||
What's important about thread-local objects is that their data are
|
||||
local to a thread. If we access the data in a different thread:
|
||||
|
||||
>>> log = []
|
||||
>>> def f():
|
||||
... items = mydata.__dict__.items()
|
||||
... items.sort()
|
||||
... log.append(items)
|
||||
... mydata.number = 11
|
||||
... log.append(mydata.number)
|
||||
|
||||
>>> import threading
|
||||
>>> thread = threading.Thread(target=f)
|
||||
>>> thread.start()
|
||||
>>> thread.join()
|
||||
>>> log
|
||||
[[], 11]
|
||||
|
||||
we get different data. Furthermore, changes made in the other thread
|
||||
don't affect data seen in this thread:
|
||||
|
||||
>>> mydata.number
|
||||
42
|
||||
|
||||
Of course, values you get from a local object, including a __dict__
|
||||
attribute, are for whatever thread was current at the time the
|
||||
attribute was read. For that reason, you generally don't want to save
|
||||
these values across threads, as they apply only to the thread they
|
||||
came from.
|
||||
|
||||
You can create custom local objects by subclassing the local class:
|
||||
|
||||
>>> class MyLocal(local):
|
||||
... number = 2
|
||||
... initialized = False
|
||||
... def __init__(self, **kw):
|
||||
... if self.initialized:
|
||||
... raise SystemError('__init__ called too many times')
|
||||
... self.initialized = True
|
||||
... self.__dict__.update(kw)
|
||||
... def squared(self):
|
||||
... return self.number ** 2
|
||||
|
||||
This can be useful to support default values, methods and
|
||||
initialization. Note that if you define an __init__ method, it will be
|
||||
called each time the local object is used in a separate thread. This
|
||||
is necessary to initialize each thread's dictionary.
|
||||
|
||||
Now if we create a local object:
|
||||
|
||||
>>> mydata = MyLocal(color='red')
|
||||
|
||||
Now we have a default number:
|
||||
|
||||
>>> mydata.number
|
||||
2
|
||||
|
||||
an initial color:
|
||||
|
||||
>>> mydata.color
|
||||
'red'
|
||||
>>> del mydata.color
|
||||
|
||||
And a method that operates on the data:
|
||||
|
||||
>>> mydata.squared()
|
||||
4
|
||||
|
||||
As before, we can access the data in a separate thread:
|
||||
|
||||
>>> log = []
|
||||
>>> thread = threading.Thread(target=f)
|
||||
>>> thread.start()
|
||||
>>> thread.join()
|
||||
>>> log
|
||||
[[('color', 'red'), ('initialized', True)], 11]
|
||||
|
||||
without effecting this threads data:
|
||||
|
||||
>>> mydata.number
|
||||
2
|
||||
>>> mydata.color
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
AttributeError: 'MyLocal' object has no attribute 'color'
|
||||
|
||||
Note that subclasses can define slots, but they are not thread
|
||||
local. They are shared across threads:
|
||||
|
||||
>>> class MyLocal(local):
|
||||
... __slots__ = 'number'
|
||||
|
||||
>>> mydata = MyLocal()
|
||||
>>> mydata.number = 42
|
||||
>>> mydata.color = 'red'
|
||||
|
||||
So, the separate thread:
|
||||
|
||||
>>> thread = threading.Thread(target=f)
|
||||
>>> thread.start()
|
||||
>>> thread.join()
|
||||
|
||||
affects what we see:
|
||||
|
||||
>>> mydata.number
|
||||
11
|
||||
|
||||
>>> del mydata
|
||||
"""
|
||||
|
||||
# Threading import is at end
|
||||
|
||||
class _localbase(object):
|
||||
__slots__ = '_local__key', '_local__args', '_local__lock'
|
||||
|
||||
def __new__(cls, *args, **kw):
|
||||
self = object.__new__(cls)
|
||||
key = '_local__key', 'thread.local.' + str(id(self))
|
||||
object.__setattr__(self, '_local__key', key)
|
||||
object.__setattr__(self, '_local__args', (args, kw))
|
||||
object.__setattr__(self, '_local__lock', RLock())
|
||||
|
||||
if args or kw and (cls.__init__ is object.__init__):
|
||||
raise TypeError("Initialization arguments are not supported")
|
||||
|
||||
# We need to create the thread dict in anticipation of
|
||||
# __init__ being called, to make sire we don't cal it
|
||||
# again ourselves.
|
||||
dict = object.__getattribute__(self, '__dict__')
|
||||
currentThread().__dict__[key] = dict
|
||||
|
||||
return self
|
||||
|
||||
def _patch(self):
|
||||
key = object.__getattribute__(self, '_local__key')
|
||||
d = currentThread().__dict__.get(key)
|
||||
if d is None:
|
||||
d = {}
|
||||
currentThread().__dict__[key] = d
|
||||
object.__setattr__(self, '__dict__', d)
|
||||
|
||||
# we have a new instance dict, so call out __init__ if we have
|
||||
# one
|
||||
cls = type(self)
|
||||
if cls.__init__ is not object.__init__:
|
||||
args, kw = object.__getattribute__(self, '_local__args')
|
||||
cls.__init__(self, *args, **kw)
|
||||
else:
|
||||
object.__setattr__(self, '__dict__', d)
|
||||
|
||||
class local(_localbase):
|
||||
|
||||
def __getattribute__(self, name):
|
||||
lock = object.__getattribute__(self, '_local__lock')
|
||||
lock.acquire()
|
||||
try:
|
||||
_patch(self)
|
||||
return object.__getattribute__(self, name)
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
lock = object.__getattribute__(self, '_local__lock')
|
||||
lock.acquire()
|
||||
try:
|
||||
_patch(self)
|
||||
return object.__setattr__(self, name, value)
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
def __delattr__(self, name):
|
||||
lock = object.__getattribute__(self, '_local__lock')
|
||||
lock.acquire()
|
||||
try:
|
||||
_patch(self)
|
||||
return object.__delattr__(self, name)
|
||||
finally:
|
||||
lock.release()
|
||||
|
||||
|
||||
def __del__():
|
||||
threading_enumerate = enumerate
|
||||
__getattribute__ = object.__getattribute__
|
||||
|
||||
def __del__(self):
|
||||
key = __getattribute__(self, '_local__key')
|
||||
|
||||
try:
|
||||
threads = list(threading_enumerate())
|
||||
except:
|
||||
# if enumerate fails, as it seems to do during
|
||||
# shutdown, we'll skip cleanup under the assumption
|
||||
# that there is nothing to clean up
|
||||
return
|
||||
|
||||
for thread in threads:
|
||||
try:
|
||||
__dict__ = thread.__dict__
|
||||
except AttributeError:
|
||||
# Thread is dying, rest in peace
|
||||
continue
|
||||
|
||||
if key in __dict__:
|
||||
try:
|
||||
del __dict__[key]
|
||||
except KeyError:
|
||||
pass # didn't have nything in this thread
|
||||
|
||||
return __del__
|
||||
__del__ = __del__()
|
||||
|
||||
from threading import currentThread, enumerate, RLock
|
|
@ -18,6 +18,7 @@ import dummy_thread
|
|||
# Declaring now so as to not have to nest ``try``s to get proper clean-up.
|
||||
holding_thread = False
|
||||
holding_threading = False
|
||||
holding__threading_local = False
|
||||
|
||||
try:
|
||||
# Could have checked if ``thread`` was not in sys.modules and gone
|
||||
|
@ -37,20 +38,39 @@ try:
|
|||
held_threading = sys_modules['threading']
|
||||
holding_threading = True
|
||||
del sys_modules['threading']
|
||||
|
||||
if '_threading_local' in sys_modules:
|
||||
# If ``_threading_local`` is already imported, might as well prevent
|
||||
# trying to import it more than needed by saving it if it is
|
||||
# already imported before deleting it.
|
||||
held__threading_local = sys_modules['_threading_local']
|
||||
holding__threading_local = True
|
||||
del sys_modules['_threading_local']
|
||||
|
||||
import threading
|
||||
# Need a copy of the code kept somewhere...
|
||||
sys_modules['_dummy_threading'] = sys_modules['threading']
|
||||
del sys_modules['threading']
|
||||
sys_modules['_dummy__threading_local'] = sys_modules['_threading_local']
|
||||
del sys_modules['_threading_local']
|
||||
from _dummy_threading import *
|
||||
from _dummy_threading import __all__
|
||||
|
||||
finally:
|
||||
# Put back ``threading`` if we overwrote earlier
|
||||
|
||||
if holding_threading:
|
||||
sys_modules['threading'] = held_threading
|
||||
del held_threading
|
||||
del holding_threading
|
||||
|
||||
# Put back ``_threading_local`` if we overwrote earlier
|
||||
|
||||
if holding__threading_local:
|
||||
sys_modules['_threading_local'] = held__threading_local
|
||||
del held__threading_local
|
||||
del holding__threading_local
|
||||
|
||||
# Put back ``thread`` if we overwrote, else del the entry we made
|
||||
if holding_thread:
|
||||
sys_modules['thread'] = held_thread
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
import unittest
|
||||
from doctest import DocTestSuite
|
||||
from test import test_support
|
||||
|
||||
def test_main():
|
||||
suite = DocTestSuite('_threading_local')
|
||||
|
||||
try:
|
||||
from thread import _local
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
import _threading_local
|
||||
local_orig = _threading_local.local
|
||||
def setUp():
|
||||
_threading_local.local = _local
|
||||
def tearDown():
|
||||
_threading_local.local = local_orig
|
||||
suite.addTest(DocTestSuite('_threading_local',
|
||||
setUp=setUp, tearDown=tearDown)
|
||||
)
|
||||
|
||||
test_support.run_suite(suite)
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_main()
|
|
@ -15,7 +15,7 @@ from collections import deque
|
|||
# Rename some stuff so "from threading import *" is safe
|
||||
__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
|
||||
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
|
||||
'Timer', 'setprofile', 'settrace']
|
||||
'Timer', 'setprofile', 'settrace', 'local']
|
||||
|
||||
_start_new_thread = thread.start_new_thread
|
||||
_allocate_lock = thread.allocate_lock
|
||||
|
@ -661,6 +661,14 @@ def enumerate():
|
|||
|
||||
_MainThread()
|
||||
|
||||
# get thread-local implementation, either from the thread
|
||||
# module, or from the python fallback
|
||||
|
||||
try:
|
||||
from thread import _local as local
|
||||
except ImportError:
|
||||
from _threading_local import local
|
||||
|
||||
|
||||
# Self-test code
|
||||
|
||||
|
|
|
@ -158,6 +158,259 @@ static PyTypeObject Locktype = {
|
|||
0, /*tp_repr*/
|
||||
};
|
||||
|
||||
/* Thread-local objects */
|
||||
|
||||
#include "structmember.h"
|
||||
|
||||
typedef struct {
|
||||
PyObject_HEAD
|
||||
PyObject *key;
|
||||
PyObject *args;
|
||||
PyObject *kw;
|
||||
PyObject *dict;
|
||||
} localobject;
|
||||
|
||||
static PyTypeObject localtype;
|
||||
|
||||
static PyObject *
|
||||
local_new(PyTypeObject *type, PyObject *args, PyObject *kw)
|
||||
{
|
||||
localobject *self;
|
||||
PyObject *tdict;
|
||||
|
||||
if (type->tp_init == PyBaseObject_Type.tp_init
|
||||
&& ((args && PyObject_IsTrue(args))
|
||||
||
|
||||
(kw && PyObject_IsTrue(kw))
|
||||
)
|
||||
) {
|
||||
PyErr_SetString(PyExc_TypeError,
|
||||
"Initialization arguments are not supported");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
self = (localobject *)type->tp_alloc(type, 0);
|
||||
if (self == NULL)
|
||||
return NULL;
|
||||
|
||||
Py_XINCREF(args);
|
||||
self->args = args;
|
||||
Py_XINCREF(kw);
|
||||
self->kw = kw;
|
||||
self->dict = NULL; /* making sure */
|
||||
self->key = PyString_FromFormat("thread.local.%p", self);
|
||||
if (self->key == NULL)
|
||||
goto err;
|
||||
|
||||
self->dict = PyDict_New();
|
||||
if (self->dict == NULL)
|
||||
goto err;
|
||||
|
||||
tdict = PyThreadState_GetDict();
|
||||
if (tdict == NULL) {
|
||||
PyErr_SetString(PyExc_SystemError,
|
||||
"Couldn't get thread-state dictionary");
|
||||
goto err;
|
||||
}
|
||||
|
||||
if (PyDict_SetItem(tdict, self->key, self->dict) < 0)
|
||||
goto err;
|
||||
|
||||
return (PyObject *)self;
|
||||
|
||||
err:
|
||||
Py_DECREF(self);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int
|
||||
local_traverse(localobject *self, visitproc visit, void *arg)
|
||||
{
|
||||
Py_VISIT(self->args);
|
||||
Py_VISIT(self->kw);
|
||||
Py_VISIT(self->dict);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
local_clear(localobject *self)
|
||||
{
|
||||
Py_CLEAR(self->key);
|
||||
Py_CLEAR(self->args);
|
||||
Py_CLEAR(self->kw);
|
||||
Py_CLEAR(self->dict);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
local_dealloc(localobject *self)
|
||||
{
|
||||
PyThreadState *tstate;
|
||||
if (self->key
|
||||
&& (tstate = PyThreadState_Get())
|
||||
&& tstate->interp) {
|
||||
for(tstate = PyInterpreterState_ThreadHead(tstate->interp);
|
||||
tstate;
|
||||
tstate = PyThreadState_Next(tstate)
|
||||
)
|
||||
if (tstate->dict &&
|
||||
PyDict_GetItem(tstate->dict, self->key))
|
||||
PyDict_DelItem(tstate->dict, self->key);
|
||||
}
|
||||
|
||||
local_clear(self);
|
||||
self->ob_type->tp_free((PyObject*)self);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
_ldict(localobject *self)
|
||||
{
|
||||
PyObject *tdict, *ldict;
|
||||
|
||||
tdict = PyThreadState_GetDict();
|
||||
if (tdict == NULL) {
|
||||
PyErr_SetString(PyExc_SystemError,
|
||||
"Couldn't get thread-state dictionary");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ldict = PyDict_GetItem(tdict, self->key);
|
||||
if (ldict == NULL) {
|
||||
ldict = PyDict_New(); /* we own ldict */
|
||||
|
||||
if (ldict == NULL)
|
||||
return NULL;
|
||||
else {
|
||||
int i = PyDict_SetItem(tdict, self->key, ldict);
|
||||
Py_DECREF(ldict); /* now ldict is borowed */
|
||||
if (i < 0)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Py_CLEAR(self->dict);
|
||||
Py_INCREF(ldict);
|
||||
self->dict = ldict; /* still borrowed */
|
||||
|
||||
if (self->ob_type->tp_init != PyBaseObject_Type.tp_init &&
|
||||
self->ob_type->tp_init((PyObject*)self,
|
||||
self->args, self->kw) < 0
|
||||
) {
|
||||
/* we need to get rid of ldict from thread so
|
||||
we create a new one the next time we do an attr
|
||||
acces */
|
||||
PyDict_DelItem(tdict, self->key);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
}
|
||||
else if (self->dict != ldict) {
|
||||
Py_CLEAR(self->dict);
|
||||
Py_INCREF(ldict);
|
||||
self->dict = ldict;
|
||||
}
|
||||
|
||||
return ldict;
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
local_getattro(localobject *self, PyObject *name)
|
||||
{
|
||||
PyObject *ldict, *value;
|
||||
|
||||
ldict = _ldict(self);
|
||||
if (ldict == NULL)
|
||||
return NULL;
|
||||
|
||||
if (self->ob_type != &localtype)
|
||||
/* use generic lookup for subtypes */
|
||||
return PyObject_GenericGetAttr((PyObject *)self, name);
|
||||
|
||||
/* Optimization: just look in dict ourselves */
|
||||
value = PyDict_GetItem(ldict, name);
|
||||
if (value == NULL)
|
||||
/* Fall back on generic to get __class__ and __dict__ */
|
||||
return PyObject_GenericGetAttr((PyObject *)self, name);
|
||||
|
||||
Py_INCREF(value);
|
||||
return value;
|
||||
}
|
||||
|
||||
static int
|
||||
local_setattro(localobject *self, PyObject *name, PyObject *v)
|
||||
{
|
||||
PyObject *ldict;
|
||||
|
||||
ldict = _ldict(self);
|
||||
if (ldict == NULL)
|
||||
return -1;
|
||||
|
||||
return PyObject_GenericSetAttr((PyObject *)self, name, v);
|
||||
}
|
||||
|
||||
static PyObject *
|
||||
local_getdict(localobject *self, void *closure)
|
||||
{
|
||||
if (self->dict == NULL) {
|
||||
PyErr_SetString(PyExc_AttributeError, "__dict__");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Py_INCREF(self->dict);
|
||||
return self->dict;
|
||||
}
|
||||
|
||||
static PyGetSetDef local_getset[] = {
|
||||
{"__dict__",
|
||||
(getter)local_getdict, (setter)0,
|
||||
"Local-data dictionary",
|
||||
NULL},
|
||||
{NULL} /* Sentinel */
|
||||
};
|
||||
|
||||
static PyTypeObject localtype = {
|
||||
PyObject_HEAD_INIT(NULL)
|
||||
/* ob_size */ 0,
|
||||
/* tp_name */ "thread._local",
|
||||
/* tp_basicsize */ sizeof(localobject),
|
||||
/* tp_itemsize */ 0,
|
||||
/* tp_dealloc */ (destructor)local_dealloc,
|
||||
/* tp_print */ (printfunc)0,
|
||||
/* tp_getattr */ (getattrfunc)0,
|
||||
/* tp_setattr */ (setattrfunc)0,
|
||||
/* tp_compare */ (cmpfunc)0,
|
||||
/* tp_repr */ (reprfunc)0,
|
||||
/* tp_as_number */ 0,
|
||||
/* tp_as_sequence */ 0,
|
||||
/* tp_as_mapping */ 0,
|
||||
/* tp_hash */ (hashfunc)0,
|
||||
/* tp_call */ (ternaryfunc)0,
|
||||
/* tp_str */ (reprfunc)0,
|
||||
/* tp_getattro */ (getattrofunc)local_getattro,
|
||||
/* tp_setattro */ (setattrofunc)local_setattro,
|
||||
/* tp_as_buffer */ 0,
|
||||
/* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
|
||||
/* tp_doc */ "Thread-local data",
|
||||
/* tp_traverse */ (traverseproc)local_traverse,
|
||||
/* tp_clear */ (inquiry)local_clear,
|
||||
/* tp_richcompare */ (richcmpfunc)0,
|
||||
/* tp_weaklistoffset */ (long)0,
|
||||
/* tp_iter */ (getiterfunc)0,
|
||||
/* tp_iternext */ (iternextfunc)0,
|
||||
/* tp_methods */ 0,
|
||||
/* tp_members */ 0,
|
||||
/* tp_getset */ local_getset,
|
||||
/* tp_base */ 0,
|
||||
/* tp_dict */ 0, /* internal use */
|
||||
/* tp_descr_get */ (descrgetfunc)0,
|
||||
/* tp_descr_set */ (descrsetfunc)0,
|
||||
/* tp_dictoffset */ offsetof(localobject, dict),
|
||||
/* tp_init */ (initproc)0,
|
||||
/* tp_alloc */ (allocfunc)0,
|
||||
/* tp_new */ (newfunc)local_new,
|
||||
/* tp_free */ 0, /* Low-level free-mem routine */
|
||||
/* tp_is_gc */ (inquiry)0, /* For PyObject_IS_GC */
|
||||
};
|
||||
|
||||
|
||||
/* Module functions */
|
||||
|
||||
|
@ -389,6 +642,10 @@ PyMODINIT_FUNC
|
|||
initthread(void)
|
||||
{
|
||||
PyObject *m, *d;
|
||||
|
||||
/* Initialize types: */
|
||||
if (PyType_Ready(&localtype) < 0)
|
||||
return;
|
||||
|
||||
/* Create the module and add the functions */
|
||||
m = Py_InitModule3("thread", thread_methods, thread_doc);
|
||||
|
@ -401,6 +658,9 @@ initthread(void)
|
|||
Py_INCREF(&Locktype);
|
||||
PyDict_SetItemString(d, "LockType", (PyObject *)&Locktype);
|
||||
|
||||
if (PyModule_AddObject(m, "_local", (PyObject *)&localtype) < 0)
|
||||
return;
|
||||
|
||||
/* Initialize the C thread library */
|
||||
PyThread_init_thread();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue