diff --git a/Lib/test/test_zlib.py b/Lib/test/test_zlib.py index dff628d2cf7..8683879f192 100644 --- a/Lib/test/test_zlib.py +++ b/Lib/test/test_zlib.py @@ -74,25 +74,12 @@ class ExceptionTestCase(unittest.TestCase): class CompressTestCase(unittest.TestCase): # Test compression in one go (whole message compression) def test_speech(self): - # decompress(compress(data)) better be data - x = zlib.compress(hamlet_scene) - self.assertEqual(zlib.decompress(x), hamlet_scene) - - def test_speech8(self): - # decompress(compress(data)) better be data -- more compression chances - data = hamlet_scene * 8 - x = zlib.compress(data) - self.assertEqual(zlib.decompress(x), data) - - def test_speech16(self): - # decompress(compress(data)) better be data -- more compression chances - data = hamlet_scene * 16 - x = zlib.compress(data) - self.assertEqual(zlib.decompress(x), data) + x = zlib.compress(HAMLET_SCENE) + self.assertEqual(zlib.decompress(x), HAMLET_SCENE) def test_speech128(self): - # decompress(compress(data)) better be data -- more compression chances - data = hamlet_scene * 8 * 16 + # compress more data + data = HAMLET_SCENE * 128 x = zlib.compress(data) self.assertEqual(zlib.decompress(x), data) @@ -101,22 +88,10 @@ class CompressTestCase(unittest.TestCase): class CompressObjectTestCase(unittest.TestCase): # Test compression object - def test_pairsmall(self): - # use compress object in straightforward manner, decompress w/ object - data = hamlet_scene - co = zlib.compressobj() - x1 = co.compress(data) - x2 = co.flush() - self.assertRaises(zlib.error, co.flush) # second flush should not work - dco = zlib.decompressobj() - y1 = dco.decompress(x1 + x2) - y2 = dco.flush() - self.assertEqual(data, y1 + y2) - def test_pair(self): - # straightforward compress/decompress objects, more compression - data = hamlet_scene * 8 * 16 - co = zlib.compressobj(zlib.Z_BEST_COMPRESSION, zlib.DEFLATED) + # straightforward compress/decompress objects + data = HAMLET_SCENE * 128 + co = zlib.compressobj() x1 = co.compress(data) x2 = co.flush() self.assertRaises(zlib.error, co.flush) # second flush should not work @@ -133,16 +108,16 @@ class CompressObjectTestCase(unittest.TestCase): memlevel = 9 strategy = zlib.Z_FILTERED co = zlib.compressobj(level, method, wbits, memlevel, strategy) - x1 = co.compress(hamlet_scene) + x1 = co.compress(HAMLET_SCENE) x2 = co.flush() dco = zlib.decompressobj(wbits) y1 = dco.decompress(x1 + x2) y2 = dco.flush() - self.assertEqual(hamlet_scene, y1 + y2) + self.assertEqual(HAMLET_SCENE, y1 + y2) def test_compressincremental(self): # compress object in steps, decompress object as one-shot - data = hamlet_scene * 8 * 16 + data = HAMLET_SCENE * 128 co = zlib.compressobj() bufs = [] for i in range(0, len(data), 256): @@ -155,13 +130,14 @@ class CompressObjectTestCase(unittest.TestCase): y2 = dco.flush() self.assertEqual(data, y1 + y2) - def test_decompressincremental(self): + def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): # compress object in steps, decompress object in steps - data = hamlet_scene * 8 * 16 + source = source or HAMLET_SCENE + data = source * 128 co = zlib.compressobj() bufs = [] - for i in range(0, len(data), 256): - bufs.append(co.compress(data[i:i+256])) + for i in range(0, len(data), cx): + bufs.append(co.compress(data[i:i+cx])) bufs.append(co.flush()) combuf = ''.join(bufs) @@ -169,96 +145,59 @@ class CompressObjectTestCase(unittest.TestCase): dco = zlib.decompressobj() bufs = [] - for i in range(0, len(combuf), 128): - bufs.append(dco.decompress(combuf[i:i+128])) + for i in range(0, len(combuf), dcx): + bufs.append(dco.decompress(combuf[i:i+dcx])) self.assertEqual('', dco.unconsumed_tail, ######## "(A) uct should be '': not %d long" % - len(dco.unconsumed_tail)) - bufs.append(dco.flush()) + len(dco.unconsumed_tail)) + if flush: + bufs.append(dco.flush()) + else: + while True: + chunk = dco.decompress('') + if chunk: + bufs.append(chunk) + else: + break self.assertEqual('', dco.unconsumed_tail, ######## - "(B) uct should be '': not %d long" % - len(dco.unconsumed_tail)) + "(B) uct should be '': not %d long" % + len(dco.unconsumed_tail)) self.assertEqual(data, ''.join(bufs)) # Failure means: "decompressobj with init options failed" - def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): - # compress object in steps, decompress object in steps, loop sizes - source = source or hamlet_scene - for reps in sizes: - data = source * reps - co = zlib.compressobj() - bufs = [] - for i in range(0, len(data), cx): - bufs.append(co.compress(data[i:i+cx])) - bufs.append(co.flush()) - combuf = ''.join(bufs) + def test_decompincflush(self): + self.test_decompinc(flush=True) - self.assertEqual(data, zlib.decompress(combuf)) - - dco = zlib.decompressobj() - bufs = [] - for i in range(0, len(combuf), dcx): - bufs.append(dco.decompress(combuf[i:i+dcx])) - self.assertEqual('', dco.unconsumed_tail, ######## - "(A) uct should be '': not %d long" % - len(dco.unconsumed_tail)) - if flush: - bufs.append(dco.flush()) - else: - while True: - chunk = dco.decompress('') - if chunk: - bufs.append(chunk) - else: - break - self.assertEqual('', dco.unconsumed_tail, ######## - "(B) uct should be '': not %d long" % - len(dco.unconsumed_tail)) - self.assertEqual(data, ''.join(bufs)) - # Failure means: "decompressobj with init options failed" - - def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): - # compress in steps, decompress in length-restricted steps, loop sizes - source = source or hamlet_scene - for reps in sizes: - # Check a decompression object with max_length specified - data = source * reps - co = zlib.compressobj() - bufs = [] - for i in range(0, len(data), cx): - bufs.append(co.compress(data[i:i+cx])) - bufs.append(co.flush()) - combuf = ''.join(bufs) - self.assertEqual(data, zlib.decompress(combuf), - 'compressed data failure') - - dco = zlib.decompressobj() - bufs = [] - cb = combuf - while cb: - #max_length = 1 + len(cb)//10 - chunk = dco.decompress(cb, dcx) - self.failIf(len(chunk) > dcx, - 'chunk too big (%d>%d)' % (len(chunk), dcx)) - bufs.append(chunk) - cb = dco.unconsumed_tail - if flush: - bufs.append(dco.flush()) - else: - while True: - chunk = dco.decompress('', dcx) - self.failIf(len(chunk) > dcx, - 'chunk too big in tail (%d>%d)' % (len(chunk), dcx)) - if chunk: - bufs.append(chunk) - else: - break - self.assertEqual(len(data), len(''.join(bufs))) - self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') - - def test_decompressmaxlen(self): + def test_decompimax(self, source=None, cx=256, dcx=64): + # compress in steps, decompress in length-restricted steps + source = source or HAMLET_SCENE # Check a decompression object with max_length specified - data = hamlet_scene * 8 * 16 + data = source * 128 + co = zlib.compressobj() + bufs = [] + for i in range(0, len(data), cx): + bufs.append(co.compress(data[i:i+cx])) + bufs.append(co.flush()) + combuf = ''.join(bufs) + self.assertEqual(data, zlib.decompress(combuf), + 'compressed data failure') + + dco = zlib.decompressobj() + bufs = [] + cb = combuf + while cb: + #max_length = 1 + len(cb)//10 + chunk = dco.decompress(cb, dcx) + self.failIf(len(chunk) > dcx, + 'chunk too big (%d>%d)' % (len(chunk), dcx)) + bufs.append(chunk) + cb = dco.unconsumed_tail + bufs.append(dco.flush()) + self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + + def test_decompressmaxlen(self, flush=False): + # Check a decompression object with max_length specified + data = HAMLET_SCENE * 128 co = zlib.compressobj() bufs = [] for i in range(0, len(data), 256): @@ -278,42 +217,18 @@ class CompressObjectTestCase(unittest.TestCase): 'chunk too big (%d>%d)' % (len(chunk),max_length)) bufs.append(chunk) cb = dco.unconsumed_tail - bufs.append(dco.flush()) - self.assertEqual(len(data), len(''.join(bufs))) + if flush: + bufs.append(dco.flush()) + else: + while chunk: + chunk = dco.decompress('', max_length) + self.failIf(len(chunk) > max_length, + 'chunk too big (%d>%d)' % (len(chunk),max_length)) + bufs.append(chunk) self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') - def test_decompressmaxlenflushless(self): - # identical to test_decompressmaxlen except flush is replaced - # with an equivalent. This works and other fails on (eg) 2.2.2 - data = hamlet_scene * 8 * 16 - co = zlib.compressobj() - bufs = [] - for i in range(0, len(data), 256): - bufs.append(co.compress(data[i:i+256])) - bufs.append(co.flush()) - combuf = ''.join(bufs) - self.assertEqual(data, zlib.decompress(combuf), - 'compressed data mismatch') - - dco = zlib.decompressobj() - bufs = [] - cb = combuf - while cb: - max_length = 1 + len(cb)//10 - chunk = dco.decompress(cb, max_length) - self.failIf(len(chunk) > max_length, - 'chunk too big (%d>%d)' % (len(chunk),max_length)) - bufs.append(chunk) - cb = dco.unconsumed_tail - - #bufs.append(dco.flush()) - while len(chunk): - chunk = dco.decompress('', max_length) - self.failIf(len(chunk) > max_length, - 'chunk too big (%d>%d)' % (len(chunk),max_length)) - bufs.append(chunk) - - self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') + def test_decompressmaxlenflush(self): + self.test_decompressmaxlen(flush=True) def test_maxlenmisc(self): # Misc tests of max_length @@ -327,7 +242,7 @@ class CompressObjectTestCase(unittest.TestCase): sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] sync_opt = [getattr(zlib, opt) for opt in sync_opt if hasattr(zlib, opt)] - data = hamlet_scene * 8 + data = HAMLET_SCENE * 8 for sync in sync_opt: for level in range(10): @@ -349,7 +264,7 @@ class CompressObjectTestCase(unittest.TestCase): # Testing on 17K of "random" data # Create compressor and decompressor objects - co = zlib.compressobj(9) + co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) dco = zlib.decompressobj() # Try 17K of data @@ -375,23 +290,6 @@ class CompressObjectTestCase(unittest.TestCase): # if decompressed data is different from the input data, choke. self.assertEqual(expanded, data, "17K random source doesn't match") - def test_manydecompinc(self): - # Run incremental decompress test for a large range of sizes - self.test_decompinc(sizes=[1<