Issue #28664: test_bz2 now works on non-Windows platforms without bunzip2

(e.g. on Android).
This commit is contained in:
Serhiy Storchaka 2016-11-11 17:11:33 +02:00
commit e0e9d5f312
1 changed files with 26 additions and 33 deletions

View File

@ -8,6 +8,7 @@ import pickle
import glob import glob
import pathlib import pathlib
import random import random
import shutil
import subprocess import subprocess
import sys import sys
from test.support import unlink from test.support import unlink
@ -22,6 +23,16 @@ except ImportError:
bz2 = support.import_module('bz2') bz2 = support.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
has_cmdline_bunzip2 = None
def ext_decompress(data):
global has_cmdline_bunzip2
if has_cmdline_bunzip2 is None:
has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
if has_cmdline_bunzip2:
return subprocess.check_output(['bunzip2'], input=data)
else:
return bz2.decompress(data)
class BaseTest(unittest.TestCase): class BaseTest(unittest.TestCase):
"Base for other testcases." "Base for other testcases."
@ -74,24 +85,6 @@ class BaseTest(unittest.TestCase):
if os.path.isfile(self.filename): if os.path.isfile(self.filename):
os.unlink(self.filename) os.unlink(self.filename)
if sys.platform == "win32":
# bunzip2 isn't available to run on Windows.
def decompress(self, data):
return bz2.decompress(data)
else:
def decompress(self, data):
pop = subprocess.Popen("bunzip2", shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
pop.stdin.write(data)
pop.stdin.close()
ret = pop.stdout.read()
pop.stdout.close()
if pop.wait() != 0:
ret = bz2.decompress(data)
return ret
class BZ2FileTest(BaseTest): class BZ2FileTest(BaseTest):
"Test the BZ2File class." "Test the BZ2File class."
@ -252,7 +245,7 @@ class BZ2FileTest(BaseTest):
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(ext_decompress(f.read()), self.TEXT)
def testWriteChunks10(self): def testWriteChunks10(self):
with BZ2File(self.filename, "w") as bz2f: with BZ2File(self.filename, "w") as bz2f:
@ -264,7 +257,7 @@ class BZ2FileTest(BaseTest):
bz2f.write(str) bz2f.write(str)
n += 1 n += 1
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(ext_decompress(f.read()), self.TEXT)
def testWriteNonDefaultCompressLevel(self): def testWriteNonDefaultCompressLevel(self):
expected = bz2.compress(self.TEXT, compresslevel=5) expected = bz2.compress(self.TEXT, compresslevel=5)
@ -281,7 +274,7 @@ class BZ2FileTest(BaseTest):
# should raise an exception. # should raise an exception.
self.assertRaises(ValueError, bz2f.writelines, ["a"]) self.assertRaises(ValueError, bz2f.writelines, ["a"])
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(ext_decompress(f.read()), self.TEXT)
def testWriteMethodsOnReadOnlyFile(self): def testWriteMethodsOnReadOnlyFile(self):
with BZ2File(self.filename, "w") as bz2f: with BZ2File(self.filename, "w") as bz2f:
@ -299,7 +292,7 @@ class BZ2FileTest(BaseTest):
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
with open(self.filename, 'rb') as f: with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT * 2) self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)
def testSeekForward(self): def testSeekForward(self):
self.createTempFile() self.createTempFile()
@ -602,7 +595,7 @@ class BZ2FileTest(BaseTest):
with BZ2File(bio, "w") as bz2f: with BZ2File(bio, "w") as bz2f:
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
self.assertEqual(self.decompress(bio.getvalue()), self.TEXT) self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
self.assertFalse(bio.closed) self.assertFalse(bio.closed)
def testSeekForwardBytesIO(self): def testSeekForwardBytesIO(self):
@ -639,7 +632,7 @@ class BZ2CompressorTest(BaseTest):
self.assertRaises(TypeError, bz2c.compress) self.assertRaises(TypeError, bz2c.compress)
data = bz2c.compress(self.TEXT) data = bz2c.compress(self.TEXT)
data += bz2c.flush() data += bz2c.flush()
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(ext_decompress(data), self.TEXT)
def testCompressEmptyString(self): def testCompressEmptyString(self):
bz2c = BZ2Compressor() bz2c = BZ2Compressor()
@ -658,7 +651,7 @@ class BZ2CompressorTest(BaseTest):
data += bz2c.compress(str) data += bz2c.compress(str)
n += 1 n += 1
data += bz2c.flush() data += bz2c.flush()
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(ext_decompress(data), self.TEXT)
@bigmemtest(size=_4G + 100, memuse=2) @bigmemtest(size=_4G + 100, memuse=2)
def testCompress4G(self, size): def testCompress4G(self, size):
@ -838,7 +831,7 @@ class BZ2DecompressorTest(BaseTest):
class CompressDecompressTest(BaseTest): class CompressDecompressTest(BaseTest):
def testCompress(self): def testCompress(self):
data = bz2.compress(self.TEXT) data = bz2.compress(self.TEXT)
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(ext_decompress(data), self.TEXT)
def testCompressEmptyString(self): def testCompressEmptyString(self):
text = bz2.compress(b'') text = bz2.compress(b'')
@ -888,14 +881,14 @@ class OpenTest(BaseTest):
with self.open(self.filename, mode) as f: with self.open(self.filename, mode) as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT) self.assertEqual(file_data, self.TEXT)
with self.open(self.filename, "rb") as f: with self.open(self.filename, "rb") as f:
self.assertEqual(f.read(), self.TEXT) self.assertEqual(f.read(), self.TEXT)
with self.open(self.filename, "ab") as f: with self.open(self.filename, "ab") as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2) self.assertEqual(file_data, self.TEXT * 2)
def test_implicit_binary_modes(self): def test_implicit_binary_modes(self):
@ -906,14 +899,14 @@ class OpenTest(BaseTest):
with self.open(self.filename, mode) as f: with self.open(self.filename, mode) as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT) self.assertEqual(file_data, self.TEXT)
with self.open(self.filename, "r") as f: with self.open(self.filename, "r") as f:
self.assertEqual(f.read(), self.TEXT) self.assertEqual(f.read(), self.TEXT)
with self.open(self.filename, "a") as f: with self.open(self.filename, "a") as f:
f.write(self.TEXT) f.write(self.TEXT)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()) file_data = ext_decompress(f.read())
self.assertEqual(file_data, self.TEXT * 2) self.assertEqual(file_data, self.TEXT * 2)
def test_text_modes(self): def test_text_modes(self):
@ -925,14 +918,14 @@ class OpenTest(BaseTest):
with self.open(self.filename, mode) as f: with self.open(self.filename, mode) as f:
f.write(text) f.write(text)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()).decode("ascii") file_data = ext_decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol) self.assertEqual(file_data, text_native_eol)
with self.open(self.filename, "rt") as f: with self.open(self.filename, "rt") as f:
self.assertEqual(f.read(), text) self.assertEqual(f.read(), text)
with self.open(self.filename, "at") as f: with self.open(self.filename, "at") as f:
f.write(text) f.write(text)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()).decode("ascii") file_data = ext_decompress(f.read()).decode("ascii")
self.assertEqual(file_data, text_native_eol * 2) self.assertEqual(file_data, text_native_eol * 2)
def test_x_mode(self): def test_x_mode(self):
@ -973,7 +966,7 @@ class OpenTest(BaseTest):
with self.open(self.filename, "wt", encoding="utf-16-le") as f: with self.open(self.filename, "wt", encoding="utf-16-le") as f:
f.write(text) f.write(text)
with open(self.filename, "rb") as f: with open(self.filename, "rb") as f:
file_data = self.decompress(f.read()).decode("utf-16-le") file_data = ext_decompress(f.read()).decode("utf-16-le")
self.assertEqual(file_data, text_native_eol) self.assertEqual(file_data, text_native_eol)
with self.open(self.filename, "rt", encoding="utf-16-le") as f: with self.open(self.filename, "rt", encoding="utf-16-le") as f:
self.assertEqual(f.read(), text) self.assertEqual(f.read(), text)