Add a function bz2.open(), to match gzip.open().

This commit is contained in:
Nadeem Vawda 2012-06-04 23:32:38 +02:00
parent 50cb936bd0
commit af518c198e
4 changed files with 184 additions and 16 deletions

View File

@ -14,7 +14,8 @@ decompressing data using the bzip2 compression algorithm.
The :mod:`bz2` module contains: The :mod:`bz2` module contains:
* The :class:`BZ2File` class for reading and writing compressed files. * The :func:`.open` function and :class:`BZ2File` class for reading and
writing compressed files.
* The :class:`BZ2Compressor` and :class:`BZ2Decompressor` classes for * The :class:`BZ2Compressor` and :class:`BZ2Decompressor` classes for
incremental (de)compression. incremental (de)compression.
* The :func:`compress` and :func:`decompress` functions for one-shot * The :func:`compress` and :func:`decompress` functions for one-shot
@ -26,9 +27,37 @@ All of the classes in this module may safely be accessed from multiple threads.
(De)compression of files (De)compression of files
------------------------ ------------------------
.. function:: open(filename, mode='rb', compresslevel=9, encoding=None, errors=None, newline=None)
Open a bzip2-compressed file in binary or text mode, returning a :term:`file
object`.
As with the constructor for :class:`BZ2File`, the *filename* argument can be
an actual filename (a :class:`str` or :class:`bytes` object), or an existing
file object to read from or write to.
The *mode* argument can be any of ``'r'``, ``'rb'``, ``'w'``, ``'wb'``,
``'a'``, or ``'ab'`` for binary mode, or ``'rt'``, ``'wt'``, or ``'at'`` for
text mode. The default is ``'rb'``.
The *compresslevel* argument is an integer from 1 to 9, as for the
:class:`BZ2File` constructor.
For binary mode, this function is equivalent to the :class:`BZ2File`
constructor: ``BZ2File(filename, mode, compresslevel=compresslevel)``. In
this case, the *encoding*, *errors* and *newline* arguments must not be
provided.
For text mode, a :class:`BZ2File` object is created, and wrapped in an
:class:`io.TextIOWrapper` instance with the specified encoding, error
handling behavior, and line ending(s).
.. versionadded:: 3.3
.. class:: BZ2File(filename, mode='r', buffering=None, compresslevel=9) .. class:: BZ2File(filename, mode='r', buffering=None, compresslevel=9)
Open a bzip2-compressed file. Open a bzip2-compressed file in binary mode.
If *filename* is a :class:`str` or :class:`bytes` object, open the named file If *filename* is a :class:`str` or :class:`bytes` object, open the named file
directly. Otherwise, *filename* should be a :term:`file object`, which will directly. Otherwise, *filename* should be a :term:`file object`, which will

View File

