diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index 482242c00e3..eaa472a6ccd 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -8,6 +8,7 @@ import pickle import glob import pathlib import random +import shutil import subprocess import sys from test.support import unlink @@ -22,6 +23,16 @@ except ImportError: bz2 = support.import_module('bz2') from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor +has_cmdline_bunzip2 = None + +def ext_decompress(data): + global has_cmdline_bunzip2 + if has_cmdline_bunzip2 is None: + has_cmdline_bunzip2 = bool(shutil.which('bunzip2')) + if has_cmdline_bunzip2: + return subprocess.check_output(['bunzip2'], input=data) + else: + return bz2.decompress(data) class BaseTest(unittest.TestCase): "Base for other testcases." @@ -74,24 +85,6 @@ class BaseTest(unittest.TestCase): if os.path.isfile(self.filename): os.unlink(self.filename) - if sys.platform == "win32": - # bunzip2 isn't available to run on Windows. - def decompress(self, data): - return bz2.decompress(data) - else: - def decompress(self, data): - pop = subprocess.Popen("bunzip2", shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - pop.stdin.write(data) - pop.stdin.close() - ret = pop.stdout.read() - pop.stdout.close() - if pop.wait() != 0: - ret = bz2.decompress(data) - return ret - class BZ2FileTest(BaseTest): "Test the BZ2File class." @@ -252,7 +245,7 @@ class BZ2FileTest(BaseTest): self.assertRaises(TypeError, bz2f.write) bz2f.write(self.TEXT) with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT) + self.assertEqual(ext_decompress(f.read()), self.TEXT) def testWriteChunks10(self): with BZ2File(self.filename, "w") as bz2f: @@ -264,7 +257,7 @@ class BZ2FileTest(BaseTest): bz2f.write(str) n += 1 with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT) + self.assertEqual(ext_decompress(f.read()), self.TEXT) def testWriteNonDefaultCompressLevel(self): expected = bz2.compress(self.TEXT, compresslevel=5) @@ -281,7 +274,7 @@ class BZ2FileTest(BaseTest): # should raise an exception. self.assertRaises(ValueError, bz2f.writelines, ["a"]) with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT) + self.assertEqual(ext_decompress(f.read()), self.TEXT) def testWriteMethodsOnReadOnlyFile(self): with BZ2File(self.filename, "w") as bz2f: @@ -299,7 +292,7 @@ class BZ2FileTest(BaseTest): self.assertRaises(TypeError, bz2f.write) bz2f.write(self.TEXT) with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT * 2) + self.assertEqual(ext_decompress(f.read()), self.TEXT * 2) def testSeekForward(self): self.createTempFile() @@ -602,7 +595,7 @@ class BZ2FileTest(BaseTest): with BZ2File(bio, "w") as bz2f: self.assertRaises(TypeError, bz2f.write) bz2f.write(self.TEXT) - self.assertEqual(self.decompress(bio.getvalue()), self.TEXT) + self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT) self.assertFalse(bio.closed) def testSeekForwardBytesIO(self): @@ -639,7 +632,7 @@ class BZ2CompressorTest(BaseTest): self.assertRaises(TypeError, bz2c.compress) data = bz2c.compress(self.TEXT) data += bz2c.flush() - self.assertEqual(self.decompress(data), self.TEXT) + self.assertEqual(ext_decompress(data), self.TEXT) def testCompressEmptyString(self): bz2c = BZ2Compressor() @@ -658,7 +651,7 @@ class BZ2CompressorTest(BaseTest): data += bz2c.compress(str) n += 1 data += bz2c.flush() - self.assertEqual(self.decompress(data), self.TEXT) + self.assertEqual(ext_decompress(data), self.TEXT) @bigmemtest(size=_4G + 100, memuse=2) def testCompress4G(self, size): @@ -838,7 +831,7 @@ class BZ2DecompressorTest(BaseTest): class CompressDecompressTest(BaseTest): def testCompress(self): data = bz2.compress(self.TEXT) - self.assertEqual(self.decompress(data), self.TEXT) + self.assertEqual(ext_decompress(data), self.TEXT) def testCompressEmptyString(self): text = bz2.compress(b'') @@ -888,14 +881,14 @@ class OpenTest(BaseTest): with self.open(self.filename, mode) as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT) with self.open(self.filename, "rb") as f: self.assertEqual(f.read(), self.TEXT) with self.open(self.filename, "ab") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT * 2) def test_implicit_binary_modes(self): @@ -906,14 +899,14 @@ class OpenTest(BaseTest): with self.open(self.filename, mode) as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT) with self.open(self.filename, "r") as f: self.assertEqual(f.read(), self.TEXT) with self.open(self.filename, "a") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT * 2) def test_text_modes(self): @@ -925,14 +918,14 @@ class OpenTest(BaseTest): with self.open(self.filename, mode) as f: f.write(text) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()).decode("ascii") + file_data = ext_decompress(f.read()).decode("ascii") self.assertEqual(file_data, text_native_eol) with self.open(self.filename, "rt") as f: self.assertEqual(f.read(), text) with self.open(self.filename, "at") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()).decode("ascii") + file_data = ext_decompress(f.read()).decode("ascii") self.assertEqual(file_data, text_native_eol * 2) def test_x_mode(self): @@ -973,7 +966,7 @@ class OpenTest(BaseTest): with self.open(self.filename, "wt", encoding="utf-16-le") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()).decode("utf-16-le") + file_data = ext_decompress(f.read()).decode("utf-16-le") self.assertEqual(file_data, text_native_eol) with self.open(self.filename, "rt", encoding="utf-16-le") as f: self.assertEqual(f.read(), text)