Issue #19641: Added the audioop.byteswap() function to convert big-endian

samples to little-endian and vice versa.
This commit is contained in:
Serhiy Storchaka 2013-11-23 22:26:01 +02:00
parent 2b38fc187c
commit 3062c9a6c8
10 changed files with 80 additions and 78 deletions

View File

@ -77,6 +77,14 @@ The module defines the following variables and functions:
sample. Samples wrap around in case of overflow. sample. Samples wrap around in case of overflow.
.. function:: byteswap(fragment, width)
"Byteswap" all samples in a fragment and returns the modified fragment.
Converts big-endian samples to little-endian and vice versa.
.. versionadded: 3.4
.. function:: cross(fragment, width) .. function:: cross(fragment, width)
Return the number of zero crossings in the fragment passed as an argument. Return the number of zero crossings in the fragment passed as an argument.

View File

@ -415,6 +415,9 @@ audioop
Added support for 24-bit samples (:issue:`12866`). Added support for 24-bit samples (:issue:`12866`).
Added the :func:`~audioop.byteswap` function to convert big-endian samples
to little-endian and vice versa (:issue:`19641`).
base64 base64
------ ------

View File

@ -5,24 +5,6 @@ import io
import pickle import pickle
import sys import sys
def byteswap2(data):
a = array.array('h')
a.frombytes(data)
a.byteswap()
return a.tobytes()
def byteswap3(data):
ba = bytearray(data)
ba[::3] = data[2::3]
ba[2::3] = data[::3]
return bytes(ba)
def byteswap4(data):
a = array.array('i')
a.frombytes(data)
a.byteswap()
return a.tobytes()
class UnseekableIO(io.FileIO): class UnseekableIO(io.FileIO):
def tell(self): def tell(self):
raise io.UnsupportedOperation raise io.UnsupportedOperation

View File

@ -1,6 +1,7 @@
from test.support import findfile, TESTFN, unlink from test.support import findfile, TESTFN, unlink
import unittest import unittest
from test import audiotests from test import audiotests
from audioop import byteswap
import os import os
import io import io
import sys import sys
@ -122,7 +123,7 @@ class AifcULAWTest(AifcTest, unittest.TestCase):
E5040CBC 617C0A3C 08BC0A3C 2C7C0B3C 517C0E3C 8A8410FC B6840EBC 457C0A3C \ E5040CBC 617C0A3C 08BC0A3C 2C7C0B3C 517C0E3C 8A8410FC B6840EBC 457C0A3C \
""") """)
if sys.byteorder != 'big': if sys.byteorder != 'big':
frames = audiotests.byteswap2(frames) frames = byteswap(frames, 2)
class AifcALAWTest(AifcTest, unittest.TestCase): class AifcALAWTest(AifcTest, unittest.TestCase):
@ -143,7 +144,7 @@ class AifcALAWTest(AifcTest, unittest.TestCase):
E4800CC0 62000A40 08C00A40 2B000B40 52000E40 8A001180 B6000EC0 46000A40 \ E4800CC0 62000A40 08C00A40 2B000B40 52000E40 8A001180 B6000EC0 46000A40 \
""") """)
if sys.byteorder != 'big': if sys.byteorder != 'big':
frames = audiotests.byteswap2(frames) frames = byteswap(frames, 2)
class AifcMiscTest(audiotests.AudioTests, unittest.TestCase): class AifcMiscTest(audiotests.AudioTests, unittest.TestCase):

View File

