cpython/Lib/multiprocessing/sharedctypes.py

235 lines
5.8 KiB
Python

#
# Module which supports allocation of ctypes objects from shared memory
#
# multiprocessing/sharedctypes.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#
import sys
import ctypes
import weakref
import copyreg
from multiprocessing import heap, RLock
from multiprocessing.forking import assert_spawning
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
#
#
#
typecode_to_type = {
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
}
#
#
#
def _new_value(type_):
size = ctypes.sizeof(type_)
wrapper = heap.BufferWrapper(size)
return rebuild_ctype(type_, wrapper, None)
def RawValue(typecode_or_type, *args):
'''
Returns a ctypes object allocated from shared memory
'''
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
obj = _new_value(type_)
ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
obj.__init__(*args)
return obj
def RawArray(typecode_or_type, size_or_initializer):
'''
Returns a ctypes array allocated from shared memory
'''
type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
if isinstance(size_or_initializer, int):
type_ = type_ * size_or_initializer
return _new_value(type_)
else:
type_ = type_ * len(size_or_initializer)
result = _new_value(type_)
result.__init__(*size_or_initializer)
return result
def Value(typecode_or_type, *args, **kwds):
'''
Return a synchronization wrapper for a Value
'''
lock = kwds.pop('lock', None)
if kwds:
raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
obj = RawValue(typecode_or_type, *args)
if lock is None:
lock = RLock()
assert hasattr(lock, 'acquire')
return synchronized(obj, lock)
def Array(typecode_or_type, size_or_initializer, **kwds):
'''
Return a synchronization wrapper for a RawArray
'''
lock = kwds.pop('lock', None)
if kwds:
raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
obj = RawArray(typecode_or_type, size_or_initializer)
if lock is None:
lock = RLock()
assert hasattr(lock, 'acquire')
return synchronized(obj, lock)
def copy(obj):
new_obj = _new_value(type(obj))
ctypes.pointer(new_obj)[0] = obj
return new_obj
def synchronized(obj, lock=None):
assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
if isinstance(obj, ctypes._SimpleCData):
return Synchronized(obj, lock)
elif isinstance(obj, ctypes.Array):
if obj._type_ is ctypes.c_char:
return SynchronizedString(obj, lock)
return SynchronizedArray(obj, lock)
else:
cls = type(obj)
try:
scls = class_cache[cls]
except KeyError:
names = [field[0] for field in cls._fields_]
d = dict((name, make_property(name)) for name in names)
classname = 'Synchronized' + cls.__name__
scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
return scls(obj, lock)
#
# Functions for pickling/unpickling
#
def reduce_ctype(obj):
assert_spawning(obj)
if isinstance(obj, ctypes.Array):
return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
else:
return rebuild_ctype, (type(obj), obj._wrapper, None)
def rebuild_ctype(type_, wrapper, length):
if length is not None:
type_ = type_ * length
if sys.platform == 'win32' and type_ not in copyreg.dispatch_table:
copyreg.pickle(type_, reduce_ctype)
obj = type_.from_address(wrapper.get_address())
obj._wrapper = wrapper
return obj
#
# Function to create properties
#
def make_property(name):
try:
return prop_cache[name]
except KeyError:
d = {}
exec(template % ((name,)*7), d)
prop_cache[name] = d[name]
return d[name]
template = '''
def get%s(self):
self.acquire()
try:
return self._obj.%s
finally:
self.release()
def set%s(self, value):
self.acquire()
try:
self._obj.%s = value
finally:
self.release()
%s = property(get%s, set%s)
'''
prop_cache = {}
class_cache = weakref.WeakKeyDictionary()
#
# Synchronized wrappers
#
class SynchronizedBase(object):
def __init__(self, obj, lock=None):
self._obj = obj
self._lock = lock or RLock()
self.acquire = self._lock.acquire
self.release = self._lock.release
def __reduce__(self):
assert_spawning(self)
return synchronized, (self._obj, self._lock)
def get_obj(self):
return self._obj
def get_lock(self):
return self._lock
def __repr__(self):
return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
class Synchronized(SynchronizedBase):
value = make_property('value')
class SynchronizedArray(SynchronizedBase):
def __len__(self):
return len(self._obj)
def __getitem__(self, i):
self.acquire()
try:
return self._obj[i]
finally:
self.release()
def __setitem__(self, i, value):
self.acquire()
try:
self._obj[i] = value
finally:
self.release()
def __getslice__(self, start, stop):
self.acquire()
try:
return self._obj[start:stop]
finally:
self.release()
def __setslice__(self, start, stop, values):
self.acquire()
try:
self._obj[start:stop] = values
finally:
self.release()
class SynchronizedString(SynchronizedArray):
value = make_property('value')
raw = make_property('raw')