From 5adc22b330d63d50ea41624c0ead9b6a221729fa Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Fri, 11 Nov 2016 17:10:24 +0200 Subject: [PATCH] Issue #28664: test_bz2 now works on non-Windows platforms without bunzip2 (e.g. on Android). --- Lib/test/test_bz2.py | 59 +++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py index 478921a1d2a..34f6478356f 100644 --- a/Lib/test/test_bz2.py +++ b/Lib/test/test_bz2.py @@ -7,6 +7,7 @@ import os import pickle import glob import random +import shutil import subprocess import sys from test.support import unlink @@ -21,6 +22,16 @@ except ImportError: bz2 = support.import_module('bz2') from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor +has_cmdline_bunzip2 = None + +def ext_decompress(data): + global has_cmdline_bunzip2 + if has_cmdline_bunzip2 is None: + has_cmdline_bunzip2 = bool(shutil.which('bunzip2')) + if has_cmdline_bunzip2: + return subprocess.check_output(['bunzip2'], input=data) + else: + return bz2.decompress(data) class BaseTest(unittest.TestCase): "Base for other testcases." @@ -73,24 +84,6 @@ class BaseTest(unittest.TestCase): if os.path.isfile(self.filename): os.unlink(self.filename) - if sys.platform == "win32": - # bunzip2 isn't available to run on Windows. - def decompress(self, data): - return bz2.decompress(data) - else: - def decompress(self, data): - pop = subprocess.Popen("bunzip2", shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - pop.stdin.write(data) - pop.stdin.close() - ret = pop.stdout.read() - pop.stdout.close() - if pop.wait() != 0: - ret = bz2.decompress(data) - return ret - class BZ2FileTest(BaseTest): "Test the BZ2File class." @@ -251,7 +244,7 @@ class BZ2FileTest(BaseTest): self.assertRaises(TypeError, bz2f.write) bz2f.write(self.TEXT) with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT) + self.assertEqual(ext_decompress(f.read()), self.TEXT) def testWriteChunks10(self): with BZ2File(self.filename, "w") as bz2f: @@ -263,7 +256,7 @@ class BZ2FileTest(BaseTest): bz2f.write(str) n += 1 with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT) + self.assertEqual(ext_decompress(f.read()), self.TEXT) def testWriteNonDefaultCompressLevel(self): expected = bz2.compress(self.TEXT, compresslevel=5) @@ -280,7 +273,7 @@ class BZ2FileTest(BaseTest): # should raise an exception. self.assertRaises(ValueError, bz2f.writelines, ["a"]) with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT) + self.assertEqual(ext_decompress(f.read()), self.TEXT) def testWriteMethodsOnReadOnlyFile(self): with BZ2File(self.filename, "w") as bz2f: @@ -298,7 +291,7 @@ class BZ2FileTest(BaseTest): self.assertRaises(TypeError, bz2f.write) bz2f.write(self.TEXT) with open(self.filename, 'rb') as f: - self.assertEqual(self.decompress(f.read()), self.TEXT * 2) + self.assertEqual(ext_decompress(f.read()), self.TEXT * 2) def testSeekForward(self): self.createTempFile() @@ -594,7 +587,7 @@ class BZ2FileTest(BaseTest): with BZ2File(bio, "w") as bz2f: self.assertRaises(TypeError, bz2f.write) bz2f.write(self.TEXT) - self.assertEqual(self.decompress(bio.getvalue()), self.TEXT) + self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT) self.assertFalse(bio.closed) def testSeekForwardBytesIO(self): @@ -631,7 +624,7 @@ class BZ2CompressorTest(BaseTest): self.assertRaises(TypeError, bz2c.compress) data = bz2c.compress(self.TEXT) data += bz2c.flush() - self.assertEqual(self.decompress(data), self.TEXT) + self.assertEqual(ext_decompress(data), self.TEXT) def testCompressEmptyString(self): bz2c = BZ2Compressor() @@ -650,7 +643,7 @@ class BZ2CompressorTest(BaseTest): data += bz2c.compress(str) n += 1 data += bz2c.flush() - self.assertEqual(self.decompress(data), self.TEXT) + self.assertEqual(ext_decompress(data), self.TEXT) @bigmemtest(size=_4G + 100, memuse=2) def testCompress4G(self, size): @@ -830,7 +823,7 @@ class BZ2DecompressorTest(BaseTest): class CompressDecompressTest(BaseTest): def testCompress(self): data = bz2.compress(self.TEXT) - self.assertEqual(self.decompress(data), self.TEXT) + self.assertEqual(ext_decompress(data), self.TEXT) def testCompressEmptyString(self): text = bz2.compress(b'') @@ -880,14 +873,14 @@ class OpenTest(BaseTest): with self.open(self.filename, mode) as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT) with self.open(self.filename, "rb") as f: self.assertEqual(f.read(), self.TEXT) with self.open(self.filename, "ab") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT * 2) def test_implicit_binary_modes(self): @@ -898,14 +891,14 @@ class OpenTest(BaseTest): with self.open(self.filename, mode) as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT) with self.open(self.filename, "r") as f: self.assertEqual(f.read(), self.TEXT) with self.open(self.filename, "a") as f: f.write(self.TEXT) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()) + file_data = ext_decompress(f.read()) self.assertEqual(file_data, self.TEXT * 2) def test_text_modes(self): @@ -917,14 +910,14 @@ class OpenTest(BaseTest): with self.open(self.filename, mode) as f: f.write(text) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()).decode("ascii") + file_data = ext_decompress(f.read()).decode("ascii") self.assertEqual(file_data, text_native_eol) with self.open(self.filename, "rt") as f: self.assertEqual(f.read(), text) with self.open(self.filename, "at") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()).decode("ascii") + file_data = ext_decompress(f.read()).decode("ascii") self.assertEqual(file_data, text_native_eol * 2) def test_x_mode(self): @@ -965,7 +958,7 @@ class OpenTest(BaseTest): with self.open(self.filename, "wt", encoding="utf-16-le") as f: f.write(text) with open(self.filename, "rb") as f: - file_data = self.decompress(f.read()).decode("utf-16-le") + file_data = ext_decompress(f.read()).decode("utf-16-le") self.assertEqual(file_data, text_native_eol) with self.open(self.filename, "rt", encoding="utf-16-le") as f: self.assertEqual(f.read(), text)