@ -448,6 +448,23 @@ class TestAudioop(unittest.TestCase):
self.assertEqual(audioop.getsample(data, w, 3), maxvalues[w]) self.assertEqual(audioop.getsample(data, w, 3), maxvalues[w])
self.assertEqual(audioop.getsample(data, w, 4), minvalues[w]) self.assertEqual(audioop.getsample(data, w, 4), minvalues[w])
def test_byteswap(self):
swapped_datas = {
1: datas[1],
2: packs[2](0, 0x3412, 0x6745, -0x6646, -0x81, 0x80, -1),
3: packs[3](0, 0x563412, -0x7698bb, 0x7798ba, -0x81, 0x80, -1),
4: packs[4](0, 0x78563412, -0x547698bb, 0x557698ba,
-0x81, 0x80, -1),
}
for w in 1, 2, 3, 4:
self.assertEqual(audioop.byteswap(b'', w), b'')
self.assertEqual(audioop.byteswap(datas[w], w), swapped_datas[w])
self.assertEqual(audioop.byteswap(swapped_datas[w], w), datas[w])
self.assertEqual(audioop.byteswap(bytearray(datas[w]), w),
swapped_datas[w])
self.assertEqual(audioop.byteswap(memoryview(datas[w]), w),
swapped_datas[w])
def test_negativelen(self): def test_negativelen(self):
# from issue 3306, previously it segfaulted # from issue 3306, previously it segfaulted
self.assertRaises(audioop.error, self.assertRaises(audioop.error,

View File

@ -1,6 +1,7 @@
from test.support import TESTFN from test.support import TESTFN
import unittest import unittest
from test import audiotests from test import audiotests
from audioop import byteswap
import sys import sys
import sunau import sunau
@ -124,7 +125,7 @@ class SunauULAWTest(audiotests.AudioWriteTests,
E5040CBC 617C0A3C 08BC0A3C 2C7C0B3C 517C0E3C 8A8410FC B6840EBC 457C0A3C \ E5040CBC 617C0A3C 08BC0A3C 2C7C0B3C 517C0E3C 8A8410FC B6840EBC 457C0A3C \
""") """)
if sys.byteorder != 'big': if sys.byteorder != 'big':
frames = audiotests.byteswap2(frames) frames = byteswap(frames, 2)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,6 +1,7 @@
from test.support import TESTFN from test.support import TESTFN
import unittest import unittest
from test import audiotests from test import audiotests
from audioop import byteswap
import sys import sys
import wave import wave
@ -46,13 +47,7 @@ class WavePCM16Test(audiotests.AudioWriteTests,
E4B50CEB 63440A5A 08CA0A1F 2BBA0B0B 51460E47 8BCB113C B6F50EEA 44150A59 \ E4B50CEB 63440A5A 08CA0A1F 2BBA0B0B 51460E47 8BCB113C B6F50EEA 44150A59 \
""") """)
if sys.byteorder != 'big': if sys.byteorder != 'big':
frames = audiotests.byteswap2(frames) frames = byteswap(frames, 2)
if sys.byteorder == 'big':
@unittest.expectedFailure
def test_unseekable_incompleted_write(self):
super().test_unseekable_incompleted_write()
class WavePCM24Test(audiotests.AudioWriteTests, class WavePCM24Test(audiotests.AudioWriteTests,
@ -82,7 +77,7 @@ class WavePCM24Test(audiotests.AudioWriteTests,
51486F0E44E1 8BCC64113B05 B6F4EC0EEB36 4413170A5B48 \ 51486F0E44E1 8BCC64113B05 B6F4EC0EEB36 4413170A5B48 \
""") """)
if sys.byteorder != 'big': if sys.byteorder != 'big':
frames = audiotests.byteswap3(frames) frames = byteswap(frames, 3)
class WavePCM32Test(audiotests.AudioWriteTests, class WavePCM32Test(audiotests.AudioWriteTests,
@ -112,12 +107,7 @@ class WavePCM32Test(audiotests.AudioWriteTests,
51486F800E44E190 8BCC6480113B0580 B6F4EC000EEB3630 441317800A5B48A0 \ 51486F800E44E190 8BCC6480113B0580 B6F4EC000EEB3630 441317800A5B48A0 \
""") """)
if sys.byteorder != 'big': if sys.byteorder != 'big':
frames = audiotests.byteswap4(frames) frames = byteswap(frames, 4)
if sys.byteorder == 'big':
@unittest.expectedFailure
def test_unseekable_incompleted_write(self):
super().test_unseekable_incompleted_write()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -82,17 +82,12 @@ WAVE_FORMAT_PCM = 0x0001
_array_fmts = None, 'b', 'h', None, 'i' _array_fmts = None, 'b', 'h', None, 'i'
import audioop
import struct import struct
import sys import sys
from chunk import Chunk from chunk import Chunk
from collections import namedtuple from collections import namedtuple
def _byteswap3(data):
ba = bytearray(data)
ba[::3] = data[2::3]
ba[2::3] = data[::3]
return bytes(ba)
_wave_params = namedtuple('_wave_params', _wave_params = namedtuple('_wave_params',
'nchannels sampwidth framerate nframes comptype compname') 'nchannels sampwidth framerate nframes comptype compname')
@ -243,29 +238,9 @@ class Wave_read:
self._data_seek_needed = 0 self._data_seek_needed = 0
if nframes == 0: if nframes == 0:
return b'' return b''
if self._sampwidth in (2, 4) and sys.byteorder == 'big': data = self._data_chunk.read(nframes * self._framesize)
# unfortunately the fromfile() method does not take if self._sampwidth != 1 and sys.byteorder == 'big':
# something that only looks like a file object, so data = audioop.byteswap(data, self._sampwidth)
# we have to reach into the innards of the chunk object
import array
chunk = self._data_chunk
data = array.array(_array_fmts[self._sampwidth])
assert data.itemsize == self._sampwidth
nitems = nframes * self._nchannels
if nitems * self._sampwidth > chunk.chunksize - chunk.size_read:
nitems = (chunk.chunksize - chunk.size_read) // self._sampwidth
data.fromfile(chunk.file.file, nitems)
# "tell" data chunk how much was read
chunk.size_read = chunk.size_read + nitems * self._sampwidth
# do the same for the outermost chunk
chunk = chunk.file
chunk.size_read = chunk.size_read + nitems * self._sampwidth
data.byteswap()
data = data.tobytes()
else:
data = self._data_chunk.read(nframes * self._framesize)
if self._sampwidth == 3 and sys.byteorder == 'big':
data = _byteswap3(data)
if self._convert and data: if self._convert and data:
data = self._convert(data) data = self._convert(data)
self._soundpos = self._soundpos + len(data) // (self._nchannels * self._sampwidth) self._soundpos = self._soundpos + len(data) // (self._nchannels * self._sampwidth)
@ -441,20 +416,10 @@ class Wave_write:
nframes = len(data) // (self._sampwidth * self._nchannels) nframes = len(data) // (self._sampwidth * self._nchannels)
if self._convert: if self._convert:
data = self._convert(data) data = self._convert(data)
if self._sampwidth in (2, 4) and sys.byteorder == 'big': if self._sampwidth != 1 and sys.byteorder == 'big':
import array data = audioop.byteswap(data, self._sampwidth)
a = array.array(_array_fmts[self._sampwidth]) self._file.write(data)
a.frombytes(data) self._datawritten += len(data)
data = a
assert data.itemsize == self._sampwidth
data.byteswap()
data.tofile(self._file)
self._datawritten = self._datawritten + len(data) * self._sampwidth
else:
if self._sampwidth == 3 and sys.byteorder == 'big':
data = _byteswap3(data)
self._file.write(data)
self._datawritten = self._datawritten + len(data)
self._nframeswritten = self._nframeswritten + nframes self._nframeswritten = self._nframeswritten + nframes
def writeframes(self, data): def writeframes(self, data):

View File

@ -68,6 +68,9 @@ Core and Builtins
Library Library
------- -------
- Issue #19641: Added the audioop.byteswap() function to convert big-endian
samples to little-endian and vice versa.
- Issue #15204: Deprecated the 'U' mode in file-like objects. - Issue #15204: Deprecated the 'U' mode in file-like objects.
- Issue #17810: Implement PEP 3154, pickle protocol 4. - Issue #17810: Implement PEP 3154, pickle protocol 4.

View File

@ -1107,6 +1107,37 @@ audioop_reverse(PyObject *self, PyObject *args)
return rv; return rv;
} }
static PyObject *
audioop_byteswap(PyObject *self, PyObject *args)
{
Py_buffer view;
unsigned char *ncp;
Py_ssize_t i;
int size;
PyObject *rv = NULL;
if (!PyArg_ParseTuple(args, "y*i:swapbytes",
&view, &size))
return NULL;
if (!audioop_check_parameters(view.len, size))
goto exit;
rv = PyBytes_FromStringAndSize(NULL, view.len);
if (rv == NULL)
goto exit;
ncp = (unsigned char *)PyBytes_AsString(rv);
for (i = 0; i < view.len; i += size) {
int j;
for (j = 0; j < size; j++)
ncp[i + size - 1 - j] = ((unsigned char *)view.buf)[i + j];
}
exit:
PyBuffer_Release(&view);
return rv;
}
static PyObject * static PyObject *
audioop_lin2lin(PyObject *self, PyObject *args) audioop_lin2lin(PyObject *self, PyObject *args)
{ {
@ -1698,6 +1729,7 @@ static PyMethodDef audioop_methods[] = {
{ "tostereo", audioop_tostereo, METH_VARARGS }, { "tostereo", audioop_tostereo, METH_VARARGS },
{ "getsample", audioop_getsample, METH_VARARGS }, { "getsample", audioop_getsample, METH_VARARGS },
{ "reverse", audioop_reverse, METH_VARARGS }, { "reverse", audioop_reverse, METH_VARARGS },
{ "byteswap", audioop_byteswap, METH_VARARGS },
{ "ratecv", audioop_ratecv, METH_VARARGS }, { "ratecv", audioop_ratecv, METH_VARARGS },
{ 0, 0 } { 0, 0 }
}; };