Add from_buffer and from_buffer_copy class methods to ctypes types.

This commit is contained in:
Thomas Heller 2008-04-25 15:44:16 +00:00
parent 5364e2e46f
commit 6ad5fbb7ea
4 changed files with 221 additions and 0 deletions

View File

@ -1950,6 +1950,28 @@ Data types
exact, they are methods of the :term:`metaclass`):
.. method:: _CData.from_buffer(source[, offset])
This method returns a ctypes instance that shares the buffer of
the ``source`` object. The ``source`` object must support the
writeable buffer interface. The optional ``offset`` parameter
specifies an offset into the source buffer in bytes; the default
is zero. If the source buffer is not large enough a ValueError
is raised.
.. versionadded:: 2.6
.. method:: _CData.from_buffer_copy(source[, offset])
This method creates a ctypes instance, the buffer is copied from
the source object buffer which must be readable. The optional
``offset`` parameter specifies an offset into the source buffer
in bytes; the default is zero. If the source buffer is not
large enough a ValueError is raised.
.. versionadded:: 2.6
.. method:: from_address(address)
This method returns a ctypes type instance using the memory specified by

View File

@ -0,0 +1,81 @@
from ctypes import *
import array
import gc
import unittest
class X(Structure):
_fields_ = [("c_int", c_int)]
init_called = False
def __init__(self):
self._init_called = True
class Test(unittest.TestCase):
def test_fom_buffer(self):
a = array.array("i", range(16))
x = (c_int * 16).from_buffer(a)
y = X.from_buffer(a)
self.assertEqual(y.c_int, a[0])
self.failIf(y.init_called)
self.assertEqual(x[:], a.tolist())
a[0], a[-1] = 200, -200
self.assertEqual(x[:], a.tolist())
self.assert_(a in x._objects.values())
self.assertRaises(ValueError,
c_int.from_buffer, a, -1)
expected = x[:]
del a; gc.collect(); gc.collect(); gc.collect()
self.assertEqual(x[:], expected)
self.assertRaises(TypeError,
(c_char * 16).from_buffer, "a" * 16)
def test_fom_buffer_with_offset(self):
a = array.array("i", range(16))
x = (c_int * 15).from_buffer(a, sizeof(c_int))
self.assertEqual(x[:], a.tolist()[1:])
self.assertRaises(ValueError, lambda: (c_int * 16).from_buffer(a, sizeof(c_int)))
self.assertRaises(ValueError, lambda: (c_int * 1).from_buffer(a, 16 * sizeof(c_int)))
def test_from_buffer_copy(self):
a = array.array("i", range(16))
x = (c_int * 16).from_buffer_copy(a)
y = X.from_buffer_copy(a)
self.assertEqual(y.c_int, a[0])
self.failIf(y.init_called)
self.assertEqual(x[:], range(16))
a[0], a[-1] = 200, -200
self.assertEqual(x[:], range(16))
self.assertEqual(x._objects, None)
self.assertRaises(ValueError,
c_int.from_buffer, a, -1)
del a; gc.collect(); gc.collect(); gc.collect()
self.assertEqual(x[:], range(16))
x = (c_char * 16).from_buffer_copy("a" * 16)
self.assertEqual(x[:], "a" * 16)
def test_fom_buffer_copy_with_offset(self):
a = array.array("i", range(16))
x = (c_int * 15).from_buffer_copy(a, sizeof(c_int))
self.assertEqual(x[:], a.tolist()[1:])
self.assertRaises(ValueError,
(c_int * 16).from_buffer_copy, a, sizeof(c_int))
self.assertRaises(ValueError,
(c_int * 1).from_buffer_copy, a, 16 * sizeof(c_int))
if __name__ == '__main__':
unittest.main()

View File

@ -43,6 +43,9 @@ Extensions Modules
Library
-------
- Add from_buffer() and from_buffer_copy() class methods to ctypes
data types
- Issue #2682: ctypes callback functions no longer contain a cyclic
reference to themselves.

View File

