import unittest from test import test_support import zlib import random # print test_support.TESTFN def getbuf(): # This was in the original. Avoid non-repeatable sources. # Left here (unused) in case something wants to be done with it. import imp try: t = imp.find_module('test_zlib') file = t[0] except ImportError: file = open(__file__) buf = file.read() * 8 file.close() return buf class ChecksumTestCase(unittest.TestCase): # checksum test cases def test_crc32start(self): self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) def test_crc32empty(self): self.assertEqual(zlib.crc32("", 0), 0) self.assertEqual(zlib.crc32("", 1), 1) self.assertEqual(zlib.crc32("", 432), 432) def test_adler32start(self): self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) def test_adler32empty(self): self.assertEqual(zlib.adler32("", 0), 0) self.assertEqual(zlib.adler32("", 1), 1) self.assertEqual(zlib.adler32("", 432), 432) def assertEqual32(self, seen, expected): # 32-bit values masked -- checksums on 32- vs 64- bit machines # This is important if bit 31 (0x08000000L) is set. self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) def test_penguins(self): self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) class ExceptionTestCase(unittest.TestCase): # make sure we generate some expected errors def test_bigbits(self): # specifying total bits too large causes an error self.assertRaises(zlib.error, zlib.compress, 'ERROR', zlib.MAX_WBITS + 1) def test_badcompressobj(self): # verify failure on building compress object with bad params self.assertRaises(ValueError, zlib.compressobj, 1, 8, 0) def test_baddecompressobj(self): # verify failure on building decompress object with bad params self.assertRaises(ValueError, zlib.decompressobj, 0) class CompressTestCase(unittest.TestCase): # Test compression in one go (whole message compression) def test_speech(self): # decompress(compress(data)) better be data x = zlib.compress(hamlet_scene) self.assertEqual(zlib.decompress(x), hamlet_scene) def test_speech8(self): # decompress(compress(data)) better be data -- more compression chances data = hamlet_scene * 8 x = zlib.compress(data) self.assertEqual(zlib.decompress(x), data) def test_speech16(self): # decompress(compress(data)) better be data -- more compression chances data = hamlet_scene * 16 x = zlib.compress(data) self.assertEqual(zlib.decompress(x), data) def test_speech128(self): # decompress(compress(data)) better be data -- more compression chances data = hamlet_scene * 8 * 16 x = zlib.compress(data) self.assertEqual(zlib.decompress(x), data) def test_monotonic(self): # higher compression levels should not expand compressed size data = hamlet_scene * 8 * 16 last = length = len(zlib.compress(data, 0)) self.failUnless(last > len(data), "compress level 0 always expands") for level in range(10): length = len(zlib.compress(data, level)) self.failUnless(length <= last, 'compress level %d more effective than %d!' % ( level-1, level)) last = length class CompressObjectTestCase(unittest.TestCase): # Test compression object def test_pairsmall(self): # use compress object in straightforward manner, decompress w/ object data = hamlet_scene co = zlib.compressobj(8, 8, -15) x1 = co.compress(data) x2 = co.flush() self.assertRaises(zlib.error, co.flush) # second flush should not work dco = zlib.decompressobj(-15) y1 = dco.decompress(x1 + x2) y2 = dco.flush() self.assertEqual(data, y1 + y2) def test_pair(self): # straightforward compress/decompress objects, more compression data = hamlet_scene * 8 * 16 co = zlib.compressobj(8, 8, -15) x1 = co.compress(data) x2 = co.flush() self.assertRaises(zlib.error, co.flush) # second flush should not work dco = zlib.decompressobj(-15) y1 = dco.decompress(x1 + x2) y2 = dco.flush() self.assertEqual(data, y1 + y2) def test_compressincremental(self): # compress object in steps, decompress object as one-shot data = hamlet_scene * 8 * 16 co = zlib.compressobj(2, 8, -12, 9, 1) bufs = [] for i in range(0, len(data), 256): bufs.append(co.compress(data[i:i+256])) bufs.append(co.flush()) combuf = ''.join(bufs) dco = zlib.decompressobj(-15) y1 = dco.decompress(''.join(bufs)) y2 = dco.flush() self.assertEqual(data, y1 + y2) def test_decompressincremental(self): # compress object in steps, decompress object in steps data = hamlet_scene * 8 * 16 co = zlib.compressobj(2, 8, -12, 9, 1) bufs = [] for i in range(0, len(data), 256): bufs.append(co.compress(data[i:i+256])) bufs.append(co.flush()) combuf = ''.join(bufs) self.assertEqual(data, zlib.decompress(combuf, -12, -5)) dco = zlib.decompressobj(-12) bufs = [] for i in range(0, len(combuf), 128): bufs.append(dco.decompress(combuf[i:i+128])) self.assertEqual('', dco.unconsumed_tail, ######## "(A) uct should be '': not %d long" % len(dco.unconsumed_tail)) bufs.append(dco.flush()) self.assertEqual('', dco.unconsumed_tail, ######## "(B) uct should be '': not %d long" % len(dco.unconsumed_tail)) self.assertEqual(data, ''.join(bufs)) # Failure means: "decompressobj with init options failed" def test_decompinc(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): # compress object in steps, decompress object in steps, loop sizes source = source or hamlet_scene for reps in sizes: data = source * reps co = zlib.compressobj(2, 8, -12, 9, 1) bufs = [] for i in range(0, len(data), cx): bufs.append(co.compress(data[i:i+cx])) bufs.append(co.flush()) combuf = ''.join(bufs) self.assertEqual(data, zlib.decompress(combuf, -12, -5)) dco = zlib.decompressobj(-12) bufs = [] for i in range(0, len(combuf), dcx): bufs.append(dco.decompress(combuf[i:i+dcx])) self.assertEqual('', dco.unconsumed_tail, ######## "(A) uct should be '': not %d long" % len(dco.unconsumed_tail)) if flush: bufs.append(dco.flush()) else: while True: chunk = dco.decompress('') if chunk: bufs.append(chunk) else: break self.assertEqual('', dco.unconsumed_tail, ######## "(B) uct should be '': not %d long" % len(dco.unconsumed_tail)) self.assertEqual(data, ''.join(bufs)) # Failure means: "decompressobj with init options failed" def test_decompimax(self,sizes=[128],flush=True,source=None,cx=256,dcx=64): # compress in steps, decompress in length-restricted steps, loop sizes source = source or hamlet_scene for reps in sizes: # Check a decompression object with max_length specified data = source * reps co = zlib.compressobj(2, 8, -12, 9, 1) bufs = [] for i in range(0, len(data), cx): bufs.append(co.compress(data[i:i+cx])) bufs.append(co.flush()) combuf = ''.join(bufs) self.assertEqual(data, zlib.decompress(combuf, -12, -5), 'compressed data failure') dco = zlib.decompressobj(-12) bufs = [] cb = combuf while cb: #max_length = 1 + len(cb)//10 chunk = dco.decompress(cb, dcx) self.failIf(len(chunk) > dcx, 'chunk too big (%d>%d)' % (len(chunk), dcx)) bufs.append(chunk) cb = dco.unconsumed_tail if flush: bufs.append(dco.flush()) else: while True: chunk = dco.decompress('', dcx) self.failIf(len(chunk) > dcx, 'chunk too big in tail (%d>%d)' % (len(chunk), dcx)) if chunk: bufs.append(chunk) else: break self.assertEqual(len(data), len(''.join(bufs))) self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') def test_decompressmaxlen(self): # Check a decompression object with max_length specified data = hamlet_scene * 8 * 16 co = zlib.compressobj(2, 8, -12, 9, 1) bufs = [] for i in range(0, len(data), 256): bufs.append(co.compress(data[i:i+256])) bufs.append(co.flush()) combuf = ''.join(bufs) self.assertEqual(data, zlib.decompress(combuf, -12, -5), 'compressed data failure') dco = zlib.decompressobj(-12) bufs = [] cb = combuf while cb: max_length = 1 + len(cb)//10 chunk = dco.decompress(cb, max_length) self.failIf(len(chunk) > max_length, 'chunk too big (%d>%d)' % (len(chunk),max_length)) bufs.append(chunk) cb = dco.unconsumed_tail bufs.append(dco.flush()) self.assertEqual(len(data), len(''.join(bufs))) self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') def test_decompressmaxlenflushless(self): # identical to test_decompressmaxlen except flush is replaced # with an equivalent. This works and other fails on (eg) 2.2.2 data = hamlet_scene * 8 * 16 co = zlib.compressobj(2, 8, -12, 9, 1) bufs = [] for i in range(0, len(data), 256): bufs.append(co.compress(data[i:i+256])) bufs.append(co.flush()) combuf = ''.join(bufs) self.assertEqual(data, zlib.decompress(combuf, -12, -5), 'compressed data mismatch') dco = zlib.decompressobj(-12) bufs = [] cb = combuf while cb: max_length = 1 + len(cb)//10 chunk = dco.decompress(cb, max_length) self.failIf(len(chunk) > max_length, 'chunk too big (%d>%d)' % (len(chunk),max_length)) bufs.append(chunk) cb = dco.unconsumed_tail #bufs.append(dco.flush()) while len(chunk): chunk = dco.decompress('', max_length) self.failIf(len(chunk) > max_length, 'chunk too big (%d>%d)' % (len(chunk),max_length)) bufs.append(chunk) self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') def test_maxlenmisc(self): # Misc tests of max_length dco = zlib.decompressobj(-12) self.assertRaises(ValueError, dco.decompress, "", -1) self.assertEqual('', dco.unconsumed_tail) def test_flushes(self): # Test flush() with the various options, using all the # different levels in order to provide more variations. sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] sync_opt = [getattr(zlib, opt) for opt in sync_opt if hasattr(zlib, opt)] data = hamlet_scene * 8 for sync in sync_opt: for level in range(10): obj = zlib.compressobj( level ) a = obj.compress( data[:3000] ) b = obj.flush( sync ) c = obj.compress( data[3000:] ) d = obj.flush() self.assertEqual(zlib.decompress(''.join([a,b,c,d])), data, ("Decompress failed: flush " "mode=%i, level=%i") % (sync, level)) del obj def test_odd_flush(self): # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 import random if hasattr(zlib, 'Z_SYNC_FLUSH'): # Testing on 17K of "random" data # Create compressor and decompressor objects co = zlib.compressobj(9) dco = zlib.decompressobj() # Try 17K of data # generate random data stream try: # In 2.3 and later, WichmannHill is the RNG of the bug report gen = random.WichmannHill() except AttributeError: try: # 2.2 called it Random gen = random.Random() except AttributeError: # others might simply have a single RNG gen = random gen.seed(1) data = genblock(1, 17 * 1024, generator=gen) # compress, sync-flush, and decompress first = co.compress(data) second = co.flush(zlib.Z_SYNC_FLUSH) expanded = dco.decompress(first + second) # if decompressed data is different from the input data, choke. self.assertEqual(expanded, data, "17K random source doesn't match") def test_manydecompinc(self): # Run incremental decompress test for a large range of sizes self.test_decompinc(sizes=[1<