Issue #16686: Fixed a lot of bugs in audioop module.

* avgpp() and maxpp() no more crash on empty and 1-samples input fragment. They now work when peak-peak values are greater INT_MAX.
* ratecv() no more crashes on empty input fragment.
* Fixed an integer overflow in ratecv().
* Fixed an integer overflow in add() and bias() for 32-bit samples.
* reverse(), lin2lin() and ratecv() no more lose precision for 32-bit samples.
* max() and rms() no more returns negative result for 32-bit sample -0x80000000.
* minmax() now returns correct max value for 32-bit sample -0x80000000.
* avg(), mul(), tomono() and tostereo() now round negative result down and can return 32-bit sample -0x80000000.
* add() now can return 32-bit sample -0x80000000.
This commit is contained in:
Serhiy Storchaka 2013-02-09 11:10:53 +02:00
parent a48b61f8f2
commit 01ad622a2c
4 changed files with 435 additions and 286 deletions

View File

@ -36,7 +36,7 @@ The module defines the following variables and functions:
Return a fragment which is the addition of the two samples passed as parameters. Return a fragment which is the addition of the two samples passed as parameters.
*width* is the sample width in bytes, either ``1``, ``2`` or ``4``. Both *width* is the sample width in bytes, either ``1``, ``2`` or ``4``. Both
fragments should have the same length. fragments should have the same length. Samples are truncated in case of overflow.
.. function:: adpcm2lin(adpcmfragment, width, state) .. function:: adpcm2lin(adpcmfragment, width, state)
@ -67,7 +67,7 @@ The module defines the following variables and functions:
.. function:: bias(fragment, width, bias) .. function:: bias(fragment, width, bias)
Return a fragment that is the original fragment with a bias added to each Return a fragment that is the original fragment with a bias added to each
sample. sample. Samples wrap around in case of overflow.
.. function:: cross(fragment, width) .. function:: cross(fragment, width)
@ -175,7 +175,7 @@ The module defines the following variables and functions:
.. function:: mul(fragment, width, factor) .. function:: mul(fragment, width, factor)
Return a fragment that has all samples in the original fragment multiplied by Return a fragment that has all samples in the original fragment multiplied by
the floating-point value *factor*. Overflow is silently ignored. the floating-point value *factor*. Samples are truncated in case of overflow.
.. function:: ratecv(fragment, width, nchannels, inrate, outrate, state[, weightA[, weightB]]) .. function:: ratecv(fragment, width, nchannels, inrate, outrate, state[, weightA[, weightB]])

View File

