mirror of https://github.com/python/cpython
Issue #28664: test_bz2 now works on non-Windows platforms without bunzip2
(e.g. on Android).
This commit is contained in:
parent
62e32d6352
commit
5adc22b330
|
@ -7,6 +7,7 @@ import os
|
|||
import pickle
|
||||
import glob
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
from test.support import unlink
|
||||
|
@ -21,6 +22,16 @@ except ImportError:
|
|||
bz2 = support.import_module('bz2')
|
||||
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
|
||||
|
||||
has_cmdline_bunzip2 = None
|
||||
|
||||
def ext_decompress(data):
|
||||
global has_cmdline_bunzip2
|
||||
if has_cmdline_bunzip2 is None:
|
||||
has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
|
||||
if has_cmdline_bunzip2:
|
||||
return subprocess.check_output(['bunzip2'], input=data)
|
||||
else:
|
||||
return bz2.decompress(data)
|
||||
|
||||
class BaseTest(unittest.TestCase):
|
||||
"Base for other testcases."
|
||||
|
@ -73,24 +84,6 @@ class BaseTest(unittest.TestCase):
|
|||
if os.path.isfile(self.filename):
|
||||
os.unlink(self.filename)
|
||||
|
||||
if sys.platform == "win32":
|
||||
# bunzip2 isn't available to run on Windows.
|
||||
def decompress(self, data):
|
||||
return bz2.decompress(data)
|
||||
else:
|
||||
def decompress(self, data):
|
||||
pop = subprocess.Popen("bunzip2", shell=True,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
pop.stdin.write(data)
|
||||
pop.stdin.close()
|
||||
ret = pop.stdout.read()
|
||||
pop.stdout.close()
|
||||
if pop.wait() != 0:
|
||||
ret = bz2.decompress(data)
|
||||
return ret
|
||||
|
||||
|
||||
class BZ2FileTest(BaseTest):
|
||||
"Test the BZ2File class."
|
||||
|
@ -251,7 +244,7 @@ class BZ2FileTest(BaseTest):
|
|||
self.assertRaises(TypeError, bz2f.write)
|
||||
bz2f.write(self.TEXT)
|
||||
with open(self.filename, 'rb') as f:
|
||||
self.assertEqual(self.decompress(f.read()), self.TEXT)
|
||||
self.assertEqual(ext_decompress(f.read()), self.TEXT)
|
||||
|
||||
def testWriteChunks10(self):
|
||||
with BZ2File(self.filename, "w") as bz2f:
|
||||
|
@ -263,7 +256,7 @@ class BZ2FileTest(BaseTest):
|
|||
bz2f.write(str)
|
||||
n += 1
|
||||
with open(self.filename, 'rb') as f:
|
||||
self.assertEqual(self.decompress(f.read()), self.TEXT)
|
||||
self.assertEqual(ext_decompress(f.read()), self.TEXT)
|
||||
|
||||
def testWriteNonDefaultCompressLevel(self):
|
||||
expected = bz2.compress(self.TEXT, compresslevel=5)
|
||||
|
@ -280,7 +273,7 @@ class BZ2FileTest(BaseTest):
|
|||
# should raise an exception.
|
||||
self.assertRaises(ValueError, bz2f.writelines, ["a"])
|
||||
with open(self.filename, 'rb') as f:
|
||||
self.assertEqual(self.decompress(f.read()), self.TEXT)
|
||||
self.assertEqual(ext_decompress(f.read()), self.TEXT)
|
||||
|
||||
def testWriteMethodsOnReadOnlyFile(self):
|
||||
with BZ2File(self.filename, "w") as bz2f:
|
||||
|
@ -298,7 +291,7 @@ class BZ2FileTest(BaseTest):
|
|||
self.assertRaises(TypeError, bz2f.write)
|
||||
bz2f.write(self.TEXT)
|
||||
with open(self.filename, 'rb') as f:
|
||||
self.assertEqual(self.decompress(f.read()), self.TEXT * 2)
|
||||
self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)
|
||||
|
||||
def testSeekForward(self):
|
||||
self.createTempFile()
|
||||
|
@ -594,7 +587,7 @@ class BZ2FileTest(BaseTest):
|
|||
with BZ2File(bio, "w") as bz2f:
|
||||
self.assertRaises(TypeError, bz2f.write)
|
||||
bz2f.write(self.TEXT)
|
||||
self.assertEqual(self.decompress(bio.getvalue()), self.TEXT)
|
||||
self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
|
||||
self.assertFalse(bio.closed)
|
||||
|
||||
def testSeekForwardBytesIO(self):
|
||||
|
@ -631,7 +624,7 @@ class BZ2CompressorTest(BaseTest):
|
|||
self.assertRaises(TypeError, bz2c.compress)
|
||||
data = bz2c.compress(self.TEXT)
|
||||
data += bz2c.flush()
|
||||
self.assertEqual(self.decompress(data), self.TEXT)
|
||||
self.assertEqual(ext_decompress(data), self.TEXT)
|
||||
|
||||
def testCompressEmptyString(self):
|
||||
bz2c = BZ2Compressor()
|
||||
|
@ -650,7 +643,7 @@ class BZ2CompressorTest(BaseTest):
|
|||
data += bz2c.compress(str)
|
||||
n += 1
|
||||
data += bz2c.flush()
|
||||
self.assertEqual(self.decompress(data), self.TEXT)
|
||||
self.assertEqual(ext_decompress(data), self.TEXT)
|
||||
|
||||
@bigmemtest(size=_4G + 100, memuse=2)
|
||||
def testCompress4G(self, size):
|
||||
|
@ -830,7 +823,7 @@ class BZ2DecompressorTest(BaseTest):
|
|||
class CompressDecompressTest(BaseTest):
|
||||
def testCompress(self):
|
||||
data = bz2.compress(self.TEXT)
|
||||
self.assertEqual(self.decompress(data), self.TEXT)
|
||||
self.assertEqual(ext_decompress(data), self.TEXT)
|
||||
|
||||
def testCompressEmptyString(self):
|
||||
text = bz2.compress(b'')
|
||||
|
@ -880,14 +873,14 @@ class OpenTest(BaseTest):
|
|||
with self.open(self.filename, mode) as f:
|
||||
f.write(self.TEXT)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read())
|
||||
file_data = ext_decompress(f.read())
|
||||
self.assertEqual(file_data, self.TEXT)
|
||||
with self.open(self.filename, "rb") as f:
|
||||
self.assertEqual(f.read(), self.TEXT)
|
||||
with self.open(self.filename, "ab") as f:
|
||||
f.write(self.TEXT)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read())
|
||||
file_data = ext_decompress(f.read())
|
||||
self.assertEqual(file_data, self.TEXT * 2)
|
||||
|
||||
def test_implicit_binary_modes(self):
|
||||
|
@ -898,14 +891,14 @@ class OpenTest(BaseTest):
|
|||
with self.open(self.filename, mode) as f:
|
||||
f.write(self.TEXT)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read())
|
||||
file_data = ext_decompress(f.read())
|
||||
self.assertEqual(file_data, self.TEXT)
|
||||
with self.open(self.filename, "r") as f:
|
||||
self.assertEqual(f.read(), self.TEXT)
|
||||
with self.open(self.filename, "a") as f:
|
||||
f.write(self.TEXT)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read())
|
||||
file_data = ext_decompress(f.read())
|
||||
self.assertEqual(file_data, self.TEXT * 2)
|
||||
|
||||
def test_text_modes(self):
|
||||
|
@ -917,14 +910,14 @@ class OpenTest(BaseTest):
|
|||
with self.open(self.filename, mode) as f:
|
||||
f.write(text)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read()).decode("ascii")
|
||||
file_data = ext_decompress(f.read()).decode("ascii")
|
||||
self.assertEqual(file_data, text_native_eol)
|
||||
with self.open(self.filename, "rt") as f:
|
||||
self.assertEqual(f.read(), text)
|
||||
with self.open(self.filename, "at") as f:
|
||||
f.write(text)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read()).decode("ascii")
|
||||
file_data = ext_decompress(f.read()).decode("ascii")
|
||||
self.assertEqual(file_data, text_native_eol * 2)
|
||||
|
||||
def test_x_mode(self):
|
||||
|
@ -965,7 +958,7 @@ class OpenTest(BaseTest):
|
|||
with self.open(self.filename, "wt", encoding="utf-16-le") as f:
|
||||
f.write(text)
|
||||
with open(self.filename, "rb") as f:
|
||||
file_data = self.decompress(f.read()).decode("utf-16-le")
|
||||
file_data = ext_decompress(f.read()).decode("utf-16-le")
|
||||
self.assertEqual(file_data, text_native_eol)
|
||||
with self.open(self.filename, "rt", encoding="utf-16-le") as f:
|
||||
self.assertEqual(f.read(), text)
|
||||
|
|
Loading…
Reference in New Issue