@ -4,11 +4,12 @@ This module provides a file interface, classes for incremental
(de)compression, and functions for one-shot (de)compression. (de)compression, and functions for one-shot (de)compression.
""" """
__all__ = ["BZ2File", "BZ2Compressor", "BZ2Decompressor", "compress", __all__ = ["BZ2File", "BZ2Compressor", "BZ2Decompressor",
"decompress"] "open", "compress", "decompress"]
__author__ = "Nadeem Vawda <nadeem.vawda@gmail.com>" __author__ = "Nadeem Vawda <nadeem.vawda@gmail.com>"
import builtins
import io import io
import warnings import warnings
@ -91,7 +92,7 @@ class BZ2File(io.BufferedIOBase):
raise ValueError("Invalid mode: {!r}".format(mode)) raise ValueError("Invalid mode: {!r}".format(mode))
if isinstance(filename, (str, bytes)): if isinstance(filename, (str, bytes)):
self._fp = open(filename, mode) self._fp = builtins.open(filename, mode)
self._closefp = True self._closefp = True
self._mode = mode_code self._mode = mode_code
elif hasattr(filename, "read") or hasattr(filename, "write"): elif hasattr(filename, "read") or hasattr(filename, "write"):
@ -391,6 +392,46 @@ class BZ2File(io.BufferedIOBase):
return self._pos return self._pos
def open(filename, mode="rb", compresslevel=9,
         encoding=None, errors=None, newline=None):
    """Open a bzip2-compressed file in binary or text mode.

    The filename argument can be an actual filename (a str or bytes object),
    or an existing file object to read from or write to.

    The mode argument can be "r", "rb", "w", "wb", "a" or "ab" for binary
    mode, or "rt", "wt" or "at" for text mode. The default mode is "rb", and
    the default compresslevel is 9.

    For binary mode, this function is equivalent to the BZ2File constructor:
    BZ2File(filename, mode, compresslevel=compresslevel). In this case, the
    encoding, errors and newline arguments must not be provided.

    For text mode, a BZ2File object is created, and wrapped in an
    io.TextIOWrapper instance with the specified encoding, error handling
    behavior, and line ending(s). If the wrapper cannot be created (for
    example because the encoding is unknown), the BZ2File is closed before
    the exception propagates.

    Raises ValueError if mode contains both "t" and "b", or if encoding,
    errors or newline is given in binary mode.
    """
    text_mode = "t" in mode
    if text_mode:
        if "b" in mode:
            raise ValueError("Invalid mode: %r" % (mode,))
    else:
        if encoding is not None:
            raise ValueError("Argument 'encoding' not supported in binary mode")
        if errors is not None:
            raise ValueError("Argument 'errors' not supported in binary mode")
        if newline is not None:
            raise ValueError("Argument 'newline' not supported in binary mode")

    # BZ2File only understands binary modes, so drop the text-mode flag.
    bz_mode = mode.replace("t", "")
    binary_file = BZ2File(filename, bz_mode, compresslevel=compresslevel)
    if not text_mode:
        return binary_file

    try:
        return io.TextIOWrapper(binary_file, encoding, errors, newline)
    except BaseException:
        # Nobody else holds a reference to binary_file yet; close it here
        # rather than leaving the underlying file open until it is collected.
        binary_file.close()
        raise
def compress(data, compresslevel=9): def compress(data, compresslevel=9):
"""Compress a block of data. """Compress a block of data.

View File