@ -1,25 +1,21 @@
import audioop import audioop
import sys
import unittest import unittest
from test.support import run_unittest from test.support import run_unittest
endian = 'big' if audioop.getsample(b'\0\1', 2, 0) == 1 else 'little' def pack(width, data):
return b''.join(v.to_bytes(width, sys.byteorder, signed=True) for v in data)
def gendata1(): packs = {w: (lambda *data, width=w: pack(width, data)) for w in (1, 2, 4)}
return b'\0\1\2' maxvalues = {w: (1 << (8 * w - 1)) - 1 for w in (1, 2, 4)}
minvalues = {w: -1 << (8 * w - 1) for w in (1, 2, 4)}
def gendata2(): datas = {
if endian == 'big': 1: b'\x00\x12\x45\xbb\x7f\x80\xff',
return b'\0\0\0\1\0\2' 2: packs[2](0, 0x1234, 0x4567, -0x4567, 0x7fff, -0x8000, -1),
else: 4: packs[4](0, 0x12345678, 0x456789ab, -0x456789ab,
return b'\0\0\1\0\2\0' 0x7fffffff, -0x80000000, -1),
}
def gendata4():
if endian == 'big':
return b'\0\0\0\0\0\0\0\1\0\0\0\2'
else:
return b'\0\0\0\0\1\0\0\0\2\0\0\0'
data = [gendata1(), gendata2(), gendata4()]
INVALID_DATA = [ INVALID_DATA = [
(b'abc', 0), (b'abc', 0),
@ -31,171 +27,320 @@ INVALID_DATA = [
class TestAudioop(unittest.TestCase): class TestAudioop(unittest.TestCase):
def test_max(self): def test_max(self):
self.assertEqual(audioop.max(data[0], 1), 2) for w in 1, 2, 4:
self.assertEqual(audioop.max(data[1], 2), 2) self.assertEqual(audioop.max(b'', w), 0)
self.assertEqual(audioop.max(data[2], 4), 2) p = packs[w]
self.assertEqual(audioop.max(p(5), w), 5)
self.assertEqual(audioop.max(p(5, -8, -1), w), 8)
self.assertEqual(audioop.max(p(maxvalues[w]), w), maxvalues[w])
self.assertEqual(audioop.max(p(minvalues[w]), w), -minvalues[w])
self.assertEqual(audioop.max(datas[w], w), -minvalues[w])
def test_minmax(self): def test_minmax(self):
self.assertEqual(audioop.minmax(data[0], 1), (0, 2)) for w in 1, 2, 4:
self.assertEqual(audioop.minmax(data[1], 2), (0, 2)) self.assertEqual(audioop.minmax(b'', w),
self.assertEqual(audioop.minmax(data[2], 4), (0, 2)) (0x7fffffff, -0x80000000))
p = packs[w]
self.assertEqual(audioop.minmax(p(5), w), (5, 5))
self.assertEqual(audioop.minmax(p(5, -8, -1), w), (-8, 5))
self.assertEqual(audioop.minmax(p(maxvalues[w]), w),
(maxvalues[w], maxvalues[w]))
self.assertEqual(audioop.minmax(p(minvalues[w]), w),
(minvalues[w], minvalues[w]))
self.assertEqual(audioop.minmax(datas[w], w),
(minvalues[w], maxvalues[w]))
def test_maxpp(self): def test_maxpp(self):
self.assertEqual(audioop.maxpp(data[0], 1), 0) for w in 1, 2, 4:
self.assertEqual(audioop.maxpp(data[1], 2), 0) self.assertEqual(audioop.maxpp(b'', w), 0)
self.assertEqual(audioop.maxpp(data[2], 4), 0) self.assertEqual(audioop.maxpp(packs[w](*range(100)), w), 0)
self.assertEqual(audioop.maxpp(packs[w](9, 10, 5, 5, 0, 1), w), 10)
self.assertEqual(audioop.maxpp(datas[w], w),
maxvalues[w] - minvalues[w])
def test_avg(self): def test_avg(self):
self.assertEqual(audioop.avg(data[0], 1), 1) for w in 1, 2, 4:
self.assertEqual(audioop.avg(data[1], 2), 1) self.assertEqual(audioop.avg(b'', w), 0)
self.assertEqual(audioop.avg(data[2], 4), 1) p = packs[w]
self.assertEqual(audioop.avg(p(5), w), 5)
self .assertEqual(audioop.avg(p(5, 8), w), 6)
self.assertEqual(audioop.avg(p(5, -8), w), -2)
self.assertEqual(audioop.avg(p(maxvalues[w], maxvalues[w]), w),
maxvalues[w])
self.assertEqual(audioop.avg(p(minvalues[w], minvalues[w]), w),
minvalues[w])
self.assertEqual(audioop.avg(packs[4](0x50000000, 0x70000000), 4),
0x60000000)
self.assertEqual(audioop.avg(packs[4](-0x50000000, -0x70000000), 4),
-0x60000000)
def test_avgpp(self): def test_avgpp(self):
self.assertEqual(audioop.avgpp(data[0], 1), 0) for w in 1, 2, 4:
self.assertEqual(audioop.avgpp(data[1], 2), 0) self.assertEqual(audioop.avgpp(b'', w), 0)
self.assertEqual(audioop.avgpp(data[2], 4), 0) self.assertEqual(audioop.avgpp(packs[w](*range(100)), w), 0)
self.assertEqual(audioop.avgpp(packs[w](9, 10, 5, 5, 0, 1), w), 10)
self.assertEqual(audioop.avgpp(datas[1], 1), 196)
self.assertEqual(audioop.avgpp(datas[2], 2), 50534)
self.assertEqual(audioop.avgpp(datas[4], 4), 3311897002)
def test_rms(self): def test_rms(self):
self.assertEqual(audioop.rms(data[0], 1), 1) for w in 1, 2, 4:
self.assertEqual(audioop.rms(data[1], 2), 1) self.assertEqual(audioop.rms(b'', w), 0)
self.assertEqual(audioop.rms(data[2], 4), 1) p = packs[w]
self.assertEqual(audioop.rms(p(*range(100)), w), 57)
self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
maxvalues[w], delta=1)
self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
-minvalues[w], delta=1)
self.assertEqual(audioop.rms(datas[1], 1), 77)
self.assertEqual(audioop.rms(datas[2], 2), 20001)
self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
def test_cross(self): def test_cross(self):
self.assertEqual(audioop.cross(data[0], 1), 0) for w in 1, 2, 4:
self.assertEqual(audioop.cross(data[1], 2), 0) self.assertEqual(audioop.cross(b'', w), -1)
self.assertEqual(audioop.cross(data[2], 4), 0) p = packs[w]
self.assertEqual(audioop.cross(p(0, 1, 2), w), 0)
self.assertEqual(audioop.cross(p(1, 2, -3, -4), w), 1)
self.assertEqual(audioop.cross(p(-1, -2, 3, 4), w), 1)
self.assertEqual(audioop.cross(p(0, minvalues[w]), w), 1)
self.assertEqual(audioop.cross(p(minvalues[w], maxvalues[w]), w), 1)
def test_add(self): def test_add(self):
data2 = [] for w in 1, 2, 4:
for d in data: self.assertEqual(audioop.add(b'', b'', w), b'')
str = bytearray(len(d)) self.assertEqual(audioop.add(datas[w], b'\0' * len(datas[w]), w),
for i,b in enumerate(d): datas[w])
str[i] = 2*b self.assertEqual(audioop.add(datas[1], datas[1], 1),
data2.append(str) b'\x00\x24\x7f\x80\x7f\x80\xfe')
self.assertEqual(audioop.add(data[0], data[0], 1), data2[0]) self.assertEqual(audioop.add(datas[2], datas[2], 2),
self.assertEqual(audioop.add(data[1], data[1], 2), data2[1]) packs[2](0, 0x2468, 0x7fff, -0x8000, 0x7fff, -0x8000, -2))
self.assertEqual(audioop.add(data[2], data[2], 4), data2[2]) self.assertEqual(audioop.add(datas[4], datas[4], 4),
packs[4](0, 0x2468acf0, 0x7fffffff, -0x80000000,
0x7fffffff, -0x80000000, -2))
def test_bias(self): def test_bias(self):
# Note: this test assumes that avg() works for w in 1, 2, 4:
d1 = audioop.bias(data[0], 1, 100) for bias in 0, 1, -1, 127, -128, 0x7fffffff, -0x80000000:
d2 = audioop.bias(data[1], 2, 100) self.assertEqual(audioop.bias(b'', w, bias), b'')
d4 = audioop.bias(data[2], 4, 100) self.assertEqual(audioop.bias(datas[1], 1, 1),
self.assertEqual(audioop.avg(d1, 1), 101) b'\x01\x13\x46\xbc\x80\x81\x00')
self.assertEqual(audioop.avg(d2, 2), 101) self.assertEqual(audioop.bias(datas[1], 1, -1),
self.assertEqual(audioop.avg(d4, 4), 101) b'\xff\x11\x44\xba\x7e\x7f\xfe')
self.assertEqual(audioop.bias(datas[1], 1, 0x7fffffff),
b'\xff\x11\x44\xba\x7e\x7f\xfe')
self.assertEqual(audioop.bias(datas[1], 1, -0x80000000),
datas[1])
self.assertEqual(audioop.bias(datas[2], 2, 1),
packs[2](1, 0x1235, 0x4568, -0x4566, -0x8000, -0x7fff, 0))
self.assertEqual(audioop.bias(datas[2], 2, -1),
packs[2](-1, 0x1233, 0x4566, -0x4568, 0x7ffe, 0x7fff, -2))
self.assertEqual(audioop.bias(datas[2], 2, 0x7fffffff),
packs[2](-1, 0x1233, 0x4566, -0x4568, 0x7ffe, 0x7fff, -2))
self.assertEqual(audioop.bias(datas[2], 2, -0x80000000),
datas[2])
self.assertEqual(audioop.bias(datas[4], 4, 1),
packs[4](1, 0x12345679, 0x456789ac, -0x456789aa,
-0x80000000, -0x7fffffff, 0))
self.assertEqual(audioop.bias(datas[4], 4, -1),
packs[4](-1, 0x12345677, 0x456789aa, -0x456789ac,
0x7ffffffe, 0x7fffffff, -2))
self.assertEqual(audioop.bias(datas[4], 4, 0x7fffffff),
packs[4](0x7fffffff, -0x6dcba989, -0x3a987656, 0x3a987654,
-2, -1, 0x7ffffffe))
self.assertEqual(audioop.bias(datas[4], 4, -0x80000000),
packs[4](-0x80000000, -0x6dcba988, -0x3a987655, 0x3a987655,
-1, 0, 0x7fffffff))
def test_lin2lin(self): def test_lin2lin(self):
# too simple: we test only the size for w in 1, 2, 4:
for d1 in data: self.assertEqual(audioop.lin2lin(datas[w], w, w), datas[w])
for d2 in data:
got = len(d1)//3 self.assertEqual(audioop.lin2lin(datas[1], 1, 2),
wtd = len(d2)//3 packs[2](0, 0x1200, 0x4500, -0x4500, 0x7f00, -0x8000, -0x100))
self.assertEqual(len(audioop.lin2lin(d1, got, wtd)), len(d2)) self.assertEqual(audioop.lin2lin(datas[1], 1, 4),
packs[4](0, 0x12000000, 0x45000000, -0x45000000,
0x7f000000, -0x80000000, -0x1000000))
self.assertEqual(audioop.lin2lin(datas[2], 2, 1),
b'\x00\x12\x45\xba\x7f\x80\xff')
self.assertEqual(audioop.lin2lin(datas[2], 2, 4),
packs[4](0, 0x12340000, 0x45670000, -0x45670000,
0x7fff0000, -0x80000000, -0x10000))
self.assertEqual(audioop.lin2lin(datas[4], 4, 1),
b'\x00\x12\x45\xba\x7f\x80\xff')
self.assertEqual(audioop.lin2lin(datas[4], 4, 2),
packs[2](0, 0x1234, 0x4567, -0x4568, 0x7fff, -0x8000, -1))
def test_adpcm2lin(self): def test_adpcm2lin(self):
self.assertEqual(audioop.adpcm2lin(b'\x07\x7f\x7f', 1, None),
(b'\x00\x00\x00\xff\x00\xff', (-179, 40)))
self.assertEqual(audioop.adpcm2lin(b'\x07\x7f\x7f', 2, None),
(packs[2](0, 0xb, 0x29, -0x16, 0x72, -0xb3), (-179, 40)))
self.assertEqual(audioop.adpcm2lin(b'\x07\x7f\x7f', 4, None),
(packs[4](0, 0xb0000, 0x290000, -0x160000, 0x720000,
-0xb30000), (-179, 40)))
# Very cursory test # Very cursory test
self.assertEqual(audioop.adpcm2lin(b'\0\0', 1, None), (b'\0' * 4, (0,0))) for w in 1, 2, 4:
self.assertEqual(audioop.adpcm2lin(b'\0\0', 2, None), (b'\0' * 8, (0,0))) self.assertEqual(audioop.adpcm2lin(b'\0' * 5, w, None),
self.assertEqual(audioop.adpcm2lin(b'\0\0', 4, None), (b'\0' * 16, (0,0))) (b'\0' * w * 10, (0, 0)))
def test_lin2adpcm(self): def test_lin2adpcm(self):
self.assertEqual(audioop.lin2adpcm(datas[1], 1, None),
(b'\x07\x7f\x7f', (-221, 39)))
self.assertEqual(audioop.lin2adpcm(datas[2], 2, None),
(b'\x07\x7f\x7f', (31, 39)))
self.assertEqual(audioop.lin2adpcm(datas[4], 4, None),
(b'\x07\x7f\x7f', (31, 39)))
# Very cursory test # Very cursory test
self.assertEqual(audioop.lin2adpcm(b'\0\0\0\0', 1, None), (b'\0\0', (0,0))) for w in 1, 2, 4:
self.assertEqual(audioop.lin2adpcm(b'\0' * w * 10, w, None),
(b'\0' * 5, (0, 0)))
def test_lin2alaw(self): def test_lin2alaw(self):
self.assertEqual(audioop.lin2alaw(data[0], 1), b'\xd5\xc5\xf5') self.assertEqual(audioop.lin2alaw(datas[1], 1),
self.assertEqual(audioop.lin2alaw(data[1], 2), b'\xd5\xd5\xd5') b'\xd5\x87\xa4\x24\xaa\x2a\x5a')
self.assertEqual(audioop.lin2alaw(data[2], 4), b'\xd5\xd5\xd5') self.assertEqual(audioop.lin2alaw(datas[2], 2),
b'\xd5\x87\xa4\x24\xaa\x2a\x55')
self.assertEqual(audioop.lin2alaw(datas[4], 4),
b'\xd5\x87\xa4\x24\xaa\x2a\x55')
def test_alaw2lin(self): def test_alaw2lin(self):
# Cursory encoded = b'\x00\x03\x24\x2a\x51\x54\x55\x58\x6b\x71\x7f'\
d = audioop.lin2alaw(data[0], 1) b'\x80\x83\xa4\xaa\xd1\xd4\xd5\xd8\xeb\xf1\xff'
self.assertEqual(audioop.alaw2lin(d, 1), data[0]) src = [-688, -720, -2240, -4032, -9, -3, -1, -27, -244, -82, -106,
if endian == 'big': 688, 720, 2240, 4032, 9, 3, 1, 27, 244, 82, 106]
self.assertEqual(audioop.alaw2lin(d, 2), for w in 1, 2, 4:
b'\x00\x08\x01\x08\x02\x10') self.assertEqual(audioop.alaw2lin(encoded, w),
self.assertEqual(audioop.alaw2lin(d, 4), packs[w](*(x << (w * 8) >> 13 for x in src)))
b'\x00\x08\x00\x00\x01\x08\x00\x00\x02\x10\x00\x00')
else: encoded = bytes(range(256))
self.assertEqual(audioop.alaw2lin(d, 2), for w in 2, 4:
b'\x08\x00\x08\x01\x10\x02') decoded = audioop.alaw2lin(encoded, w)
self.assertEqual(audioop.alaw2lin(d, 4), self.assertEqual(audioop.lin2alaw(decoded, w), encoded)
b'\x00\x00\x08\x00\x00\x00\x08\x01\x00\x00\x10\x02')
def test_lin2ulaw(self): def test_lin2ulaw(self):
self.assertEqual(audioop.lin2ulaw(data[0], 1), b'\xff\xe7\xdb') self.assertEqual(audioop.lin2ulaw(datas[1], 1),
self.assertEqual(audioop.lin2ulaw(data[1], 2), b'\xff\xff\xff') b'\xff\xad\x8e\x0e\x80\x00\x67')
self.assertEqual(audioop.lin2ulaw(data[2], 4), b'\xff\xff\xff') self.assertEqual(audioop.lin2ulaw(datas[2], 2),
b'\xff\xad\x8e\x0e\x80\x00\x7e')
self.assertEqual(audioop.lin2ulaw(datas[4], 4),
b'\xff\xad\x8e\x0e\x80\x00\x7e')
def test_ulaw2lin(self): def test_ulaw2lin(self):
# Cursory encoded = b'\x00\x0e\x28\x3f\x57\x6a\x76\x7c\x7e\x7f'\
d = audioop.lin2ulaw(data[0], 1) b'\x80\x8e\xa8\xbf\xd7\xea\xf6\xfc\xfe\xff'
self.assertEqual(audioop.ulaw2lin(d, 1), data[0]) src = [-8031, -4447, -1471, -495, -163, -53, -18, -6, -2, 0,
if endian == 'big': 8031, 4447, 1471, 495, 163, 53, 18, 6, 2, 0]
self.assertEqual(audioop.ulaw2lin(d, 2), for w in 1, 2, 4:
b'\x00\x00\x01\x04\x02\x0c') self.assertEqual(audioop.ulaw2lin(encoded, w),
self.assertEqual(audioop.ulaw2lin(d, 4), packs[w](*(x << (w * 8) >> 14 for x in src)))
b'\x00\x00\x00\x00\x01\x04\x00\x00\x02\x0c\x00\x00')
else: # Current u-law implementation has two codes fo 0: 0x7f and 0xff.
self.assertEqual(audioop.ulaw2lin(d, 2), encoded = bytes(range(127)) + bytes(range(128, 256))
b'\x00\x00\x04\x01\x0c\x02') for w in 2, 4:
self.assertEqual(audioop.ulaw2lin(d, 4), decoded = audioop.ulaw2lin(encoded, w)
b'\x00\x00\x00\x00\x00\x00\x04\x01\x00\x00\x0c\x02') self.assertEqual(audioop.lin2ulaw(decoded, w), encoded)
def test_mul(self): def test_mul(self):
data2 = [] for w in 1, 2, 4:
for d in data: self.assertEqual(audioop.mul(b'', w, 2), b'')
str = bytearray(len(d)) self.assertEqual(audioop.mul(datas[w], w, 0),
for i,b in enumerate(d): b'\0' * len(datas[w]))
str[i] = 2*b self.assertEqual(audioop.mul(datas[w], w, 1),
data2.append(str) datas[w])
self.assertEqual(audioop.mul(data[0], 1, 2), data2[0]) self.assertEqual(audioop.mul(datas[1], 1, 2),
self.assertEqual(audioop.mul(data[1],2, 2), data2[1]) b'\x00\x24\x7f\x80\x7f\x80\xfe')
self.assertEqual(audioop.mul(data[2], 4, 2), data2[2]) self.assertEqual(audioop.mul(datas[2], 2, 2),
packs[2](0, 0x2468, 0x7fff, -0x8000, 0x7fff, -0x8000, -2))
self.assertEqual(audioop.mul(datas[4], 4, 2),
packs[4](0, 0x2468acf0, 0x7fffffff, -0x80000000,
0x7fffffff, -0x80000000, -2))
def test_ratecv(self): def test_ratecv(self):
for w in 1, 2, 4:
self.assertEqual(audioop.ratecv(b'', w, 1, 8000, 8000, None),
(b'', (-1, ((0, 0),))))
self.assertEqual(audioop.ratecv(b'', w, 5, 8000, 8000, None),
(b'', (-1, ((0, 0),) * 5)))
self.assertEqual(audioop.ratecv(b'', w, 1, 8000, 16000, None),
(b'', (-2, ((0, 0),))))
self.assertEqual(audioop.ratecv(datas[w], w, 1, 8000, 8000, None)[0],
datas[w])
state = None state = None
d1, state = audioop.ratecv(data[0], 1, 1, 8000, 16000, state) d1, state = audioop.ratecv(b'\x00\x01\x02', 1, 1, 8000, 16000, state)
d2, state = audioop.ratecv(data[0], 1, 1, 8000, 16000, state) d2, state = audioop.ratecv(b'\x00\x01\x02', 1, 1, 8000, 16000, state)
self.assertEqual(d1 + d2, b'\000\000\001\001\002\001\000\000\001\001\002') self.assertEqual(d1 + d2, b'\000\000\001\001\002\001\000\000\001\001\002')
for w in 1, 2, 4:
d0, state0 = audioop.ratecv(datas[w], w, 1, 8000, 16000, None)
d, state = b'', None
for i in range(0, len(datas[w]), w):
d1, state = audioop.ratecv(datas[w][i:i + w], w, 1,
8000, 16000, state)
d += d1
self.assertEqual(d, d0)
self.assertEqual(state, state0)
def test_reverse(self): def test_reverse(self):
self.assertEqual(audioop.reverse(data[0], 1), b'\2\1\0') for w in 1, 2, 4:
self.assertEqual(audioop.reverse(b'', w), b'')
self.assertEqual(audioop.reverse(packs[w](0, 1, 2), w),
packs[w](2, 1, 0))
def test_tomono(self): def test_tomono(self):
data2 = bytearray() for w in 1, 2, 4:
for d in data[0]: data1 = datas[w]
data2.append(d) data2 = bytearray(2 * len(data1))
data2.append(d) for k in range(w):
self.assertEqual(audioop.tomono(data2, 1, 0.5, 0.5), data[0]) data2[k::2*w] = data1[k::w]
self.assertEqual(audioop.tomono(data2, w, 1, 0), data1)
self.assertEqual(audioop.tomono(data2, w, 0, 1), b'\0' * len(data1))
for k in range(w):
data2[k+w::2*w] = data1[k::w]
self.assertEqual(audioop.tomono(data2, w, 0.5, 0.5), data1)
def test_tostereo(self): def test_tostereo(self):
data2 = bytearray() for w in 1, 2, 4:
for d in data[0]: data1 = datas[w]
data2.append(d) data2 = bytearray(2 * len(data1))
data2.append(d) for k in range(w):
self.assertEqual(audioop.tostereo(data[0], 1, 1, 1), data2) data2[k::2*w] = data1[k::w]
self.assertEqual(audioop.tostereo(data1, w, 1, 0), data2)
self.assertEqual(audioop.tostereo(data1, w, 0, 0), b'\0' * len(data2))
for k in range(w):
data2[k+w::2*w] = data1[k::w]
self.assertEqual(audioop.tostereo(data1, w, 1, 1), data2)
def test_findfactor(self): def test_findfactor(self):
self.assertEqual(audioop.findfactor(data[1], data[1]), 1.0) self.assertEqual(audioop.findfactor(datas[2], datas[2]), 1.0)
self.assertEqual(audioop.findfactor(b'\0' * len(datas[2]), datas[2]),
0.0)
def test_findfit(self): def test_findfit(self):
self.assertEqual(audioop.findfit(data[1], data[1]), (0, 1.0)) self.assertEqual(audioop.findfit(datas[2], datas[2]), (0, 1.0))
self.assertEqual(audioop.findfit(datas[2], packs[2](1, 2, 0)),
(1, 8038.8))
self.assertEqual(audioop.findfit(datas[2][:-2] * 5 + datas[2], datas[2]),
(30, 1.0))
def test_findmax(self): def test_findmax(self):
self.assertEqual(audioop.findmax(data[1], 1), 2) self.assertEqual(audioop.findmax(datas[2], 1), 5)
def test_getsample(self): def test_getsample(self):
for i in range(3): for w in 1, 2, 4:
self.assertEqual(audioop.getsample(data[0], 1, i), i) data = packs[w](0, 1, -1, maxvalues[w], minvalues[w])
self.assertEqual(audioop.getsample(data[1], 2, i), i) self.assertEqual(audioop.getsample(data, w, 0), 0)
self.assertEqual(audioop.getsample(data[2], 4, i), i) self.assertEqual(audioop.getsample(data, w, 1), 1)
self.assertEqual(audioop.getsample(data, w, 2), -1)
self.assertEqual(audioop.getsample(data, w, 3), maxvalues[w])
self.assertEqual(audioop.getsample(data, w, 4), minvalues[w])
def test_negativelen(self): def test_negativelen(self):
# from issue 3306, previously it segfaulted # from issue 3306, previously it segfaulted
self.assertRaises(audioop.error, self.assertRaises(audioop.error,
audioop.findmax, ''.join(chr(x) for x in range(256)), -2392392) audioop.findmax, bytes(range(256)), -2392392)
def test_issue7673(self): def test_issue7673(self):
state = None state = None
@ -222,9 +367,9 @@ class TestAudioop(unittest.TestCase):
self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state) self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
def test_wrongsize(self): def test_wrongsize(self):
data = b'abc' data = b'abcdefgh'
state = None state = None
for size in (-1, 3, 5): for size in (-1, 0, 3, 5, 1024):
self.assertRaises(audioop.error, audioop.ulaw2lin, data, size) self.assertRaises(audioop.error, audioop.ulaw2lin, data, size)
self.assertRaises(audioop.error, audioop.alaw2lin, data, size) self.assertRaises(audioop.error, audioop.alaw2lin, data, size)
self.assertRaises(audioop.error, audioop.adpcm2lin, data, size, state) self.assertRaises(audioop.error, audioop.adpcm2lin, data, size, state)

