Breaking ground for PEP 3137 implementation:

Get rid of buffer().  Use memoryview() in its place where possible.
In a few places, do things a bit different, because memoryview()
can't slice (yet).
This commit is contained in:
Guido van Rossum 2007-10-08 02:46:15 +00:00
parent 85c1ba5d74
commit bae07c9baf
24 changed files with 72 additions and 199 deletions

View File

@ -491,7 +491,7 @@ class Sequence(metaclass=ABCMeta):
Sequence.register(tuple)
Sequence.register(basestring)
Sequence.register(buffer)
Sequence.register(memoryview)
class MutableSequence(Sequence):

View File

@ -6,7 +6,7 @@ import re
def dump(obj):
# helper function to dump memory contents in hex, with a hyphen
# between the bytes.
h = str(hexlify(buffer(obj)))
h = str(hexlify(memoryview(obj)))
return re.sub(r"(..)", r"\1-", h)[:-1]

View File

@ -4,7 +4,7 @@ from binascii import hexlify
from ctypes import *
def bin(s):
return str(hexlify(buffer(s))).upper()
return str(hexlify(memoryview(s))).upper()
# Each *simple* type that supports different byte orders has an
# __ctype_be__ attribute that specifies the same type in BIG ENDIAN

View File

@ -30,17 +30,17 @@ class StringArrayTestCase(unittest.TestCase):
buf.value = "Hello, World"
self.failUnlessEqual(buf.value, "Hello, World")
self.failUnlessRaises(TypeError, setattr, buf, "value", buffer("Hello, World"))
self.assertRaises(TypeError, setattr, buf, "value", buffer("abc"))
self.assertRaises(ValueError, setattr, buf, "raw", buffer("x" * 100))
self.failUnlessRaises(TypeError, setattr, buf, "value", memoryview(b"Hello, World"))
self.assertRaises(TypeError, setattr, buf, "value", memoryview(b"abc"))
self.assertRaises(ValueError, setattr, buf, "raw", memoryview(b"x" * 100))
def test_c_buffer_raw(self):
buf = c_buffer(32)
buf.raw = buffer(b"Hello, World")
buf.raw = memoryview(b"Hello, World")
self.failUnlessEqual(buf.value, "Hello, World")
self.assertRaises(TypeError, setattr, buf, "value", buffer("abc"))
self.assertRaises(ValueError, setattr, buf, "raw", buffer("x" * 100))
self.assertRaises(TypeError, setattr, buf, "value", memoryview(b"abc"))
self.assertRaises(ValueError, setattr, buf, "raw", memoryview(b"x" * 100))
def test_param_1(self):
BUF = c_char * 4

View File

@ -50,7 +50,7 @@ def TimestampFromTicks(ticks):
version_info = tuple([int(x) for x in version.split(".")])
sqlite_version_info = tuple([int(x) for x in sqlite_version.split(".")])
Binary = buffer
Binary = memoryview
def register_adapters_and_converters():
def adapt_date(val):

View File

@ -593,7 +593,7 @@ class ConstructorTests(unittest.TestCase):
ts = sqlite.TimestampFromTicks(42)
def CheckBinary(self):
b = sqlite.Binary(chr(0) + "'")
b = sqlite.Binary(b"\0'")
class ExtensionTests(unittest.TestCase):
def CheckScriptStringSql(self):

View File

@ -62,7 +62,7 @@ class SqliteTypeTests(unittest.TestCase):
self.failUnlessEqual(row[0], val)
def CheckBlob(self):
val = buffer(b"Guglhupf")
val = memoryview(b"Guglhupf")
self.cur.execute("insert into test(b) values (?)", (val,))
self.cur.execute("select b from test")
row = self.cur.fetchone()
@ -203,7 +203,7 @@ class DeclTypesTests(unittest.TestCase):
def CheckBlob(self):
# default
val = buffer(b"Guglhupf")
val = memoryview(b"Guglhupf")
self.cur.execute("insert into test(bin) values (?)", (val,))
self.cur.execute("select bin from test")
row = self.cur.fetchone()
@ -305,7 +305,7 @@ class BinaryConverterTests(unittest.TestCase):
def CheckBinaryInputForConverter(self):
testdata = b"abcdefg" * 10
result = self.con.execute('select ? as "x [bin]"', (buffer(bz2.compress(testdata)),)).fetchone()[0]
result = self.con.execute('select ? as "x [bin]"', (memoryview(bz2.compress(testdata)),)).fetchone()[0]
self.failUnlessEqual(testdata, result)
class DateTimeTests(unittest.TestCase):