@ -48,6 +48,13 @@ class BaseTest(unittest.TestCase):
TEXT = b''.join(TEXT_LINES) TEXT = b''.join(TEXT_LINES)
DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`' DATA = 
b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
def setUp(self):
    # Give every test a path to work on. TESTFN is defined outside this
    # view -- presumably the test suite's scratch file name; confirm.
    self.filename = TESTFN
def tearDown(self):
    # Remove the file at self.filename if it exists as a regular file, so
    # that nothing written by one test is left behind for the next.
    if os.path.isfile(self.filename):
        os.unlink(self.filename)
if has_cmdline_bunzip2: if has_cmdline_bunzip2:
def decompress(self, data): def decompress(self, data):
pop = subprocess.Popen("bunzip2", shell=True, pop = subprocess.Popen("bunzip2", shell=True,
@ -70,13 +77,6 @@ class BaseTest(unittest.TestCase):
class BZ2FileTest(BaseTest): class BZ2FileTest(BaseTest):
"Test BZ2File type miscellaneous methods." "Test BZ2File type miscellaneous methods."
def setUp(self):
self.filename = TESTFN
def tearDown(self):
if os.path.isfile(self.filename):
os.unlink(self.filename)
def createTempFile(self, streams=1): def createTempFile(self, streams=1):
with open(self.filename, "wb") as f: with open(self.filename, "wb") as f:
f.write(self.DATA * streams) f.write(self.DATA * streams)
@ -650,9 +650,7 @@ class BZ2DecompressorTest(BaseTest):
decompressed = None decompressed = None
class FuncTest(BaseTest): class CompressDecompressTest(BaseTest):
"Test module functions"
def testCompress(self): def testCompress(self):
data = bz2.compress(self.TEXT) data = bz2.compress(self.TEXT)
self.assertEqual(self.decompress(data), self.TEXT) self.assertEqual(self.decompress(data), self.TEXT)
@ -672,12 +670,109 @@ class FuncTest(BaseTest):
text = bz2.decompress(self.DATA * 5) text = bz2.decompress(self.DATA * 5)
self.assertEqual(text, self.TEXT * 5) self.assertEqual(text, self.TEXT * 5)
class OpenTest(BaseTest):
    "Test the module-level bz2.open() function."

    def _decompressed_contents(self):
        # Read the raw bytes with the builtin open() and decompress them in
        # one shot, so that data written through bz2.open() is verified
        # without going through bz2.open() again.
        with open(self.filename, "rb") as raw:
            return bz2.decompress(raw.read())

    def _check_binary_modes(self, write_mode, read_mode, append_mode):
        # Write, read back, then append, checking the file after each write.
        with bz2.open(self.filename, write_mode) as writer:
            writer.write(self.TEXT)
        self.assertEqual(self._decompressed_contents(), self.TEXT)
        with bz2.open(self.filename, read_mode) as reader:
            self.assertEqual(reader.read(), self.TEXT)
        with bz2.open(self.filename, append_mode) as appender:
            appender.write(self.TEXT)
        self.assertEqual(self._decompressed_contents(), self.TEXT * 2)

    def test_binary_modes(self):
        self._check_binary_modes("wb", "rb", "ab")

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        self._check_binary_modes("w", "r", "a")

    def test_text_modes(self):
        expected = self.TEXT.decode("ascii")
        # Text mode translates "\n" to the platform line separator on write.
        expected_on_disk = expected.replace("\n", os.linesep)
        with bz2.open(self.filename, "wt") as writer:
            writer.write(expected)
        on_disk = self._decompressed_contents().decode("ascii")
        self.assertEqual(on_disk, expected_on_disk)
        with bz2.open(self.filename, "rt") as reader:
            self.assertEqual(reader.read(), expected)
        with bz2.open(self.filename, "at") as appender:
            appender.write(expected)
        on_disk = self._decompressed_contents().decode("ascii")
        self.assertEqual(on_disk, expected_on_disk * 2)

    def test_fileobj(self):
        # An existing file object is accepted in place of a filename.
        for binary_mode in ("r", "rb"):
            with bz2.open(BytesIO(self.DATA), binary_mode) as reader:
                self.assertEqual(reader.read(), self.TEXT)
        expected = self.TEXT.decode("ascii")
        with bz2.open(BytesIO(self.DATA), "rt") as reader:
            self.assertEqual(reader.read(), expected)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(ValueError):
            bz2.open(self.filename, "wbt")
        # Text-only arguments are rejected when the mode is binary.
        text_only_arguments = (
            ("encoding", "utf-8"),
            ("errors", "ignore"),
            ("newline", "\n"),
        )
        for name, value in text_only_arguments:
            with self.assertRaises(ValueError):
                bz2.open(self.filename, "rb", **{name: value})

    def test_encoding(self):
        # Test non-default encoding.
        codec = "utf-16-le"
        expected = self.TEXT.decode("ascii")
        expected_on_disk = expected.replace("\n", os.linesep)
        with bz2.open(self.filename, "wt", encoding=codec) as writer:
            writer.write(expected)
        on_disk = self._decompressed_contents().decode(codec)
        self.assertEqual(on_disk, expected_on_disk)
        with bz2.open(self.filename, "rt", encoding=codec) as reader:
            self.assertEqual(reader.read(), expected)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with bz2.open(self.filename, "wb") as writer:
            writer.write(b"foo\xffbar")
        with bz2.open(self.filename, "rt",
                      encoding="ascii", errors="ignore") as reader:
            self.assertEqual(reader.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        expected = self.TEXT.decode("ascii")
        with bz2.open(self.filename, "wt", newline="\n") as writer:
            writer.write(expected)
        # With "\r" as the only line terminator the "\n"-separated text is
        # read back as a single line.
        with bz2.open(self.filename, "rt", newline="\r") as reader:
            self.assertEqual(reader.readlines(), [expected])
def test_main(): def test_main():
support.run_unittest( support.run_unittest(
BZ2FileTest, BZ2FileTest,
BZ2CompressorTest, BZ2CompressorTest,
BZ2DecompressorTest, BZ2DecompressorTest,
FuncTest CompressDecompressTest,
OpenTest,
) )
support.reap_children() support.reap_children()

View File

@ -15,6 +15,9 @@ Core and Builtins
Library Library
------- -------
- The bz2 module now contains an open() function, allowing compressed files to
conveniently be opened in text mode as well as binary mode.
- BZ2File.__init__() now accepts a file object as its first argument, rather - BZ2File.__init__() now accepts a file object as its first argument, rather
than requiring a separate "fileobj" argument. than requiring a separate "fileobj" argument.