View File

@ -215,6 +215,12 @@ Core and Builtins
Library Library
------- -------
- Issue #16686: Fixed a lot of bugs in audioop module. Fixed crashes in
avgpp(), maxpp() and ratecv(). Fixed an integer overflow in add(), bias(),
and ratecv(). reverse(), lin2lin() and ratecv() no more lose precision for
32-bit samples. max() and rms() no more returns a negative result and
various other functions now work correctly with 32-bit sample -0x80000000.
- Issue #17073: Fix some integer overflows in sqlite3 module. - Issue #17073: Fix some integer overflows in sqlite3 module.
- Issue #17114: IDLE now uses non-strict config parser. - Issue #17114: IDLE now uses non-strict config parser.

View File

@ -26,6 +26,21 @@ typedef short PyInt16;
#endif #endif
#endif #endif
static const int maxvals[] = {0, 0x7F, 0x7FFF, 0x7FFFFF, 0x7FFFFFFF};
static const int minvals[] = {0, -0x80, -0x8000, -0x800000, -0x80000000};
static const unsigned int masks[] = {0, 0xFF, 0xFFFF, 0xFFFFFF, 0xFFFFFFFF};
static int
fbound(double val, double minval, double maxval)
{
if (val > maxval)
val = maxval;
else if (val < minval + 1)
val = minval;
return val;
}
/* Code shamelessly stolen from sox, 12.17.7, g711.c /* Code shamelessly stolen from sox, 12.17.7, g711.c
** (c) Craig Reese, Joe Campbell and Jeff Poskanzer 1989 */ ** (c) Craig Reese, Joe Campbell and Jeff Poskanzer 1989 */
@ -347,7 +362,7 @@ audioop_max(PyObject *self, PyObject *args)
signed char *cp; signed char *cp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0; int size, val = 0;
int max = 0; unsigned int absval, max = 0;
if ( !PyArg_ParseTuple(args, "s#i:max", &cp, &len, &size) ) if ( !PyArg_ParseTuple(args, "s#i:max", &cp, &len, &size) )
return 0; return 0;
@ -357,10 +372,11 @@ audioop_max(PyObject *self, PyObject *args)
if ( size == 1 ) val = (int)*CHARP(cp, i); if ( size == 1 ) val = (int)*CHARP(cp, i);
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = (int)*SHORTP(cp, i);
else if ( size == 4 ) val = (int)*LONGP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i);
if ( val < 0 ) val = (-val); if (val < 0) absval = (-val);
if ( val > max ) max = val; else absval = val;
if (absval > max) max = absval;
} }
return PyLong_FromLong(max); return PyLong_FromUnsignedLong(max);
} }
static PyObject * static PyObject *
@ -369,7 +385,7 @@ audioop_minmax(PyObject *self, PyObject *args)
signed char *cp; signed char *cp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0; int size, val = 0;
int min = 0x7fffffff, max = -0x7fffffff; int min = 0x7fffffff, max = -0x80000000;
if (!PyArg_ParseTuple(args, "s#i:minmax", &cp, &len, &size)) if (!PyArg_ParseTuple(args, "s#i:minmax", &cp, &len, &size))
return NULL; return NULL;
@ -406,7 +422,7 @@ audioop_avg(PyObject *self, PyObject *args)
if ( len == 0 ) if ( len == 0 )
val = 0; val = 0;
else else
val = (int)(avg / (double)(len/size)); val = (int)floor(avg / (double)(len/size));
return PyLong_FromLong(val); return PyLong_FromLong(val);
} }
@ -416,6 +432,7 @@ audioop_rms(PyObject *self, PyObject *args)
signed char *cp; signed char *cp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0; int size, val = 0;
unsigned int res;
double sum_squares = 0.0; double sum_squares = 0.0;
if ( !PyArg_ParseTuple(args, "s#i:rms", &cp, &len, &size) ) if ( !PyArg_ParseTuple(args, "s#i:rms", &cp, &len, &size) )
@ -429,10 +446,10 @@ audioop_rms(PyObject *self, PyObject *args)
sum_squares += (double)val*(double)val; sum_squares += (double)val*(double)val;
} }
if ( len == 0 ) if ( len == 0 )
val = 0; res = 0;
else else
val = (int)sqrt(sum_squares / (double)(len/size)); res = (unsigned int)sqrt(sum_squares / (double)(len/size));
return PyLong_FromLong(val); return PyLong_FromUnsignedLong(res);
} }
static double _sum2(short *a, short *b, Py_ssize_t len) static double _sum2(short *a, short *b, Py_ssize_t len)
@ -624,52 +641,46 @@ audioop_avgpp(PyObject *self, PyObject *args)
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0, prevval = 0, prevextremevalid = 0, int size, val = 0, prevval = 0, prevextremevalid = 0,
prevextreme = 0; prevextreme = 0;
double avg = 0.0; double sum = 0.0;
int diff, prevdiff, extremediff, nextreme = 0; unsigned int avg;
int diff, prevdiff, nextreme = 0;
if ( !PyArg_ParseTuple(args, "s#i:avgpp", &cp, &len, &size) ) if ( !PyArg_ParseTuple(args, "s#i:avgpp", &cp, &len, &size) )
return 0; return 0;
if (!audioop_check_parameters(len, size)) if (!audioop_check_parameters(len, size))
return NULL; return NULL;
/* Compute first delta value ahead. Also automatically makes us if (len <= size)
** skip the first extreme value return PyLong_FromLong(0);
*/
if ( size == 1 ) prevval = (int)*CHARP(cp, 0); if ( size == 1 ) prevval = (int)*CHARP(cp, 0);
else if ( size == 2 ) prevval = (int)*SHORTP(cp, 0); else if ( size == 2 ) prevval = (int)*SHORTP(cp, 0);
else if ( size == 4 ) prevval = (int)*LONGP(cp, 0); else if ( size == 4 ) prevval = (int)*LONGP(cp, 0);
if ( size == 1 ) val = (int)*CHARP(cp, size); prevdiff = 17; /* Anything != 0, 1 */
else if ( size == 2 ) val = (int)*SHORTP(cp, size);
else if ( size == 4 ) val = (int)*LONGP(cp, size);
prevdiff = val - prevval;
for ( i=size; i<len; i+= size) { for ( i=size; i<len; i+= size) {
if ( size == 1 ) val = (int)*CHARP(cp, i); if ( size == 1 ) val = (int)*CHARP(cp, i);
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = (int)*SHORTP(cp, i);
else if ( size == 4 ) val = (int)*LONGP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i);
diff = val - prevval; if (val != prevval) {
if ( diff*prevdiff < 0 ) { diff = val < prevval;
if (prevdiff == !diff) {
/* Derivative changed sign. Compute difference to last /* Derivative changed sign. Compute difference to last
** extreme value and remember. ** extreme value and remember.
*/ */
if (prevextremevalid) { if (prevextremevalid) {
extremediff = prevval - prevextreme; sum += fabs((double)prevval - (double)prevextreme);
if ( extremediff < 0 )
extremediff = -extremediff;
avg += extremediff;
nextreme++; nextreme++;
} }
prevextremevalid = 1; prevextremevalid = 1;
prevextreme = prevval; prevextreme = prevval;
} }
prevval = val; prevval = val;
if ( diff != 0 )
prevdiff = diff; prevdiff = diff;
} }
}
if ( nextreme == 0 ) if ( nextreme == 0 )
val = 0; avg = 0;
else else
val = (int)(avg / (double)nextreme); avg = (unsigned int)(sum / (double)nextreme);
return PyLong_FromLong(val); return PyLong_FromUnsignedLong(avg);
} }
static PyObject * static PyObject *
@ -679,37 +690,36 @@ audioop_maxpp(PyObject *self, PyObject *args)
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0, prevval = 0, prevextremevalid = 0, int size, val = 0, prevval = 0, prevextremevalid = 0,
prevextreme = 0; prevextreme = 0;
int max = 0; unsigned int max = 0, extremediff;
int diff, prevdiff, extremediff; int diff, prevdiff;
if ( !PyArg_ParseTuple(args, "s#i:maxpp", &cp, &len, &size) ) if ( !PyArg_ParseTuple(args, "s#i:maxpp", &cp, &len, &size) )
return 0; return 0;
if (!audioop_check_parameters(len, size)) if (!audioop_check_parameters(len, size))
return NULL; return NULL;
/* Compute first delta value ahead. Also automatically makes us if (len <= size)
** skip the first extreme value return PyLong_FromLong(0);
*/
if ( size == 1 ) prevval = (int)*CHARP(cp, 0); if ( size == 1 ) prevval = (int)*CHARP(cp, 0);
else if ( size == 2 ) prevval = (int)*SHORTP(cp, 0); else if ( size == 2 ) prevval = (int)*SHORTP(cp, 0);
else if ( size == 4 ) prevval = (int)*LONGP(cp, 0); else if ( size == 4 ) prevval = (int)*LONGP(cp, 0);
if ( size == 1 ) val = (int)*CHARP(cp, size); prevdiff = 17; /* Anything != 0, 1 */
else if ( size == 2 ) val = (int)*SHORTP(cp, size);
else if ( size == 4 ) val = (int)*LONGP(cp, size);
prevdiff = val - prevval;
for ( i=size; i<len; i+= size) { for ( i=size; i<len; i+= size) {
if ( size == 1 ) val = (int)*CHARP(cp, i); if ( size == 1 ) val = (int)*CHARP(cp, i);
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = (int)*SHORTP(cp, i);
else if ( size == 4 ) val = (int)*LONGP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i);
diff = val - prevval; if (val != prevval) {
if ( diff*prevdiff < 0 ) { diff = val < prevval;
if (prevdiff == !diff) {
/* Derivative changed sign. Compute difference to /* Derivative changed sign. Compute difference to
** last extreme value and remember. ** last extreme value and remember.
*/ */
if (prevextremevalid) { if (prevextremevalid) {
extremediff = prevval - prevextreme; if (prevval < prevextreme)
if ( extremediff < 0 ) extremediff = (unsigned int)prevextreme -
extremediff = -extremediff; (unsigned int)prevval;
else
extremediff = (unsigned int)prevval -
(unsigned int)prevextreme;
if ( extremediff > max ) if ( extremediff > max )
max = extremediff; max = extremediff;
} }
@ -717,10 +727,10 @@ audioop_maxpp(PyObject *self, PyObject *args)
prevextreme = prevval; prevextreme = prevval;
} }
prevval = val; prevval = val;
if ( diff != 0 )
prevdiff = diff; prevdiff = diff;
} }
return PyLong_FromLong(max); }
return PyLong_FromUnsignedLong(max);
} }
static PyObject * static PyObject *
@ -755,7 +765,7 @@ audioop_mul(PyObject *self, PyObject *args)
signed char *cp, *ncp; signed char *cp, *ncp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0; int size, val = 0;
double factor, fval, maxval; double factor, fval, maxval, minval;
PyObject *rv; PyObject *rv;
if ( !PyArg_ParseTuple(args, "s#id:mul", &cp, &len, &size, &factor ) ) if ( !PyArg_ParseTuple(args, "s#id:mul", &cp, &len, &size, &factor ) )
@ -763,13 +773,8 @@ audioop_mul(PyObject *self, PyObject *args)
if (!audioop_check_parameters(len, size)) if (!audioop_check_parameters(len, size))
return NULL; return NULL;
if ( size == 1 ) maxval = (double) 0x7f; maxval = (double) maxvals[size];
else if ( size == 2 ) maxval = (double) 0x7fff; minval = (double) minvals[size];
else if ( size == 4 ) maxval = (double) 0x7fffffff;
else {
PyErr_SetString(AudioopError, "Size should be 1, 2 or 4");
return 0;
}
rv = PyBytes_FromStringAndSize(NULL, len); rv = PyBytes_FromStringAndSize(NULL, len);
if ( rv == 0 ) if ( rv == 0 )
@ -782,9 +787,7 @@ audioop_mul(PyObject *self, PyObject *args)
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = (int)*SHORTP(cp, i);
else if ( size == 4 ) val = (int)*LONGP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i);
fval = (double)val*factor; fval = (double)val*factor;
if ( fval > maxval ) fval = maxval; val = (int)floor(fbound(fval, minval, maxval));
else if ( fval < -maxval ) fval = -maxval;
val = (int)fval;
if ( size == 1 ) *CHARP(ncp, i) = (signed char)val; if ( size == 1 ) *CHARP(ncp, i) = (signed char)val;
else if ( size == 2 ) *SHORTP(ncp, i) = (short)val; else if ( size == 2 ) *SHORTP(ncp, i) = (short)val;
else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)val; else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)val;
@ -799,7 +802,7 @@ audioop_tomono(PyObject *self, PyObject *args)
signed char *cp, *ncp; signed char *cp, *ncp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val1 = 0, val2 = 0; int size, val1 = 0, val2 = 0;
double fac1, fac2, fval, maxval; double fac1, fac2, fval, maxval, minval;
PyObject *rv; PyObject *rv;
if ( !PyArg_ParseTuple(args, "s*idd:tomono", if ( !PyArg_ParseTuple(args, "s*idd:tomono",
@ -817,14 +820,8 @@ audioop_tomono(PyObject *self, PyObject *args)
return NULL; return NULL;
} }
if ( size == 1 ) maxval = (double) 0x7f; maxval = (double) maxvals[size];
else if ( size == 2 ) maxval = (double) 0x7fff; minval = (double) minvals[size];
else if ( size == 4 ) maxval = (double) 0x7fffffff;
else {
PyBuffer_Release(&pcp);
PyErr_SetString(AudioopError, "Size should be 1, 2 or 4");
return 0;
}
rv = PyBytes_FromStringAndSize(NULL, len/2); rv = PyBytes_FromStringAndSize(NULL, len/2);
if ( rv == 0 ) { if ( rv == 0 ) {
@ -842,9 +839,7 @@ audioop_tomono(PyObject *self, PyObject *args)
else if ( size == 2 ) val2 = (int)*SHORTP(cp, i+2); else if ( size == 2 ) val2 = (int)*SHORTP(cp, i+2);
else if ( size == 4 ) val2 = (int)*LONGP(cp, i+4); else if ( size == 4 ) val2 = (int)*LONGP(cp, i+4);
fval = (double)val1*fac1 + (double)val2*fac2; fval = (double)val1*fac1 + (double)val2*fac2;
if ( fval > maxval ) fval = maxval; val1 = (int)floor(fbound(fval, minval, maxval));
else if ( fval < -maxval ) fval = -maxval;
val1 = (int)fval;
if ( size == 1 ) *CHARP(ncp, i/2) = (signed char)val1; if ( size == 1 ) *CHARP(ncp, i/2) = (signed char)val1;
else if ( size == 2 ) *SHORTP(ncp, i/2) = (short)val1; else if ( size == 2 ) *SHORTP(ncp, i/2) = (short)val1;
else if ( size == 4 ) *LONGP(ncp, i/2)= (Py_Int32)val1; else if ( size == 4 ) *LONGP(ncp, i/2)= (Py_Int32)val1;
@ -859,7 +854,7 @@ audioop_tostereo(PyObject *self, PyObject *args)
signed char *cp, *ncp; signed char *cp, *ncp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val1, val2, val = 0; int size, val1, val2, val = 0;
double fac1, fac2, fval, maxval; double fac1, fac2, fval, maxval, minval;
PyObject *rv; PyObject *rv;
if ( !PyArg_ParseTuple(args, "s#idd:tostereo", if ( !PyArg_ParseTuple(args, "s#idd:tostereo",
@ -868,13 +863,8 @@ audioop_tostereo(PyObject *self, PyObject *args)
if (!audioop_check_parameters(len, size)) if (!audioop_check_parameters(len, size))
return NULL; return NULL;
if ( size == 1 ) maxval = (double) 0x7f; maxval = (double) maxvals[size];
else if ( size == 2 ) maxval = (double) 0x7fff; minval = (double) minvals[size];
else if ( size == 4 ) maxval = (double) 0x7fffffff;
else {
PyErr_SetString(AudioopError, "Size should be 1, 2 or 4");
return 0;
}
if (len > PY_SSIZE_T_MAX/2) { if (len > PY_SSIZE_T_MAX/2) {
PyErr_SetString(PyExc_MemoryError, PyErr_SetString(PyExc_MemoryError,
@ -894,14 +884,10 @@ audioop_tostereo(PyObject *self, PyObject *args)
else if ( size == 4 ) val = (int)*LONGP(cp, i); else if ( size == 4 ) val = (int)*LONGP(cp, i);
fval = (double)val*fac1; fval = (double)val*fac1;
if ( fval > maxval ) fval = maxval; val1 = (int)floor(fbound(fval, minval, maxval));
else if ( fval < -maxval ) fval = -maxval;
val1 = (int)fval;
fval = (double)val*fac2; fval = (double)val*fac2;
if ( fval > maxval ) fval = maxval; val2 = (int)floor(fbound(fval, minval, maxval));
else if ( fval < -maxval ) fval = -maxval;
val2 = (int)fval;
if ( size == 1 ) *CHARP(ncp, i*2) = (signed char)val1; if ( size == 1 ) *CHARP(ncp, i*2) = (signed char)val1;
else if ( size == 2 ) *SHORTP(ncp, i*2) = (short)val1; else if ( size == 2 ) *SHORTP(ncp, i*2) = (short)val1;
@ -919,7 +905,7 @@ audioop_add(PyObject *self, PyObject *args)
{ {
signed char *cp1, *cp2, *ncp; signed char *cp1, *cp2, *ncp;
Py_ssize_t len1, len2, i; Py_ssize_t len1, len2, i;
int size, val1 = 0, val2 = 0, maxval, newval; int size, val1 = 0, val2 = 0, minval, maxval, newval;
PyObject *rv; PyObject *rv;
if ( !PyArg_ParseTuple(args, "s#s#i:add", if ( !PyArg_ParseTuple(args, "s#s#i:add",
@ -932,13 +918,8 @@ audioop_add(PyObject *self, PyObject *args)
return 0; return 0;
} }
if ( size == 1 ) maxval = 0x7f; maxval = maxvals[size];
else if ( size == 2 ) maxval = 0x7fff; minval = minvals[size];
else if ( size == 4 ) maxval = 0x7fffffff;
else {
PyErr_SetString(AudioopError, "Size should be 1, 2 or 4");
return 0;
}
rv = PyBytes_FromStringAndSize(NULL, len1); rv = PyBytes_FromStringAndSize(NULL, len1);
if ( rv == 0 ) if ( rv == 0 )
@ -954,12 +935,19 @@ audioop_add(PyObject *self, PyObject *args)
else if ( size == 2 ) val2 = (int)*SHORTP(cp2, i); else if ( size == 2 ) val2 = (int)*SHORTP(cp2, i);
else if ( size == 4 ) val2 = (int)*LONGP(cp2, i); else if ( size == 4 ) val2 = (int)*LONGP(cp2, i);
if (size < 4) {
newval = val1 + val2; newval = val1 + val2;
/* truncate in case of overflow */ /* truncate in case of overflow */
if (newval > maxval) newval = maxval; if (newval > maxval)
else if (newval < -maxval) newval = -maxval; newval = maxval;
else if (size == 4 && (newval^val1) < 0 && (newval^val2) < 0) else if (newval < minval)
newval = val1 > 0 ? maxval : - maxval; newval = minval;
}
else {
double fval = (double)val1 + (double)val2;
/* truncate in case of overflow */
newval = (int)floor(fbound(fval, minval, maxval));
}
if ( size == 1 ) *CHARP(ncp, i) = (signed char)newval; if ( size == 1 ) *CHARP(ncp, i) = (signed char)newval;
else if ( size == 2 ) *SHORTP(ncp, i) = (short)newval; else if ( size == 2 ) *SHORTP(ncp, i) = (short)newval;
@ -973,9 +961,9 @@ audioop_bias(PyObject *self, PyObject *args)
{ {
signed char *cp, *ncp; signed char *cp, *ncp;
Py_ssize_t len, i; Py_ssize_t len, i;
int size, val = 0; int size, bias;
unsigned int val = 0, mask;
PyObject *rv; PyObject *rv;
int bias;
if ( !PyArg_ParseTuple(args, "s#ii:bias", if ( !PyArg_ParseTuple(args, "s#ii:bias",
&cp, &len, &size , &bias) ) &cp, &len, &size , &bias) )
@ -989,15 +977,20 @@ audioop_bias(PyObject *self, PyObject *args)
return 0; return 0;
ncp = (signed char *)PyBytes_AsString(rv); ncp = (signed char *)PyBytes_AsString(rv);
mask = masks[size];
for ( i=0; i < len; i += size ) { for ( i=0; i < len; i += size ) {
if ( size == 1 ) val = (int)*CHARP(cp, i); if ( size == 1 ) val = (unsigned int)(unsigned char)*CHARP(cp, i);
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = (unsigned int)(unsigned short)*SHORTP(cp, i);
else if ( size == 4 ) val = (int)*LONGP(cp, i); else if ( size == 4 ) val = (unsigned int)(Py_UInt32)*LONGP(cp, i);
if ( size == 1 ) *CHARP(ncp, i) = (signed char)(val+bias); val += (unsigned int)bias;
else if ( size == 2 ) *SHORTP(ncp, i) = (short)(val+bias); /* wrap around in case of overflow */
else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)(val+bias); val &= mask;
if ( size == 1 ) *CHARP(ncp, i) = (signed char)(unsigned char)val;
else if ( size == 2 ) *SHORTP(ncp, i) = (short)(unsigned short)val;
else if ( size == 4 ) *LONGP(ncp, i) = (Py_Int32)(Py_UInt32)val;
} }
return rv; return rv;
} }
@ -1024,15 +1017,15 @@ audioop_reverse(PyObject *self, PyObject *args)
ncp = (unsigned char *)PyBytes_AsString(rv); ncp = (unsigned char *)PyBytes_AsString(rv);
for ( i=0; i < len; i += size ) { for ( i=0; i < len; i += size ) {
if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 8; if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 24;
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = ((int)*SHORTP(cp, i)) << 16;
else if ( size == 4 ) val = ((int)*LONGP(cp, i)) >> 16; else if ( size == 4 ) val = (int)*LONGP(cp, i);
j = len - i - size; j = len - i - size;
if ( size == 1 ) *CHARP(ncp, j) = (signed char)(val >> 8); if ( size == 1 ) *CHARP(ncp, j) = (signed char)(val >> 24);
else if ( size == 2 ) *SHORTP(ncp, j) = (short)(val); else if ( size == 2 ) *SHORTP(ncp, j) = (short)(val >> 16);
else if ( size == 4 ) *LONGP(ncp, j) = (Py_Int32)(val<<16); else if ( size == 4 ) *LONGP(ncp, j) = (Py_Int32)val;
} }
return rv; return rv;
} }
@ -1066,13 +1059,13 @@ audioop_lin2lin(PyObject *self, PyObject *args)
ncp = (unsigned char *)PyBytes_AsString(rv); ncp = (unsigned char *)PyBytes_AsString(rv);
for ( i=0, j=0; i < len; i += size, j += size2 ) { for ( i=0, j=0; i < len; i += size, j += size2 ) {
if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 8; if ( size == 1 ) val = ((int)*CHARP(cp, i)) << 24;
else if ( size == 2 ) val = (int)*SHORTP(cp, i); else if ( size == 2 ) val = ((int)*SHORTP(cp, i)) << 16;
else if ( size == 4 ) val = ((int)*LONGP(cp, i)) >> 16; else if ( size == 4 ) val = (int)*LONGP(cp, i);
if ( size2 == 1 ) *CHARP(ncp, j) = (signed char)(val >> 8); if ( size2 == 1 ) *CHARP(ncp, j) = (signed char)(val >> 24);
else if ( size2 == 2 ) *SHORTP(ncp, j) = (short)(val); else if ( size2 == 2 ) *SHORTP(ncp, j) = (short)(val >> 16);
else if ( size2 == 4 ) *LONGP(ncp, j) = (Py_Int32)(val<<16); else if ( size2 == 4 ) *LONGP(ncp, j) = (Py_Int32)val;
} }
return rv; return rv;
} }
@ -1136,6 +1129,10 @@ audioop_ratecv(PyObject *self, PyObject *args)
d = gcd(inrate, outrate); d = gcd(inrate, outrate);
inrate /= d; inrate /= d;
outrate /= d; outrate /= d;
/* divide weightA and weightB by their greatest common divisor */
d = gcd(weightA, weightB);
weightA /= d;
weightA /= d;
if ((size_t)nchannels > PY_SIZE_MAX/sizeof(int)) { if ((size_t)nchannels > PY_SIZE_MAX/sizeof(int)) {
PyErr_SetString(PyExc_MemoryError, PyErr_SetString(PyExc_MemoryError,
@ -1175,7 +1172,9 @@ audioop_ratecv(PyObject *self, PyObject *args)
} }
/* str <- Space for the output buffer. */ /* str <- Space for the output buffer. */
{ if (len == 0)
str = PyBytes_FromStringAndSize(NULL, 0);
else {
/* There are len input frames, so we need (mathematically) /* There are len input frames, so we need (mathematically)
ceiling(len*outrate/inrate) output frames, and each frame ceiling(len*outrate/inrate) output frames, and each frame
requires bytes_per_frame bytes. Computing this requires bytes_per_frame bytes. Computing this
@ -1190,13 +1189,12 @@ audioop_ratecv(PyObject *self, PyObject *args)
else else
str = PyBytes_FromStringAndSize(NULL, str = PyBytes_FromStringAndSize(NULL,
q * outrate * bytes_per_frame); q * outrate * bytes_per_frame);
}
if (str == NULL) { if (str == NULL) {
PyErr_SetString(PyExc_MemoryError, PyErr_SetString(PyExc_MemoryError,
"not enough memory for output buffer"); "not enough memory for output buffer");
goto exit; goto exit;
} }
}
ncp = PyBytes_AsString(str); ncp = PyBytes_AsString(str);
for (;;) { for (;;) {
@ -1229,32 +1227,32 @@ audioop_ratecv(PyObject *self, PyObject *args)
for (chan = 0; chan < nchannels; chan++) { for (chan = 0; chan < nchannels; chan++) {
prev_i[chan] = cur_i[chan]; prev_i[chan] = cur_i[chan];
if (size == 1) if (size == 1)
cur_i[chan] = ((int)*CHARP(cp, 0)) << 8; cur_i[chan] = ((int)*CHARP(cp, 0)) << 24;
else if (size == 2) else if (size == 2)
cur_i[chan] = (int)*SHORTP(cp, 0); cur_i[chan] = ((int)*SHORTP(cp, 0)) << 16;
else if (size == 4) else if (size == 4)
cur_i[chan] = ((int)*LONGP(cp, 0)) >> 16; cur_i[chan] = (int)*LONGP(cp, 0);
cp += size; cp += size;
/* implements a simple digital filter */ /* implements a simple digital filter */
cur_i[chan] = cur_i[chan] = (int)(
(weightA * cur_i[chan] + ((double)weightA * (double)cur_i[chan] +
weightB * prev_i[chan]) / (double)weightB * (double)prev_i[chan]) /
(weightA + weightB); ((double)weightA + (double)weightB));
} }
len--; len--;
d += outrate; d += outrate;
} }
while (d >= 0) { while (d >= 0) {
for (chan = 0; chan < nchannels; chan++) { for (chan = 0; chan < nchannels; chan++) {
cur_o = (prev_i[chan] * d + cur_o = (int)(((double)prev_i[chan] * (double)d +
cur_i[chan] * (outrate - d)) / (double)cur_i[chan] * (double)(outrate - d)) /
outrate; (double)outrate);
if (size == 1) if (size == 1)
*CHARP(ncp, 0) = (signed char)(cur_o >> 8); *CHARP(ncp, 0) = (signed char)(cur_o >> 24);
else if (size == 2) else if (size == 2)
*SHORTP(ncp, 0) = (short)(cur_o); *SHORTP(ncp, 0) = (short)(cur_o >> 16);
else if (size == 4) else if (size == 4)
*LONGP(ncp, 0) = (Py_Int32)(cur_o<<16); *LONGP(ncp, 0) = (Py_Int32)(cur_o);
ncp += size; ncp += size;
} }
d -= inrate; d -= inrate;