From 01ad622a2cd73561b66d0f6287cb5b66de0bb0b3 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Sat, 9 Feb 2013 11:10:53 +0200 Subject: [PATCH] Issue #16686: Fixed a lot of bugs in audioop module. * avgpp() and maxpp() no more crash on empty and 1-samples input fragment. They now work when peak-peak values are greater INT_MAX. * ratecv() no more crashes on empty input fragment. * Fixed an integer overflow in ratecv(). * Fixed an integer overflow in add() and bias() for 32-bit samples. * reverse(), lin2lin() and ratecv() no more lose precision for 32-bit samples. * max() and rms() no more returns negative result for 32-bit sample -0x80000000. * minmax() now returns correct max value for 32-bit sample -0x80000000. * avg(), mul(), tomono() and tostereo() now round negative result down and can return 32-bit sample -0x80000000. * add() now can return 32-bit sample -0x80000000. --- Doc/library/audioop.rst | 6 +- Lib/test/test_audioop.py | 399 ++++++++++++++++++++++++++------------- Misc/NEWS | 6 + Modules/audioop.c | 310 +++++++++++++++--------------- 4 files changed, 435 insertions(+), 286 deletions(-) diff --git a/Doc/library/audioop.rst b/Doc/library/audioop.rst index c9476081b40..d4c0c95614f 100644 --- a/Doc/library/audioop.rst +++ b/Doc/library/audioop.rst @@ -36,7 +36,7 @@ The module defines the following variables and functions: Return a fragment which is the addition of the two samples passed as parameters. *width* is the sample width in bytes, either ``1``, ``2`` or ``4``. Both - fragments should have the same length. + fragments should have the same length. Samples are truncated in case of overflow. .. function:: adpcm2lin(adpcmfragment, width, state) @@ -67,7 +67,7 @@ The module defines the following variables and functions: .. function:: bias(fragment, width, bias) Return a fragment that is the original fragment with a bias added to each - sample. + sample. Samples wrap around in case of overflow. .. function:: cross(fragment, width) @@ -175,7 +175,7 @@ The module defines the following variables and functions: .. function:: mul(fragment, width, factor) Return a fragment that has all samples in the original fragment multiplied by - the floating-point value *factor*. Overflow is silently ignored. + the floating-point value *factor*. Samples are truncated in case of overflow. .. function:: ratecv(fragment, width, nchannels, inrate, outrate, state[, weightA[, weightB]]) diff --git a/Lib/test/test_audioop.py b/Lib/test/test_audioop.py index c14e8b87a85..ff67f2ccc50 100644 --- a/Lib/test/test_audioop.py +++ b/Lib/test/test_audioop.py @@ -1,25 +1,21 @@ import audioop +import sys import unittest from test.support import run_unittest -endian = 'big' if audioop.getsample(b'\0\1', 2, 0) == 1 else 'little' +def pack(width, data): + return b''.join(v.to_bytes(width, sys.byteorder, signed=True) for v in data) -def gendata1(): - return b'\0\1\2' +packs = {w: (lambda *data, width=w: pack(width, data)) for w in (1, 2, 4)} +maxvalues = {w: (1 << (8 * w - 1)) - 1 for w in (1, 2, 4)} +minvalues = {w: -1 << (8 * w - 1) for w in (1, 2, 4)} -def gendata2(): - if endian == 'big': - return b'\0\0\0\1\0\2' - else: - return b'\0\0\1\0\2\0' - -def gendata4(): - if endian == 'big': - return b'\0\0\0\0\0\0\0\1\0\0\0\2' - else: - return b'\0\0\0\0\1\0\0\0\2\0\0\0' - -data = [gendata1(), gendata2(), gendata4()] +datas = { + 1: b'\x00\x12\x45\xbb\x7f\x80\xff', + 2: packs[2](0, 0x1234, 0x4567, -0x4567, 0x7fff, -0x8000, -1), + 4: packs[4](0, 0x12345678, 0x456789ab, -0x456789ab, + 0x7fffffff, -0x80000000, -1), +} INVALID_DATA = [ (b'abc', 0), @@ -31,171 +27,320 @@ INVALID_DATA = [ class TestAudioop(unittest.TestCase): def test_max(self): - self.assertEqual(audioop.max(data[0], 1), 2) - self.assertEqual(audioop.max(data[1], 2), 2) - self.assertEqual(audioop.max(data[2], 4), 2) + for w in 1, 2, 4: + self.assertEqual(audioop.max(b'', w), 0) + p = packs[w] + self.assertEqual(audioop.max(p(5), w), 5) + self.assertEqual(audioop.max(p(5, -8, -1), w), 8) + self.assertEqual(audioop.max(p(maxvalues[w]), w), maxvalues[w]) + self.assertEqual(audioop.max(p(minvalues[w]), w), -minvalues[w]) + self.assertEqual(audioop.max(datas[w], w), -minvalues[w]) def test_minmax(self): - self.assertEqual(audioop.minmax(data[0], 1), (0, 2)) - self.assertEqual(audioop.minmax(data[1], 2), (0, 2)) - self.assertEqual(audioop.minmax(data[2], 4), (0, 2)) + for w in 1, 2, 4: + self.assertEqual(audioop.minmax(b'', w), + (0x7fffffff, -0x80000000)) + p = packs[w] + self.assertEqual(audioop.minmax(p(5), w), (5, 5)) + self.assertEqual(audioop.minmax(p(5, -8, -1), w), (-8, 5)) + self.assertEqual(audioop.minmax(p(maxvalues[w]), w), + (maxvalues[w], maxvalues[w])) + self.assertEqual(audioop.minmax(p(minvalues[w]), w), + (minvalues[w], minvalues[w])) + self.assertEqual(audioop.minmax(datas[w], w), + (minvalues[w], maxvalues[w])) def test_maxpp(self): - self.assertEqual(audioop.maxpp(data[0], 1), 0) - self.assertEqual(audioop.maxpp(data[1], 2), 0) - self.assertEqual(audioop.maxpp(data[2], 4), 0) + for w in 1, 2, 4: + self.assertEqual(audioop.maxpp(b'', w), 0) + self.assertEqual(audioop.maxpp(packs[w](*range(100)), w), 0) + self.assertEqual(audioop.maxpp(packs[w](9, 10, 5, 5, 0, 1), w), 10) + self.assertEqual(audioop.maxpp(datas[w], w), + maxvalues[w] - minvalues[w]) def test_avg(self): - self.assertEqual(audioop.avg(data[0], 1), 1) - self.assertEqual(audioop.avg(data[1], 2), 1) - self.assertEqual(audioop.avg(data[2], 4), 1) + for w in 1, 2, 4: + self.assertEqual(audioop.avg(b'', w), 0) + p = packs[w] + self.assertEqual(audioop.avg(p(5), w), 5) + self .assertEqual(audioop.avg(p(5, 8), w), 6) + self.assertEqual(audioop.avg(p(5, -8), w), -2) + self.assertEqual(audioop.avg(p(maxvalues[w], maxvalues[w]), w), + maxvalues[w]) + self.assertEqual(audioop.avg(p(minvalues[w], minvalues[w]), w), + minvalues[w]) + self.assertEqual(audioop.avg(packs[4](0x50000000, 0x70000000), 4), + 0x60000000) + self.assertEqual(audioop.avg(packs[4](-0x50000000, -0x70000000), 4), + -0x60000000) def test_avgpp(self): - self.assertEqual(audioop.avgpp(data[0], 1), 0) - self.assertEqual(audioop.avgpp(data[1], 2), 0) - self.assertEqual(audioop.avgpp(data[2], 4), 0) + for w in 1, 2, 4: + self.assertEqual(audioop.avgpp(b'', w), 0) + self.assertEqual(audioop.avgpp(packs[w](*range(100)), w), 0) + self.assertEqual(audioop.avgpp(packs[w](9, 10, 5, 5, 0, 1), w), 10) + self.assertEqual(audioop.avgpp(datas[1], 1), 196) + self.assertEqual(audioop.avgpp(datas[2], 2), 50534) + self.assertEqual(audioop.avgpp(datas[4], 4), 3311897002) def test_rms(self): - self.assertEqual(audioop.rms(data[0], 1), 1) - self.assertEqual(audioop.rms(data[1], 2), 1) - self.assertEqual(audioop.rms(data[2], 4), 1) + for w in 1, 2, 4: + self.assertEqual(audioop.rms(b'', w), 0) + p = packs[w] + self.assertEqual(audioop.rms(p(*range(100)), w), 57) + self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w), + maxvalues[w], delta=1) + self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w), + -minvalues[w], delta=1) + self.assertEqual(audioop.rms(datas[1], 1), 77) + self.assertEqual(audioop.rms(datas[2], 2), 20001) + self.assertEqual(audioop.rms(datas[4], 4), 1310854152) def test_cross(self): - self.assertEqual(audioop.cross(data[0], 1), 0) - self.assertEqual(audioop.cross(data[1], 2), 0) - self.assertEqual(audioop.cross(data[2], 4), 0) + for w in 1, 2, 4: + self.assertEqual(audioop.cross(b'', w), -1) + p = packs[w] + self.assertEqual(audioop.cross(p(0, 1, 2), w), 0) + self.assertEqual(audioop.cross(p(1, 2, -3, -4), w), 1) + self.assertEqual(audioop.cross(p(-1, -2, 3, 4), w), 1) + self.assertEqual(audioop.cross(p(0, minvalues[w]), w), 1) + self.assertEqual(audioop.cross(p(minvalues[w], maxvalues[w]), w), 1) def test_add(self): - data2 = [] - for d in data: - str = bytearray(len(d)) - for i,b in enumerate(d): - str[i] = 2*b - data2.append(str) - self.assertEqual(audioop.add(data[0], data[0], 1), data2[0]) - self.assertEqual(audioop.add(data[1], data[1], 2), data2[1]) - self.assertEqual(audioop.add(data[2], data[2], 4), data2[2]) + for w in 1, 2, 4: + self.assertEqual(audioop.add(b'', b'', w), b'') + self.assertEqual(audioop.add(datas[w], b'\0' * len(datas[w]), w), + datas[w]) + self.assertEqual(audioop.add(datas[1], datas[1], 1), + b'\x00\x24\x7f\x80\x7f\x80\xfe') + self.assertEqual(audioop.add(datas[2], datas[2], 2), + packs[2](0, 0x2468, 0x7fff, -0x8000, 0x7fff, -0x8000, -2)) + self.assertEqual(audioop.add(datas[4], datas[4], 4), + packs[4](0, 0x2468acf0, 0x7fffffff, -0x80000000, + 0x7fffffff, -0x80000000, -2)) def test_bias(self): - # Note: this test assumes that avg() works - d1 = audioop.bias(data[0], 1, 100) - d2 = audioop.bias(data[1], 2, 100) - d4 = audioop.bias(data[2], 4, 100) - self.assertEqual(audioop.avg(d1, 1), 101) - self.assertEqual(audioop.avg(d2, 2), 101) - self.assertEqual(audioop.avg(d4, 4), 101) + for w in 1, 2, 4: + for bias in 0, 1, -1, 127, -128, 0x7fffffff, -0x80000000: + self.assertEqual(audioop.bias(b'', w, bias), b'') + self.assertEqual(audioop.bias(datas[1], 1, 1), + b'\x01\x13\x46\xbc\x80\x81\x00') + self.assertEqual(audioop.bias(datas[1], 1, -1), + b'\xff\x11\x44\xba\x7e\x7f\xfe') + self.assertEqual(audioop.bias(datas[1], 1, 0x7fffffff), + b'\xff\x11\x44\xba\x7e\x7f\xfe') + self.assertEqual(audioop.bias(datas[1], 1, -0x80000000), + datas[1]) + self.assertEqual(audioop.bias(datas[2], 2, 1), + packs[2](1, 0x1235, 0x4568, -0x4566, -0x8000, -0x7fff, 0)) + self.assertEqual(audioop.bias(datas[2], 2, -1), + packs[2](-1, 0x1233, 0x4566, -0x4568, 0x7ffe, 0x7fff, -2)) + self.assertEqual(audioop.bias(datas[2], 2, 0x7fffffff), + packs[2](-1, 0x1233, 0x4566, -0x4568, 0x7ffe, 0x7fff, -2)) + self.assertEqual(audioop.bias(datas[2], 2, -0x80000000), + datas[2]) + self.assertEqual(audioop.bias(datas[4], 4, 1), + packs[4](1, 0x12345679, 0x456789ac, -0x456789aa, + -0x80000000, -0x7fffffff, 0)) + self.assertEqual(audioop.bias(datas[4], 4, -1), + packs[4](-1, 0x12345677, 0x456789aa, -0x456789ac, + 0x7ffffffe, 0x7fffffff, -2)) + self.assertEqual(audioop.bias(datas[4], 4, 0x7fffffff), + packs[4](0x7fffffff, -0x6dcba989, -0x3a987656, 0x3a987654, + -2, -1, 0x7ffffffe)) + self.assertEqual(audioop.bias(datas[4], 4, -0x80000000), + packs[4](-0x80000000, -0x6dcba988, -0x3a987655, 0x3a987655, + -1, 0, 0x7fffffff)) def test_lin2lin(self): - # too simple: we test only the size - for d1 in data: - for d2 in data: - got = len(d1)//3 - wtd = len(d2)//3 - self.assertEqual(len(audioop.lin2lin(d1, got, wtd)), len(d2)) + for w in 1, 2, 4: + self.assertEqual(audioop.lin2lin(datas[w], w, w), datas[w]) + + self.assertEqual(audioop.lin2lin(datas[1], 1, 2), + packs[2](0, 0x1200, 0x4500, -0x4500, 0x7f00, -0x8000, -0x100)) + self.assertEqual(audioop.lin2lin(datas[1], 1, 4), + packs[4](0, 0x12000000, 0x45000000, -0x45000000, + 0x7f000000, -0x80000000, -0x1000000)) + self.assertEqual(audioop.lin2lin(datas[2], 2, 1), + b'\x00\x12\x45\xba\x7f\x80\xff') + self.assertEqual(audioop.lin2lin(datas[2], 2, 4), + packs[4](0, 0x12340000, 0x45670000, -0x45670000, + 0x7fff0000, -0x80000000, -0x10000)) + self.assertEqual(audioop.lin2lin(datas[4], 4, 1), + b'\x00\x12\x45\xba\x7f\x80\xff') + self.assertEqual(audioop.lin2lin(datas[4], 4, 2), + packs[2](0, 0x1234, 0x4567, -0x4568, 0x7fff, -0x8000, -1)) def test_adpcm2lin(self): + self.assertEqual(audioop.adpcm2lin(b'\x07\x7f\x7f', 1, None), + (b'\x00\x00\x00\xff\x00\xff', (-179, 40))) + self.assertEqual(audioop.adpcm2lin(b'\x07\x7f\x7f', 2, None), + (packs[2](0, 0xb, 0x29, -0x16, 0x72, -0xb3), (-179, 40))) + self.assertEqual(audioop.adpcm2lin(b'\x07\x7f\x7f', 4, None), + (packs[4](0, 0xb0000, 0x290000, -0x160000, 0x720000, + -0xb30000), (-179, 40))) + # Very cursory test - self.assertEqual(audioop.adpcm2lin(b'\0\0', 1, None), (b'\0' * 4, (0,0))) - self.assertEqual(audioop.adpcm2lin(b'\0\0', 2, None), (b'\0' * 8, (0,0))) - self.assertEqual(audioop.adpcm2lin(b'\0\0', 4, None), (b'\0' * 16, (0,0))) + for w in 1, 2, 4: + self.assertEqual(audioop.adpcm2lin(b'\0' * 5, w, None), + (b'\0' * w * 10, (0, 0))) def test_lin2adpcm(self): + self.assertEqual(audioop.lin2adpcm(datas[1], 1, None), + (b'\x07\x7f\x7f', (-221, 39))) + self.assertEqual(audioop.lin2adpcm(datas[2], 2, None), + (b'\x07\x7f\x7f', (31, 39))) + self.assertEqual(audioop.lin2adpcm(datas[4], 4, None), + (b'\x07\x7f\x7f', (31, 39))) + # Very cursory test - self.assertEqual(audioop.lin2adpcm(b'\0\0\0\0', 1, None), (b'\0\0', (0,0))) + for w in 1, 2, 4: + self.assertEqual(audioop.lin2adpcm(b'\0' * w * 10, w, None), + (b'\0' * 5, (0, 0))) def test_lin2alaw(self): - self.assertEqual(audioop.lin2alaw(data[0], 1), b'\xd5\xc5\xf5') - self.assertEqual(audioop.lin2alaw(data[1], 2), b'\xd5\xd5\xd5') - self.assertEqual(audioop.lin2alaw(data[2], 4), b'\xd5\xd5\xd5') + self.assertEqual(audioop.lin2alaw(datas[1], 1), + b'\xd5\x87\xa4\x24\xaa\x2a\x5a') + self.assertEqual(audioop.lin2alaw(datas[2], 2), + b'\xd5\x87\xa4\x24\xaa\x2a\x55') + self.assertEqual(audioop.lin2alaw(datas[4], 4), + b'\xd5\x87\xa4\x24\xaa\x2a\x55') def test_alaw2lin(self): - # Cursory - d = audioop.lin2alaw(data[0], 1) - self.assertEqual(audioop.alaw2lin(d, 1), data[0]) - if endian == 'big': - self.assertEqual(audioop.alaw2lin(d, 2), - b'\x00\x08\x01\x08\x02\x10') - self.assertEqual(audioop.alaw2lin(d, 4), - b'\x00\x08\x00\x00\x01\x08\x00\x00\x02\x10\x00\x00') - else: - self.assertEqual(audioop.alaw2lin(d, 2), - b'\x08\x00\x08\x01\x10\x02') - self.assertEqual(audioop.alaw2lin(d, 4), - b'\x00\x00\x08\x00\x00\x00\x08\x01\x00\x00\x10\x02') + encoded = b'\x00\x03\x24\x2a\x51\x54\x55\x58\x6b\x71\x7f'\ + b'\x80\x83\xa4\xaa\xd1\xd4\xd5\xd8\xeb\xf1\xff' + src = [-688, -720, -2240, -4032, -9, -3, -1, -27, -244, -82, -106, + 688, 720, 2240, 4032, 9, 3, 1, 27, 244, 82, 106] + for w in 1, 2, 4: + self.assertEqual(audioop.alaw2lin(encoded, w), + packs[w](*(x << (w * 8) >> 13 for x in src))) + + encoded = bytes(range(256)) + for w in 2, 4: + decoded = audioop.alaw2lin(encoded, w) + self.assertEqual(audioop.lin2alaw(decoded, w), encoded) def test_lin2ulaw(self): - self.assertEqual(audioop.lin2ulaw(data[0], 1), b'\xff\xe7\xdb') - self.assertEqual(audioop.lin2ulaw(data[1], 2), b'\xff\xff\xff') - self.assertEqual(audioop.lin2ulaw(data[2], 4), b'\xff\xff\xff') + self.assertEqual(audioop.lin2ulaw(datas[1], 1), + b'\xff\xad\x8e\x0e\x80\x00\x67') + self.assertEqual(audioop.lin2ulaw(datas[2], 2), + b'\xff\xad\x8e\x0e\x80\x00\x7e') + self.assertEqual(audioop.lin2ulaw(datas[4], 4), + b'\xff\xad\x8e\x0e\x80\x00\x7e') def test_ulaw2lin(self): - # Cursory - d = audioop.lin2ulaw(data[0], 1) - self.assertEqual(audioop.ulaw2lin(d, 1), data[0]) - if endian == 'big': - self.assertEqual(audioop.ulaw2lin(d, 2), - b'\x00\x00\x01\x04\x02\x0c') - self.assertEqual(audioop.ulaw2lin(d, 4), - b'\x00\x00\x00\x00\x01\x04\x00\x00\x02\x0c\x00\x00') - else: - self.assertEqual(audioop.ulaw2lin(d, 2), - b'\x00\x00\x04\x01\x0c\x02') - self.assertEqual(audioop.ulaw2lin(d, 4), - b'\x00\x00\x00\x00\x00\x00\x04\x01\x00\x00\x0c\x02') + encoded = b'\x00\x0e\x28\x3f\x57\x6a\x76\x7c\x7e\x7f'\ + b'\x80\x8e\xa8\xbf\xd7\xea\xf6\xfc\xfe\xff' + src = [-8031, -4447, -1471, -495, -163, -53, -18, -6, -2, 0, + 8031, 4447, 1471, 495, 163, 53, 18, 6, 2, 0] + for w in 1, 2, 4: + self.assertEqual(audioop.ulaw2lin(encoded, w), + packs[w](*(x << (w * 8) >> 14 for x in src))) + + # Current u-law implementation has two codes fo 0: 0x7f and 0xff. + encoded = bytes(range(127)) + bytes(range(128, 256)) + for w in 2, 4: + decoded = audioop.ulaw2lin(encoded, w) + self.assertEqual(audioop.lin2ulaw(decoded, w), encoded) def test_mul(self): - data2 = [] - for d in data: - str = bytearray(len(d)) - for i,b in enumerate(d): - str[i] = 2*b - data2.append(str) - self.assertEqual(audioop.mul(data[0], 1, 2), data2[0]) - self.assertEqual(audioop.mul(data[1],2, 2), data2[1]) - self.assertEqual(audioop.mul(data[2], 4, 2), data2[2]) + for w in 1, 2, 4: + self.assertEqual(audioop.mul(b'', w, 2), b'') + self.assertEqual(audioop.mul(datas[w], w, 0), + b'\0' * len(datas[w])) + self.assertEqual(audioop.mul(datas[w], w, 1), + datas[w]) + self.assertEqual(audioop.mul(datas[1], 1, 2), + b'\x00\x24\x7f\x80\x7f\x80\xfe') + self.assertEqual(audioop.mul(datas[2], 2, 2), + packs[2](0, 0x2468, 0x7fff, -0x8000, 0x7fff, -0x8000, -2)) + self.assertEqual(audioop.mul(datas[4], 4, 2), + packs[4](0, 0x2468acf0, 0x7fffffff, -0x80000000, + 0x7fffffff, -0x80000000, -2)) def test_ratecv(self): + for w in 1, 2, 4: + self.assertEqual(audioop.ratecv(b'', w, 1, 8000, 8000, None), + (b'', (-1, ((0, 0),)))) + self.assertEqual(audioop.ratecv(b'', w, 5, 8000, 8000, None), + (b'', (-1, ((0, 0),) * 5))) + self.assertEqual(audioop.ratecv(b'', w, 1, 8000, 16000, None), + (b'', (-2, ((0, 0),)))) + self.assertEqual(audioop.ratecv(datas[w], w, 1, 8000, 8000, None)[0], + datas[w]) state = None - d1, state = audioop.ratecv(data[0], 1, 1, 8000, 16000, state) - d2, state = audioop.ratecv(data[0], 1, 1, 8000, 16000, state) + d1, state = audioop.ratecv(b'\x00\x01\x02', 1, 1, 8000, 16000, state) + d2, state = audioop.ratecv(b'\x00\x01\x02', 1, 1, 8000, 16000, state) self.assertEqual(d1 + d2, b'\000\000\001\001\002\001\000\000\001\001\002') + for w in 1, 2, 4: + d0, state0 = audioop.ratecv(datas[w], w, 1, 8000, 16000, None) + d, state = b'', None + for i in range(0, len(datas[w]), w): + d1, state = audioop.ratecv(datas[w][i:i + w], w, 1, + 8000, 16000, state) + d += d1 + self.assertEqual(d, d0) + self.assertEqual(state, state0) + def test_reverse(self): - self.assertEqual(audioop.reverse(data[0], 1), b'\2\1\0') + for w in 1, 2, 4: + self.assertEqual(audioop.reverse(b'', w), b'') + self.assertEqual(audioop.reverse(packs[w](0, 1, 2), w), + packs[w](2, 1, 0)) def test_tomono(self): - data2 = bytearray() - for d in data[0]: - data2.append(d) - data2.append(d) - self.assertEqual(audioop.tomono(data2, 1, 0.5, 0.5), data[0]) + for w in 1, 2, 4: + data1 = datas[w] + data2 = bytearray(2 * len(data1)) + for k in range(w): + data2[k::2*w] = data1[k::w] + self.assertEqual(audioop.tomono(data2, w, 1, 0), data1) + self.assertEqual(audioop.tomono(data2, w, 0, 1), b'\0' * len(data1)) + for k in range(w): + data2[k+w::2*w] = data1[k::w] + self.assertEqual(audioop.tomono(data2, w, 0.5, 0.5), data1) def test_tostereo(self): - data2 = bytearray() - for d in data[0]: - data2.append(d) - data2.append(d) - self.assertEqual(audioop.tostereo(data[0], 1, 1, 1), data2) + for w in 1, 2, 4: + data1 = datas[w] + data2 = bytearray(2 * len(data1)) + for k in range(w): + data2[k::2*w] = data1[k::w] + self.assertEqual(audioop.tostereo(data1, w, 1, 0), data2) + self.assertEqual(audioop.tostereo(data1, w, 0, 0), b'\0' * len(data2)) + for k in range(w): + data2[k+w::2*w] = data1[k::w] + self.assertEqual(audioop.tostereo(data1, w, 1, 1), data2) def test_findfactor(self): - self.assertEqual(audioop.findfactor(data[1], data[1]), 1.0) + self.assertEqual(audioop.findfactor(datas[2], datas[2]), 1.0) + self.assertEqual(audioop.findfactor(b'\0' * len(datas[2]), datas[2]), + 0.0) def test_findfit(self): - self.assertEqual(audioop.findfit(data[1], data[1]), (0, 1.0)) + self.assertEqual(audioop.findfit(datas[2], datas[2]), (0, 1.0)) + self.assertEqual(audioop.findfit(datas[2], packs[2](1, 2, 0)), + (1, 8038.8)) + self.assertEqual(audioop.findfit(datas[2][:-2] * 5 + datas[2], datas[2]), + (30, 1.0)) def test_findmax(self): - self.assertEqual(audioop.findmax(data[1], 1), 2) + self.assertEqual(audioop.findmax(datas[2], 1), 5) def test_getsample(self): - for i in range(3): - self.assertEqual(audioop.getsample(data[0], 1, i), i) - self.assertEqual(audioop.getsample(data[1], 2, i), i) - self.assertEqual(audioop.getsample(data[2], 4, i), i) + for w in 1, 2, 4: + data = packs[w](0, 1, -1, maxvalues[w], minvalues[w]) + self.assertEqual(audioop.getsample(data, w, 0), 0) + self.assertEqual(audioop.getsample(data, w, 1), 1) + self.assertEqual(audioop.getsample(data, w, 2), -1) + self.assertEqual(audioop.getsample(data, w, 3), maxvalues[w]) + self.assertEqual(audioop.getsample(data, w, 4), minvalues[w]) def test_negativelen(self): # from issue 3306, previously it segfaulted self.assertRaises(audioop.error, - audioop.findmax, ''.join(chr(x) for x in range(256)), -2392392) + audioop.findmax, bytes(range(256)), -2392392) def test_issue7673(self): state = None @@ -222,9 +367,9 @@ class TestAudioop(unittest.TestCase): self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state) def test_wrongsize(self): - data = b'abc' + data = b'abcdefgh' state = None - for size in (-1, 3, 5): + for size in (-1, 0, 3, 5, 1024): self.assertRaises(audioop.error, audioop.ulaw2lin, data, size) self.assertRaises(audioop.error, audioop.alaw2lin, data, size) self.assertRaises(audioop.error, audioop.adpcm2lin, data, size, state) diff --git a/Misc/NEWS b/Misc/NEWS index dc044e365cc..036d71226cf 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -215,6 +215,12 @@ Core and Builtins Library ------- +- Issue #16686: Fixed a lot of bugs in audioop module. Fixed crashes in + avgpp(), maxpp() and ratecv(). Fixed an integer overflow in add(), bias(), + and ratecv(). reverse(), lin2lin() and ratecv() no more lose precision for + 32-bit samples. max() and rms() no more returns a negative result and + various other functions now work correctly with 32-bit sample -0x80000000. + - Issue #17073: Fix some integer overflows in sqlite3 module. - Issue #17114: IDLE now uses non-strict config parser. diff --git a/Modules/audioop.c b/Modules/audioop.c index 9ab4834e29c..5c6925870d9 100644 --- a/Modules/audioop.c +++ b/Modules/audioop.c @@ -26,6 +26,21 @@ typedef short PyInt16; #endif #endif +static const int maxvals[] = {0, 0x7F, 0x7FFF, 0x7FFFFF, 0x7FFFFFFF}; +static const int minvals[] = {0, -0x80, -0x8000, -0x800000, -0x80000000}; +static const unsigned int masks[] = {0, 0xFF, 0xFFFF, 0xFFFFFF, 0xFFFFFFFF}; + +static int +fbound(double val, double minval, double maxval) +{ + if (val > maxval) + val = maxval; + else if (val < minval + 1) + val = minval; + return val; +} + + /* Code shamelessly stolen from sox, 12.17.7, g711.c ** (c) Craig Reese, Joe Campbell and Jeff Poskanzer 1989 */ @@ -347,7 +362,7 @@ audioop_max(PyObject *self, PyObject *args) signed char *cp; Py_ssize_t len, i; int size, val = 0; - int max = 0; + unsigned int absval, max = 0; if ( !PyArg_ParseTuple(args, "s#i:max", &cp, &len, &size) ) return 0; @@ -357,10 +372,11 @@ audioop_max(PyObject *self, PyObject *args) if ( size == 1 ) val = (int)*CHARP(cp, i); else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i); - if ( val < 0 ) val = (-val); - if ( val > max ) max = val; + if (val < 0) absval = (-val); + else absval = val; + if (absval > max) max = absval; } - return PyLong_FromLong(max); + return PyLong_FromUnsignedLong(max); } static PyObject * @@ -369,7 +385,7 @@ audioop_minmax(PyObject *self, PyObject *args) signed char *cp; Py_ssize_t len, i; int size, val = 0; - int min = 0x7fffffff, max = -0x7fffffff; + int min = 0x7fffffff, max = -0x80000000; if (!PyArg_ParseTuple(args, "s#i:minmax", &cp, &len, &size)) return NULL; @@ -406,7 +422,7 @@ audioop_avg(PyObject *self, PyObject *args) if ( len == 0 ) val = 0; else - val = (int)(avg / (double)(len/size)); + val = (int)floor(avg / (double)(len/size)); return PyLong_FromLong(val); } @@ -416,6 +432,7 @@ audioop_rms(PyObject *self, PyObject *args) signed char *cp; Py_ssize_t len, i; int size, val = 0; + unsigned int res; double sum_squares = 0.0; if ( !PyArg_ParseTuple(args, "s#i:rms", &cp, &len, &size) ) @@ -429,10 +446,10 @@ audioop_rms(PyObject *self, PyObject *args) sum_squares += (double)val*(double)val; } if ( len == 0 ) - val = 0; + res = 0; else - val = (int)sqrt(sum_squares / (double)(len/size)); - return PyLong_FromLong(val); + res = (unsigned int)sqrt(sum_squares / (double)(len/size)); + return PyLong_FromUnsignedLong(res); } static double _sum2(short *a, short *b, Py_ssize_t len) @@ -624,52 +641,46 @@ audioop_avgpp(PyObject *self, PyObject *args) Py_ssize_t len, i; int size, val = 0, prevval = 0, prevextremevalid = 0, prevextreme = 0; - double avg = 0.0; - int diff, prevdiff, extremediff, nextreme = 0; + double sum = 0.0; + unsigned int avg; + int diff, prevdiff, nextreme = 0; if ( !PyArg_ParseTuple(args, "s#i:avgpp", &cp, &len, &size) ) return 0; if (!audioop_check_parameters(len, size)) return NULL; - /* Compute first delta value ahead. Also automatically makes us - ** skip the first extreme value - */ + if (len <= size) + return PyLong_FromLong(0); if ( size == 1 ) prevval = (int)*CHARP(cp, 0); else if ( size == 2 ) prevval = (int)*SHORTP(cp, 0); else if ( size == 4 ) prevval = (int)*LONGP(cp, 0); - if ( size == 1 ) val = (int)*CHARP(cp, size); - else if ( size == 2 ) val = (int)*SHORTP(cp, size); - else if ( size == 4 ) val = (int)*LONGP(cp, size); - prevdiff = val - prevval; - + prevdiff = 17; /* Anything != 0, 1 */ for ( i=size; i max ) - max = extremediff; + if (val != prevval) { + diff = val < prevval; + if (prevdiff == !diff) { + /* Derivative changed sign. Compute difference to + ** last extreme value and remember. + */ + if (prevextremevalid) { + if (prevval < prevextreme) + extremediff = (unsigned int)prevextreme - + (unsigned int)prevval; + else + extremediff = (unsigned int)prevval - + (unsigned int)prevextreme; + if ( extremediff > max ) + max = extremediff; + } + prevextremevalid = 1; + prevextreme = prevval; } - prevextremevalid = 1; - prevextreme = prevval; - } - prevval = val; - if ( diff != 0 ) + prevval = val; prevdiff = diff; + } } - return PyLong_FromLong(max); + return PyLong_FromUnsignedLong(max); } static PyObject * @@ -755,7 +765,7 @@ audioop_mul(PyObject *self, PyObject *args) signed char *cp, *ncp; Py_ssize_t len, i; int size, val = 0; - double factor, fval, maxval; + double factor, fval, maxval, minval; PyObject *rv; if ( !PyArg_ParseTuple(args, "s#id:mul", &cp, &len, &size, &factor ) ) @@ -763,13 +773,8 @@ audioop_mul(PyObject *self, PyObject *args) if (!audioop_check_parameters(len, size)) return NULL; - if ( size == 1 ) maxval = (double) 0x7f; - else if ( size == 2 ) maxval = (double) 0x7fff; - else if ( size == 4 ) maxval = (double) 0x7fffffff; - else { - PyErr_SetString(AudioopError, "Size should be 1, 2 or 4"); - return 0; - } + maxval = (double) maxvals[size]; + minval = (double) minvals[size]; rv = PyBytes_FromStringAndSize(NULL, len); if ( rv == 0 ) @@ -782,9 +787,7 @@ audioop_mul(PyObject *self, PyObject *args) else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i); fval = (double)val*factor; - if ( fval > maxval ) fval = maxval; - else if ( fval < -maxval ) fval = -maxval; - val = (int)fval; + val = (int)floor(fbound(fval, minval, maxval)); if ( size == 1 ) *CHARP(ncp, i) = (signed char)val; else if ( size == 2 ) *SHORTP(ncp, i) = (short)val; else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)val; @@ -799,7 +802,7 @@ audioop_tomono(PyObject *self, PyObject *args) signed char *cp, *ncp; Py_ssize_t len, i; int size, val1 = 0, val2 = 0; - double fac1, fac2, fval, maxval; + double fac1, fac2, fval, maxval, minval; PyObject *rv; if ( !PyArg_ParseTuple(args, "s*idd:tomono", @@ -817,14 +820,8 @@ audioop_tomono(PyObject *self, PyObject *args) return NULL; } - if ( size == 1 ) maxval = (double) 0x7f; - else if ( size == 2 ) maxval = (double) 0x7fff; - else if ( size == 4 ) maxval = (double) 0x7fffffff; - else { - PyBuffer_Release(&pcp); - PyErr_SetString(AudioopError, "Size should be 1, 2 or 4"); - return 0; - } + maxval = (double) maxvals[size]; + minval = (double) minvals[size]; rv = PyBytes_FromStringAndSize(NULL, len/2); if ( rv == 0 ) { @@ -842,9 +839,7 @@ audioop_tomono(PyObject *self, PyObject *args) else if ( size == 2 ) val2 = (int)*SHORTP(cp, i+2); else if ( size == 4 ) val2 = (int)*LONGP(cp, i+4); fval = (double)val1*fac1 + (double)val2*fac2; - if ( fval > maxval ) fval = maxval; - else if ( fval < -maxval ) fval = -maxval; - val1 = (int)fval; + val1 = (int)floor(fbound(fval, minval, maxval)); if ( size == 1 ) *CHARP(ncp, i/2) = (signed char)val1; else if ( size == 2 ) *SHORTP(ncp, i/2) = (short)val1; else if ( size == 4 ) *LONGP(ncp, i/2)= (Py_Int32)val1; @@ -859,7 +854,7 @@ audioop_tostereo(PyObject *self, PyObject *args) signed char *cp, *ncp; Py_ssize_t len, i; int size, val1, val2, val = 0; - double fac1, fac2, fval, maxval; + double fac1, fac2, fval, maxval, minval; PyObject *rv; if ( !PyArg_ParseTuple(args, "s#idd:tostereo", @@ -868,13 +863,8 @@ audioop_tostereo(PyObject *self, PyObject *args) if (!audioop_check_parameters(len, size)) return NULL; - if ( size == 1 ) maxval = (double) 0x7f; - else if ( size == 2 ) maxval = (double) 0x7fff; - else if ( size == 4 ) maxval = (double) 0x7fffffff; - else { - PyErr_SetString(AudioopError, "Size should be 1, 2 or 4"); - return 0; - } + maxval = (double) maxvals[size]; + minval = (double) minvals[size]; if (len > PY_SSIZE_T_MAX/2) { PyErr_SetString(PyExc_MemoryError, @@ -894,14 +884,10 @@ audioop_tostereo(PyObject *self, PyObject *args) else if ( size == 4 ) val = (int)*LONGP(cp, i); fval = (double)val*fac1; - if ( fval > maxval ) fval = maxval; - else if ( fval < -maxval ) fval = -maxval; - val1 = (int)fval; + val1 = (int)floor(fbound(fval, minval, maxval)); fval = (double)val*fac2; - if ( fval > maxval ) fval = maxval; - else if ( fval < -maxval ) fval = -maxval; - val2 = (int)fval; + val2 = (int)floor(fbound(fval, minval, maxval)); if ( size == 1 ) *CHARP(ncp, i*2) = (signed char)val1; else if ( size == 2 ) *SHORTP(ncp, i*2) = (short)val1; @@ -919,7 +905,7 @@ audioop_add(PyObject *self, PyObject *args) { signed char *cp1, *cp2, *ncp; Py_ssize_t len1, len2, i; - int size, val1 = 0, val2 = 0, maxval, newval; + int size, val1 = 0, val2 = 0, minval, maxval, newval; PyObject *rv; if ( !PyArg_ParseTuple(args, "s#s#i:add", @@ -932,13 +918,8 @@ audioop_add(PyObject *self, PyObject *args) return 0; } - if ( size == 1 ) maxval = 0x7f; - else if ( size == 2 ) maxval = 0x7fff; - else if ( size == 4 ) maxval = 0x7fffffff; - else { - PyErr_SetString(AudioopError, "Size should be 1, 2 or 4"); - return 0; - } + maxval = maxvals[size]; + minval = minvals[size]; rv = PyBytes_FromStringAndSize(NULL, len1); if ( rv == 0 ) @@ -954,12 +935,19 @@ audioop_add(PyObject *self, PyObject *args) else if ( size == 2 ) val2 = (int)*SHORTP(cp2, i); else if ( size == 4 ) val2 = (int)*LONGP(cp2, i); - newval = val1 + val2; - /* truncate in case of overflow */ - if (newval > maxval) newval = maxval; - else if (newval < -maxval) newval = -maxval; - else if (size == 4 && (newval^val1) < 0 && (newval^val2) < 0) - newval = val1 > 0 ? maxval : - maxval; + if (size < 4) { + newval = val1 + val2; + /* truncate in case of overflow */ + if (newval > maxval) + newval = maxval; + else if (newval < minval) + newval = minval; + } + else { + double fval = (double)val1 + (double)val2; + /* truncate in case of overflow */ + newval = (int)floor(fbound(fval, minval, maxval)); + } if ( size == 1 ) *CHARP(ncp, i) = (signed char)newval; else if ( size == 2 ) *SHORTP(ncp, i) = (short)newval; @@ -973,9 +961,9 @@ audioop_bias(PyObject *self, PyObject *args) { signed char *cp, *ncp; Py_ssize_t len, i; - int size, val = 0; + int size, bias; + unsigned int val = 0, mask; PyObject *rv; - int bias; if ( !PyArg_ParseTuple(args, "s#ii:bias", &cp, &len, &size , &bias) ) @@ -989,15 +977,20 @@ audioop_bias(PyObject *self, PyObject *args) return 0; ncp = (signed char *)PyBytes_AsString(rv); + mask = masks[size]; for ( i=0; i < len; i += size ) { - if ( size == 1 ) val = (int)*CHARP(cp, i); - else if ( size == 2 ) val = (int)*SHORTP(cp, i); - else if ( size == 4 ) val = (int)*LONGP(cp, i); + if ( size == 1 ) val = (unsigned int)(unsigned char)*CHARP(cp, i); + else if ( size == 2 ) val = (unsigned int)(unsigned short)*SHORTP(cp, i); + else if ( size == 4 ) val = (unsigned int)(Py_UInt32)*LONGP(cp, i); - if ( size == 1 ) *CHARP(ncp, i) = (signed char)(val+bias); - else if ( size == 2 ) *SHORTP(ncp, i) = (short)(val+bias); - else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)(val+bias); + val += (unsigned int)bias; + /* wrap around in case of overflow */ + val &= mask; + + if ( size == 1 ) *CHARP(ncp, i) = (signed char)(unsigned char)val; + else if ( size == 2 ) *SHORTP(ncp, i) = (short)(unsigned short)val; + else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)(Py_UInt32)val; } return rv; } @@ -1024,15 +1017,15 @@ audioop_reverse(PyObject *self, PyObject *args) ncp = (unsigned char *)PyBytes_AsString(rv); for ( i=0; i < len; i += size ) { - if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 8; - else if ( size == 2 ) val = (int)*SHORTP(cp, i); - else if ( size == 4 ) val = ((int)*LONGP(cp, i)) >> 16; + if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 24; + else if ( size == 2 ) val = ((int)*SHORTP(cp, i)) << 16; + else if ( size == 4 ) val = (int)*LONGP(cp, i); j = len - i - size; - if ( size == 1 ) *CHARP(ncp, j) = (signed char)(val >> 8); - else if ( size == 2 ) *SHORTP(ncp, j) = (short)(val); - else if ( size == 4 ) *LONGP(ncp, j) = (Py_Int32)(val<<16); + if ( size == 1 ) *CHARP(ncp, j) = (signed char)(val >> 24); + else if ( size == 2 ) *SHORTP(ncp, j) = (short)(val >> 16); + else if ( size == 4 ) *LONGP(ncp, j) = (Py_Int32)val; } return rv; } @@ -1066,13 +1059,13 @@ audioop_lin2lin(PyObject *self, PyObject *args) ncp = (unsigned char *)PyBytes_AsString(rv); for ( i=0, j=0; i < len; i += size, j += size2 ) { - if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 8; - else if ( size == 2 ) val = (int)*SHORTP(cp, i); - else if ( size == 4 ) val = ((int)*LONGP(cp, i)) >> 16; + if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 24; + else if ( size == 2 ) val = ((int)*SHORTP(cp, i)) << 16; + else if ( size == 4 ) val = (int)*LONGP(cp, i); - if ( size2 == 1 ) *CHARP(ncp, j) = (signed char)(val >> 8); - else if ( size2 == 2 ) *SHORTP(ncp, j) = (short)(val); - else if ( size2 == 4 ) *LONGP(ncp, j) = (Py_Int32)(val<<16); + if ( size2 == 1 ) *CHARP(ncp, j) = (signed char)(val >> 24); + else if ( size2 == 2 ) *SHORTP(ncp, j) = (short)(val >> 16); + else if ( size2 == 4 ) *LONGP(ncp, j) = (Py_Int32)val; } return rv; } @@ -1136,6 +1129,10 @@ audioop_ratecv(PyObject *self, PyObject *args) d = gcd(inrate, outrate); inrate /= d; outrate /= d; + /* divide weightA and weightB by their greatest common divisor */ + d = gcd(weightA, weightB); + weightA /= d; + weightA /= d; if ((size_t)nchannels > PY_SIZE_MAX/sizeof(int)) { PyErr_SetString(PyExc_MemoryError, @@ -1175,7 +1172,9 @@ audioop_ratecv(PyObject *self, PyObject *args) } /* str <- Space for the output buffer. */ - { + if (len == 0) + str = PyBytes_FromStringAndSize(NULL, 0); + else { /* There are len input frames, so we need (mathematically) ceiling(len*outrate/inrate) output frames, and each frame requires bytes_per_frame bytes. Computing this @@ -1190,12 +1189,11 @@ audioop_ratecv(PyObject *self, PyObject *args) else str = PyBytes_FromStringAndSize(NULL, q * outrate * bytes_per_frame); - - if (str == NULL) { - PyErr_SetString(PyExc_MemoryError, - "not enough memory for output buffer"); - goto exit; - } + } + if (str == NULL) { + PyErr_SetString(PyExc_MemoryError, + "not enough memory for output buffer"); + goto exit; } ncp = PyBytes_AsString(str); @@ -1229,32 +1227,32 @@ audioop_ratecv(PyObject *self, PyObject *args) for (chan = 0; chan < nchannels; chan++) { prev_i[chan] = cur_i[chan]; if (size == 1) - cur_i[chan] = ((int)*CHARP(cp, 0)) << 8; + cur_i[chan] = ((int)*CHARP(cp, 0)) << 24; else if (size == 2) - cur_i[chan] = (int)*SHORTP(cp, 0); + cur_i[chan] = ((int)*SHORTP(cp, 0)) << 16; else if (size == 4) - cur_i[chan] = ((int)*LONGP(cp, 0)) >> 16; + cur_i[chan] = (int)*LONGP(cp, 0); cp += size; /* implements a simple digital filter */ - cur_i[chan] = - (weightA * cur_i[chan] + - weightB * prev_i[chan]) / - (weightA + weightB); + cur_i[chan] = (int)( + ((double)weightA * (double)cur_i[chan] + + (double)weightB * (double)prev_i[chan]) / + ((double)weightA + (double)weightB)); } len--; d += outrate; } while (d >= 0) { for (chan = 0; chan < nchannels; chan++) { - cur_o = (prev_i[chan] * d + - cur_i[chan] * (outrate - d)) / - outrate; + cur_o = (int)(((double)prev_i[chan] * (double)d + + (double)cur_i[chan] * (double)(outrate - d)) / + (double)outrate); if (size == 1) - *CHARP(ncp, 0) = (signed char)(cur_o >> 8); + *CHARP(ncp, 0) = (signed char)(cur_o >> 24); else if (size == 2) - *SHORTP(ncp, 0) = (short)(cur_o); + *SHORTP(ncp, 0) = (short)(cur_o >> 16); else if (size == 4) - *LONGP(ncp, 0) = (Py_Int32)(cur_o<<16); + *LONGP(ncp, 0) = (Py_Int32)(cur_o); ncp += size; } d -= inrate;