View File

@ -36,7 +36,7 @@ def func_returnfloat():
def func_returnnull():
return None
def func_returnblob():
return buffer(b"blob")
return b"blob"
def func_raiseexception():
5/0
@ -49,7 +49,7 @@ def func_isfloat(v):
def func_isnone(v):
return type(v) is type(None)
def func_isblob(v):
return type(v) is buffer
return isinstance(v, (bytes, memoryview))
class AggrNoStep:
def __init__(self):
@ -100,7 +100,8 @@ class AggrCheckType:
self.val = None
def step(self, whichType, val):
theType = {"str": str, "int": int, "float": float, "None": type(None), "blob": buffer}
theType = {"str": str, "int": int, "float": float, "None": type(None),
"blob": bytes}
self.val = int(theType[whichType] is type(val))
def finalize(self):
@ -196,8 +197,8 @@ class FunctionTests(unittest.TestCase):
cur = self.con.cursor()
cur.execute("select returnblob()")
val = cur.fetchone()[0]
self.failUnlessEqual(type(val), buffer)
self.failUnlessEqual(val, buffer(b"blob"))
self.failUnlessEqual(type(val), bytes)
self.failUnlessEqual(val, memoryview(b"blob"))
def CheckFuncException(self):
cur = self.con.cursor()
@ -234,7 +235,7 @@ class FunctionTests(unittest.TestCase):
def CheckParamBlob(self):
cur = self.con.cursor()
cur.execute("select isblob(?)", (buffer(b"blob"),))
cur.execute("select isblob(?)", (memoryview(b"blob"),))
val = cur.fetchone()[0]
self.failUnlessEqual(val, 1)
@ -252,7 +253,7 @@ class AggregateTests(unittest.TestCase):
)
""")
cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)",
("foo", 5, 3.14, None, buffer(b"blob"),))
("foo", 5, 3.14, None, memoryview(b"blob"),))
self.con.create_aggregate("nostep", 1, AggrNoStep)
self.con.create_aggregate("nofinalize", 1, AggrNoFinalize)
@ -344,7 +345,7 @@ class AggregateTests(unittest.TestCase):
def CheckAggrCheckParamBlob(self):
cur = self.con.cursor()
cur.execute("select checkType('blob', ?)", (buffer(b"blob"),))
cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),))
val = cur.fetchone()[0]
self.failUnlessEqual(val, 1)

View File

@ -1041,8 +1041,11 @@ class Popen(object):
def _communicate(self, input):
if isinstance(input, str): # Unicode
input = input.encode("utf-8") # XXX What else?
if self.stdin:
if isinstance(input, str): # Unicode
input = input.encode("utf-8") # XXX What else?
if not isinstance(input, (bytes, str8)):
input = bytes(input)
read_set = []
write_set = []
stdout = None # Return
@ -1071,7 +1074,8 @@ class Popen(object):
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
chunk = input[input_offset : input_offset + 512]
bytes_written = os.write(self.stdin.fileno(), chunk)
input_offset += bytes_written
if input_offset >= len(input):
self.stdin.close()

View File

@ -708,7 +708,7 @@ class BaseTest(unittest.TestCase):
def test_buffer(self):
a = array.array(self.typecode, self.example)
b = bytes(buffer(a))
b = bytes(memoryview(a))
self.assertEqual(b[0], a.tostring()[0])
def test_weakref(self):

View File

@ -1,56 +0,0 @@
"""Unit tests for buffer objects.
For now, we just test (the brand new) rich comparison.
"""
import unittest
from test import test_support
class BufferTests(unittest.TestCase):
def test_comparison(self):
a = buffer("a.b.c")
b = buffer("a.b" + ".c")
self.assert_(a == b)
self.assert_(a <= b)
self.assert_(a >= b)
self.assert_(a == "a.b.c")
self.assert_(a <= "a.b.c")
self.assert_(a >= "a.b.c")
b = buffer("a.b.c.d")
self.assert_(a != b)
self.assert_(a <= b)
self.assert_(a < b)
self.assert_(a != "a.b.c.d")
self.assert_(a < "a.b.c.d")
self.assert_(a <= "a.b.c.d")
b = buffer("a.b")
self.assert_(a != b)
self.assert_(a >= b)
self.assert_(a > b)
self.assert_(a != "a.b")
self.assert_(a > "a.b")
self.assert_(a >= "a.b")
b = object()
self.assert_(a != b)
self.failIf(a == b)
self.assertRaises(TypeError, lambda: a < b)
def test_extended_getslice(self):
# Test extended slicing by comparing with list slicing.
s = bytes(range(255, -1, -1))
b = buffer(s)
indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
for start in indices:
for stop in indices:
# Skip step 0 (invalid)
for step in indices[1:]:
self.assertEqual(b[start:stop:step],
s[start:stop:step])
def test_main():
test_support.run_unittest(BufferTests)
if __name__ == "__main__":
test_main()

View File

@ -343,7 +343,7 @@ class BytesTest(unittest.TestCase):
def test_from_buffer(self):
sample = str8("Hello world\n\x80\x81\xfe\xff")
buf = buffer(sample)
buf = memoryview(sample)
b = bytes(buf)
self.assertEqual(b, bytes(map(ord, sample)))
@ -456,8 +456,8 @@ class BytesTest(unittest.TestCase):
b = bytes([0x1a, 0x2b, 0x30])
self.assertEquals(bytes.fromhex('1a2B30'), b)
self.assertEquals(bytes.fromhex(' 1A 2B 30 '), b)
self.assertEquals(bytes.fromhex(buffer(b'')), bytes())
self.assertEquals(bytes.fromhex(buffer(b'0000')), bytes([0, 0]))
self.assertEquals(bytes.fromhex(memoryview(b'')), bytes())
self.assertEquals(bytes.fromhex(memoryview(b'0000')), bytes([0, 0]))
self.assertRaises(ValueError, bytes.fromhex, 'a')
self.assertRaises(ValueError, bytes.fromhex, 'rt')
self.assertRaises(ValueError, bytes.fromhex, '1a b cd')
@ -630,7 +630,7 @@ class BytesTest(unittest.TestCase):
self.assertEqual(b' a bb c '.split(None, 3), [b'a', b'bb', b'c'])
def test_split_buffer(self):
self.assertEqual(b'a b'.split(buffer(b' ')), [b'a', b'b'])
self.assertEqual(b'a b'.split(memoryview(b' ')), [b'a', b'b'])
def test_split_string_error(self):
self.assertRaises(TypeError, b'a b'.split, ' ')
@ -653,7 +653,7 @@ class BytesTest(unittest.TestCase):
self.assertEqual(b' a bb c '.rsplit(None, 3), [b'a', b'bb', b'c'])
def test_rplit_buffer(self):
self.assertEqual(b'a b'.rsplit(buffer(b' ')), [b'a', b'b'])
self.assertEqual(b'a b'.rsplit(memoryview(b' ')), [b'a', b'b'])
def test_rplit_string_error(self):
self.assertRaises(TypeError, b'a b'.rsplit, ' ')
@ -707,9 +707,9 @@ class BytesTest(unittest.TestCase):
self.assertEqual(b.rstrip(), b' \t\n\r\f\vabc')
def test_strip_buffer(self):
self.assertEqual(b'abc'.strip(buffer(b'ac')), b'b')
self.assertEqual(b'abc'.lstrip(buffer(b'ac')), b'bc')
self.assertEqual(b'abc'.rstrip(buffer(b'ac')), b'ab')
self.assertEqual(b'abc'.strip(memoryview(b'ac')), b'b')
self.assertEqual(b'abc'.lstrip(memoryview(b'ac')), b'bc')
self.assertEqual(b'abc'.rstrip(memoryview(b'ac')), b'ab')
def test_strip_string_error(self):
self.assertRaises(TypeError, b'abc'.strip, 'b')

View File

@ -251,7 +251,7 @@ class IOTest(unittest.TestCase):
def test_array_writes(self):
a = array.array('i', range(10))
n = len(buffer(a))
n = len(memoryview(a))
f = io.open(test_support.TESTFN, "wb", 0)
self.assertEqual(f.write(a), n)
f.close()

View File

@ -98,9 +98,9 @@ class StringTestCase(unittest.TestCase, HelperMixin):
for s in ["", "Andr\xe8 Previn", "abc", " "*10000]:
self.helper(s)
def test_buffer(self):
def test_bytes(self):
for s in [b"", b"Andr\xe8 Previn", b"abc", b" "*10000]:
self.helper(buffer(s))
self.helper(s)
class ExceptionTestCase(unittest.TestCase):
def test_exceptions(self):

View File

@ -163,12 +163,6 @@ class ReprTests(unittest.TestCase):
eq(r([[[[[[{}]]]]]]), "[[[[[[{}]]]]]]")
eq(r([[[[[[[{}]]]]]]]), "[[[[[[[...]]]]]]]")
def test_buffer(self):
# XXX doesn't test buffers with no b_base or read-write buffers (see
# bufferobject.c). The test is fairly incomplete too. Sigh.
x = buffer('foo')
self.failUnless(repr(x).startswith('<read-only buffer for 0x'))
def test_cell(self):
# XXX Hmm? How to get at a cell object?
pass

View File

@ -541,7 +541,7 @@ def test_1530559():
test_1530559()
###########################################################################
# Packing and unpacking to/from buffers.
# Packing and unpacking to/from memory views.
# Copied and modified from unittest.
def assertRaises(excClass, callableObj, *args, **kwargs):
@ -556,7 +556,7 @@ def test_unpack_from():
test_string = b'abcd01234'
fmt = '4s'
s = struct.Struct(fmt)
for cls in (str, str8, buffer, bytes):
for cls in (str, str8, bytes): # XXX + memoryview
if verbose:
print("test_unpack_from using", cls.__name__)
data = cls(test_string)
@ -567,7 +567,7 @@ def test_unpack_from():
vereq(s.unpack_from(data, i), (data[i:i+4],))
for i in range(6, len(test_string) + 1):
simple_err(s.unpack_from, data, i)
for cls in (str, buffer):
for cls in (str, str8, bytes): # XXX + memoryview
data = cls(test_string)
vereq(struct.unpack_from(fmt, data), ('abcd',))
vereq(struct.unpack_from(fmt, data, 2), ('cd01',))
@ -619,19 +619,19 @@ def test_pack_into_fn():
assertRaises(struct.error, pack_into, small_buf, 0, test_string)
assertRaises(struct.error, pack_into, small_buf, 2, test_string)
def test_unpack_with_buffer():
def test_unpack_with_memoryview():
# SF bug 1563759: struct.unpack doens't support buffer protocol objects
data1 = array.array('B', b'\x12\x34\x56\x78')
data2 = buffer(b'......\x12\x34\x56\x78......', 6, 4)
data2 = memoryview(b'\x12\x34\x56\x78') # XXX b'......XXXX......', 6, 4
for data in [data1, data2]:
value, = struct.unpack('>I', data)
vereq(value, 0x12345678)
# Test methods to pack and unpack from buffers rather than strings.
# Test methods to pack and unpack from memoryviews rather than strings.
test_unpack_from()
test_pack_into()
test_pack_into_fn()
test_unpack_with_buffer()
test_unpack_with_memoryview()
def test_bool():
for prefix in tuple("<>!=")+('',):

View File

@ -203,54 +203,6 @@ class TypesTests(unittest.TestCase):
self.assertRaises(TypeError, type, 1, 2)
self.assertRaises(TypeError, type, 1, 2, 3, 4)
def test_buffers(self):
self.assertRaises(ValueError, buffer, 'asdf', -1)
self.assertRaises(TypeError, buffer, None)
a = buffer(b'asdf')
hash(a)
b = a * 5
if a == b:
self.fail('buffers should not be equal')
if str(b) != ('asdf' * 5):
self.fail('repeated buffer has wrong content')
if str(a * 0) != '':
self.fail('repeated buffer zero times has wrong content')
if str(a + buffer(b'def')) != 'asdfdef':
self.fail('concatenation of buffers yields wrong content')
if str(buffer(a)) != 'asdf':
self.fail('composing buffers failed')
if str(buffer(a, 2)) != 'df':
self.fail('specifying buffer offset failed')
if str(buffer(a, 0, 2)) != 'as':
self.fail('specifying buffer size failed')
if str(buffer(a, 1, 2)) != 'sd':
self.fail('specifying buffer offset and size failed')
self.assertRaises(ValueError, buffer, buffer(b'asdf', 1), -1)
if str(buffer(buffer(b'asdf', 0, 2), 0)) != 'as':
self.fail('composing length-specified buffer failed')
if str(buffer(buffer(b'asdf', 0, 2), 0, 5000)) != 'as':
self.fail('composing length-specified buffer failed')
if str(buffer(buffer(b'asdf', 0, 2), 0, -1)) != 'as':
self.fail('composing length-specified buffer failed')
if str(buffer(buffer(b'asdf', 0, 2), 1, 2)) != 's':
self.fail('composing length-specified buffer failed')
try: a[1] = 'g'
except TypeError: pass
else: self.fail("buffer assignment should raise TypeError")
try: a[0:1] = 'g'
except TypeError: pass
else: self.fail("buffer slice assignment should raise TypeError")
# array.array() returns an object that does not implement a char buffer,
# something which int() uses for conversion.
import array
try: int(buffer(array.array('b')))
except TypeError: pass
else: self.fail("char buffer (at C level) not working")
def test_main():
run_unittest(TypesTests)

View File

@ -713,7 +713,7 @@ class UnicodeTest(
if not sys.platform.startswith('java'):
self.assertEqual(
str(
buffer(b'character buffers are decoded to unicode'),
memoryview(b'character buffers are decoded to unicode'),
'utf-8',
'strict'
),

View File

@ -22,8 +22,6 @@ try:
except NameError:
pass
BufferType = buffer
TupleType = tuple
ListType = list
DictType = DictionaryType = dict

View File

@ -739,18 +739,12 @@ CharArray_set_raw(CDataObject *self, PyObject *value)
{
char *ptr;
Py_ssize_t size;
int rel = 0;
Py_buffer view;
if (PyBuffer_Check(value)) {
if (PyObject_GetBuffer(value, &view, PyBUF_SIMPLE) < 0)
return -1;
size = view.len;
ptr = view.buf;
rel = 1;
} else if (-1 == PyString_AsStringAndSize(value, &ptr, &size)) {
if (PyObject_GetBuffer(value, &view, PyBUF_SIMPLE) < 0)
return -1;
}
size = view.len;
ptr = view.buf;
if (size > self->b_size) {
PyErr_SetString(PyExc_ValueError,
"string too long");
@ -759,12 +753,10 @@ CharArray_set_raw(CDataObject *self, PyObject *value)
memcpy(self->b_ptr, ptr, size);
if (rel)
PyObject_ReleaseBuffer(value, &view);
PyObject_ReleaseBuffer(value, &view);
return 0;
fail:
if (rel)
PyObject_ReleaseBuffer(value, &view);
PyObject_ReleaseBuffer(value, &view);
return -1;
}

View File

@ -425,16 +425,16 @@ void _pysqlite_set_result(sqlite3_context* context, PyObject* py_val)
sqlite3_result_int64(context, (PY_LONG_LONG)longval);
} else if (PyFloat_Check(py_val)) {
sqlite3_result_double(context, PyFloat_AsDouble(py_val));
} else if (PyBuffer_Check(py_val)) {
} else if (PyString_Check(py_val)) {
sqlite3_result_text(context, PyString_AsString(py_val), -1, SQLITE_TRANSIENT);
} else if (PyUnicode_Check(py_val)) {
sqlite3_result_text(context, PyUnicode_AsString(py_val), -1, SQLITE_TRANSIENT);
} else if (PyObject_CheckBuffer(py_val)) {
if (PyObject_AsCharBuffer(py_val, &buffer, &buflen) != 0) {
PyErr_SetString(PyExc_ValueError, "could not convert BLOB to buffer");
} else {
sqlite3_result_blob(context, buffer, buflen, SQLITE_TRANSIENT);
}
} else if (PyString_Check(py_val)) {
sqlite3_result_text(context, PyString_AsString(py_val), -1, SQLITE_TRANSIENT);
} else if (PyUnicode_Check(py_val)) {
sqlite3_result_text(context, PyUnicode_AsString(py_val), -1, SQLITE_TRANSIENT);
} else {
/* TODO: raise error */
}
@ -478,16 +478,8 @@ PyObject* _pysqlite_build_py_params(sqlite3_context *context, int argc, sqlite3_
break;
case SQLITE_BLOB:
buflen = sqlite3_value_bytes(cur_value);
cur_py_value = PyBuffer_New(buflen);
if (!cur_py_value) {
break;
}
if (PyObject_AsWriteBuffer(cur_py_value, &raw_buffer, &buflen)) {
Py_DECREF(cur_py_value);
cur_py_value = NULL;
break;
}
memcpy(raw_buffer, sqlite3_value_blob(cur_value), buflen);
cur_py_value = PyBytes_FromStringAndSize(
sqlite3_value_blob(cur_value), buflen);
break;
case SQLITE_NULL:
default:

View File

@ -380,14 +380,11 @@ PyObject* _pysqlite_fetch_one_row(pysqlite_Cursor* self)
} else {
/* coltype == SQLITE_BLOB */
nbytes = sqlite3_column_bytes(self->statement->st, i);
buffer = PyBuffer_New(nbytes);
buffer = PyBytes_FromStringAndSize(
sqlite3_column_blob(self->statement->st, i), nbytes);
if (!buffer) {
break;
}
if (PyObject_AsWriteBuffer(buffer, &raw_buffer, &nbytes)) {
break;
}
memcpy(raw_buffer, sqlite3_column_blob(self->statement->st, i), nbytes);
converted = buffer;
}
}

View File

@ -102,13 +102,6 @@ int pysqlite_statement_bind_parameter(pysqlite_Statement* self, int pos, PyObjec
#endif
} else if (PyFloat_Check(parameter)) {
rc = sqlite3_bind_double(self->st, pos, PyFloat_AsDouble(parameter));
} else if (PyBuffer_Check(parameter)) {
if (PyObject_AsCharBuffer(parameter, &buffer, &buflen) == 0) {
rc = sqlite3_bind_blob(self->st, pos, buffer, buflen, SQLITE_TRANSIENT);
} else {
PyErr_SetString(PyExc_ValueError, "could not convert BLOB to buffer");
rc = -1;
}
} else if PyString_Check(parameter) {
string = PyString_AsString(parameter);
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
@ -118,6 +111,13 @@ int pysqlite_statement_bind_parameter(pysqlite_Statement* self, int pos, PyObjec
rc = sqlite3_bind_text(self->st, pos, string, -1, SQLITE_TRANSIENT);
Py_DECREF(stringval);
} else if (PyObject_CheckBuffer(parameter)) {
if (PyObject_AsCharBuffer(parameter, &buffer, &buflen) == 0) {
rc = sqlite3_bind_blob(self->st, pos, buffer, buflen, SQLITE_TRANSIENT);
} else {
PyErr_SetString(PyExc_ValueError, "could not convert BLOB to buffer");
rc = -1;
}
} else {
rc = -1;
}

View File

@ -1787,8 +1787,7 @@ _PyBuiltin_Init(void)
SETBUILTIN("True", Py_True);
SETBUILTIN("basestring", &PyBaseString_Type);
SETBUILTIN("bool", &PyBool_Type);
SETBUILTIN("buffer", &PyBuffer_Type);
SETBUILTIN("memoryview", &PyMemoryView_Type);
SETBUILTIN("memoryview", &PyMemoryView_Type);
SETBUILTIN("bytes", &PyBytes_Type);
SETBUILTIN("classmethod", &PyClassMethod_Type);
#ifndef WITHOUT_COMPLEX