@ -412,6 +412,115 @@ CDataType_from_address(PyObject *type, PyObject *value)
return CData_AtAddress(type, buf);
}
static char from_buffer_doc[] =
"C.from_buffer(object, offset=0) -> C instance\ncreate a C instance from a writeable buffer";
static int
KeepRef(CDataObject *target, Py_ssize_t index, PyObject *keep);
static PyObject *
CDataType_from_buffer(PyObject *type, PyObject *args)
{
void *buffer;
Py_ssize_t buffer_len;
Py_ssize_t offset = 0;
PyObject *obj, *result;
StgDictObject *dict = PyType_stgdict(type);
assert (dict);
if (!PyArg_ParseTuple(args,
#if (PY_VERSION_HEX < 0x02050000)
"O|i:from_buffer",
#else
"O|n:from_buffer",
#endif
&obj, &offset))
return NULL;
if (-1 == PyObject_AsWriteBuffer(obj, &buffer, &buffer_len))
return NULL;
if (offset < 0) {
PyErr_SetString(PyExc_ValueError,
"offset cannit be negative");
return NULL;
}
if (dict->size > buffer_len - offset) {
PyErr_Format(PyExc_ValueError,
#if (PY_VERSION_HEX < 0x02050000)
"Buffer size too small (%d instead of at least %d bytes)",
#else
"Buffer size too small (%zd instead of at least %zd bytes)",
#endif
buffer_len, dict->size + offset);
return NULL;
}
result = CData_AtAddress(type, (char *)buffer + offset);
if (result == NULL)
return NULL;
Py_INCREF(obj);
if (-1 == KeepRef((CDataObject *)result, -1, obj)) {
Py_DECREF(result);
return NULL;
}
return result;
}
static char from_buffer_copy_doc[] =
"C.from_buffer_copy(object, offset=0) -> C instance\ncreate a C instance from a readable buffer";
static PyObject *
GenericCData_new(PyTypeObject *type, PyObject *args, PyObject *kwds);
static PyObject *
CDataType_from_buffer_copy(PyObject *type, PyObject *args)
{
void *buffer;
Py_ssize_t buffer_len;
Py_ssize_t offset = 0;
PyObject *obj, *result;
StgDictObject *dict = PyType_stgdict(type);
assert (dict);
if (!PyArg_ParseTuple(args,
#if (PY_VERSION_HEX < 0x02050000)
"O|i:from_buffer",
#else
"O|n:from_buffer",
#endif
&obj, &offset))
return NULL;
if (-1 == PyObject_AsReadBuffer(obj, &buffer, &buffer_len))
return NULL;
if (offset < 0) {
PyErr_SetString(PyExc_ValueError,
"offset cannit be negative");
return NULL;
}
if (dict->size > buffer_len - offset) {
PyErr_Format(PyExc_ValueError,
#if (PY_VERSION_HEX < 0x02050000)
"Buffer size too small (%d instead of at least %d bytes)",
#else
"Buffer size too small (%zd instead of at least %zd bytes)",
#endif
buffer_len, dict->size + offset);
return NULL;
}
result = GenericCData_new((PyTypeObject *)type, NULL, NULL);
if (result == NULL)
return NULL;
memcpy(((CDataObject *)result)->b_ptr,
(char *)buffer+offset, dict->size);
return result;
}
static char in_dll_doc[] =
"C.in_dll(dll, name) -> C instance\naccess a C instance in a dll";
@ -516,6 +625,8 @@ CDataType_from_param(PyObject *type, PyObject *value)
static PyMethodDef CDataType_methods[] = {
{ "from_param", CDataType_from_param, METH_O, from_param_doc },
{ "from_address", CDataType_from_address, METH_O, from_address_doc },
{ "from_buffer", CDataType_from_buffer, METH_VARARGS, from_buffer_doc, },
{ "from_buffer_copy", CDataType_from_buffer_copy, METH_VARARGS, from_buffer_copy_doc, },
{ "in_dll", CDataType_in_dll, METH_VARARGS, in_dll_doc },
{ NULL, NULL },
};
@ -845,6 +956,8 @@ PointerType_from_param(PyObject *type, PyObject *value)
static PyMethodDef PointerType_methods[] = {
{ "from_address", CDataType_from_address, METH_O, from_address_doc },
{ "from_buffer", CDataType_from_buffer, METH_VARARGS, from_buffer_doc, },
{ "from_buffer_copy", CDataType_from_buffer_copy, METH_VARARGS, from_buffer_copy_doc, },
{ "in_dll", CDataType_in_dll, METH_VARARGS, in_dll_doc},
{ "from_param", (PyCFunction)PointerType_from_param, METH_O, from_param_doc},
{ "set_type", (PyCFunction)PointerType_set_type, METH_O },
@ -1859,6 +1972,8 @@ SimpleType_from_param(PyObject *type, PyObject *value)
static PyMethodDef SimpleType_methods[] = {
{ "from_param", SimpleType_from_param, METH_O, from_param_doc },
{ "from_address", CDataType_from_address, METH_O, from_address_doc },
{ "from_buffer", CDataType_from_buffer, METH_VARARGS, from_buffer_doc, },
{ "from_buffer_copy", CDataType_from_buffer_copy, METH_VARARGS, from_buffer_copy_doc, },
{ "in_dll", CDataType_in_dll, METH_VARARGS, in_dll_doc},
{ NULL, NULL },
};