Make a few more tests pass with the new I/O library.
Fix the truncate() semantics -- it should not affect the current position. Switch wave.py/chunk.py to struct.unpack_from() to support bytes. Don't use writelines() on binary files (test_fileinput.py).
This commit is contained in:
parent
b6f1fdc90c
commit
dc0b1a1069
|
@ -62,7 +62,7 @@ class Chunk:
|
||||||
if len(self.chunkname) < 4:
|
if len(self.chunkname) < 4:
|
||||||
raise EOFError
|
raise EOFError
|
||||||
try:
|
try:
|
||||||
self.chunksize = struct.unpack(strflag+'L', file.read(4))[0]
|
self.chunksize = struct.unpack_from(strflag+'L', file.read(4))[0]
|
||||||
except struct.error:
|
except struct.error:
|
||||||
raise EOFError
|
raise EOFError
|
||||||
if inclheader:
|
if inclheader:
|
||||||
|
|
|
@ -551,8 +551,6 @@ class _MemoryIOMixin(BufferedIOBase):
|
||||||
def truncate(self, pos=None):
|
def truncate(self, pos=None):
|
||||||
if pos is None:
|
if pos is None:
|
||||||
pos = self._pos
|
pos = self._pos
|
||||||
else:
|
|
||||||
self._pos = max(0, pos)
|
|
||||||
del self._buffer[pos:]
|
del self._buffer[pos:]
|
||||||
return pos
|
return pos
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,8 @@ from fileinput import FileInput, hook_encoded
|
||||||
def writeTmp(i, lines, mode='w'): # opening in text mode is the default
|
def writeTmp(i, lines, mode='w'): # opening in text mode is the default
|
||||||
name = TESTFN + str(i)
|
name = TESTFN + str(i)
|
||||||
f = open(name, mode)
|
f = open(name, mode)
|
||||||
f.writelines(lines)
|
for line in lines:
|
||||||
|
f.write(line)
|
||||||
f.close()
|
f.close()
|
||||||
return name
|
return name
|
||||||
|
|
||||||
|
|
|
@ -93,7 +93,7 @@ class IOTest(unittest.TestCase):
|
||||||
self.assertEqual(f.seek(-1, 2), 13)
|
self.assertEqual(f.seek(-1, 2), 13)
|
||||||
self.assertEqual(f.tell(), 13)
|
self.assertEqual(f.tell(), 13)
|
||||||
self.assertEqual(f.truncate(12), 12)
|
self.assertEqual(f.truncate(12), 12)
|
||||||
self.assertEqual(f.tell(), 12)
|
self.assertEqual(f.tell(), 13)
|
||||||
|
|
||||||
def read_ops(self, f, buffered=False):
|
def read_ops(self, f, buffered=False):
|
||||||
data = f.read(5)
|
data = f.read(5)
|
||||||
|
@ -135,7 +135,7 @@ class IOTest(unittest.TestCase):
|
||||||
self.assertEqual(f.tell(), self.LARGE + 2)
|
self.assertEqual(f.tell(), self.LARGE + 2)
|
||||||
self.assertEqual(f.seek(0, 2), self.LARGE + 2)
|
self.assertEqual(f.seek(0, 2), self.LARGE + 2)
|
||||||
self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
|
self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
|
||||||
self.assertEqual(f.tell(), self.LARGE + 1)
|
self.assertEqual(f.tell(), self.LARGE + 2)
|
||||||
self.assertEqual(f.seek(0, 2), self.LARGE + 1)
|
self.assertEqual(f.seek(0, 2), self.LARGE + 1)
|
||||||
self.assertEqual(f.seek(-1, 2), self.LARGE)
|
self.assertEqual(f.seek(-1, 2), self.LARGE)
|
||||||
self.assertEqual(f.read(2), b"x")
|
self.assertEqual(f.read(2), b"x")
|
||||||
|
|
|
@ -256,9 +256,9 @@ class Wave_read:
|
||||||
#
|
#
|
||||||
|
|
||||||
def _read_fmt_chunk(self, chunk):
|
def _read_fmt_chunk(self, chunk):
|
||||||
wFormatTag, self._nchannels, self._framerate, dwAvgBytesPerSec, wBlockAlign = struct.unpack('<hhllh', chunk.read(14))
|
wFormatTag, self._nchannels, self._framerate, dwAvgBytesPerSec, wBlockAlign = struct.unpack_from('<hhllh', chunk.read(14))
|
||||||
if wFormatTag == WAVE_FORMAT_PCM:
|
if wFormatTag == WAVE_FORMAT_PCM:
|
||||||
sampwidth = struct.unpack('<h', chunk.read(2))[0]
|
sampwidth = struct.unpack_from('<h', chunk.read(2))[0]
|
||||||
self._sampwidth = (sampwidth + 7) // 8
|
self._sampwidth = (sampwidth + 7) // 8
|
||||||
else:
|
else:
|
||||||
raise Error, 'unknown format: %r' % (wFormatTag,)
|
raise Error, 'unknown format: %r' % (wFormatTag,)
|
||||||
|
|
|
@ -519,7 +519,7 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
|
||||||
{
|
{
|
||||||
PyObject *posobj = NULL;
|
PyObject *posobj = NULL;
|
||||||
Py_off_t pos;
|
Py_off_t pos;
|
||||||
int fd, whence;
|
int fd;
|
||||||
|
|
||||||
fd = self->fd;
|
fd = self->fd;
|
||||||
if (fd < 0)
|
if (fd < 0)
|
||||||
|
@ -530,17 +530,14 @@ fileio_truncate(PyFileIOObject *self, PyObject *args)
|
||||||
if (!PyArg_ParseTuple(args, "|O", &posobj))
|
if (!PyArg_ParseTuple(args, "|O", &posobj))
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
if (posobj == Py_None)
|
if (posobj == Py_None || posobj == NULL) {
|
||||||
posobj = NULL;
|
posobj = portable_lseek(fd, NULL, 1);
|
||||||
|
|
||||||
if (posobj == NULL)
|
|
||||||
whence = 1;
|
|
||||||
else
|
|
||||||
whence = 0;
|
|
||||||
|
|
||||||
posobj = portable_lseek(fd, posobj, whence);
|
|
||||||
if (posobj == NULL)
|
if (posobj == NULL)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Py_INCREF(posobj);
|
||||||
|
}
|
||||||
|
|
||||||
#if !defined(HAVE_LARGEFILE_SUPPORT)
|
#if !defined(HAVE_LARGEFILE_SUPPORT)
|
||||||
pos = PyInt_AsLong(posobj);
|
pos = PyInt_AsLong(posobj);
|
||||||
|
|
Loading…
Reference in New Issue