Simplify usage of LZMAFile's fileobj support, like with BZ2File.

This commit is contained in:
Nadeem Vawda 2012-06-04 23:34:07 +02:00
parent af518c198e
commit 33c34da574
5 changed files with 133 additions and 143 deletions

View File

@ -29,18 +29,20 @@ from multiple threads, it is necessary to protect it with a lock.
Reading and writing compressed files
------------------------------------
.. class:: LZMAFile(filename=None, mode="r", \*, fileobj=None, format=None, check=-1, preset=None, filters=None)
.. class:: LZMAFile(filename=None, mode="r", \*, format=None, check=-1, preset=None, filters=None)
Open an LZMA-compressed file.
Open an LZMA-compressed file in binary mode.
An :class:`LZMAFile` can wrap an existing :term:`file object` (given by
*fileobj*), or operate directly on a named file (named by *filename*).
Exactly one of these two parameters should be provided. If *fileobj* is
provided, it is not closed when the :class:`LZMAFile` is closed.
An :class:`LZMAFile` can wrap an already-open :term:`file object`, or operate
directly on a named file. The *filename* argument specifies either the file
object to wrap, or the name of the file to open (as a :class:`str` or
:class:`bytes` object). When wrapping an existing file object, the wrapped
file will not be closed when the :class:`LZMAFile` is closed.
The *mode* argument can be either ``"r"`` for reading (default), ``"w"`` for
overwriting, or ``"a"`` for appending. If *fileobj* is provided, a mode of
``"w"`` does not truncate the file, and is instead equivalent to ``"a"``.
overwriting, or ``"a"`` for appending. If *filename* is an existing file
object, a mode of ``"w"`` does not truncate the file, and is instead
equivalent to ``"a"``.
When opening a file for reading, the input file may be the concatenation of
multiple separate compressed streams. These are transparently decoded as a
@ -360,7 +362,7 @@ Writing compressed data to an already-open file::
import lzma
with open("file.xz", "wb") as f:
f.write(b"This data will not be compressed\n")
with lzma.LZMAFile(fileobj=f, mode="w") as lzf:
with lzma.LZMAFile(f, "w") as lzf:
lzf.write(b"This *will* be compressed\n")
f.write(b"Not compressed\n")

View File

@ -46,13 +46,12 @@ class LZMAFile(io.BufferedIOBase):
"""
def __init__(self, filename=None, mode="r", *,
fileobj=None, format=None, check=-1,
preset=None, filters=None):
"""Open an LZMA-compressed file.
format=None, check=-1, preset=None, filters=None):
"""Open an LZMA-compressed file in binary mode.
If filename is given, open the named file. Otherwise, operate on
the file object given by fileobj. Exactly one of these two
parameters should be provided.
filename can be either an actual file name (given as a str or
bytes object), in which case the named file is opened, or it can
be an existing file object to read from or write to.
mode can be "r" for reading (default), "w" for (over)writing, or
"a" for appending.
@ -119,16 +118,16 @@ class LZMAFile(io.BufferedIOBase):
else:
raise ValueError("Invalid mode: {!r}".format(mode))
if filename is not None and fileobj is None:
if isinstance(filename, (str, bytes)):
mode += "b"
self._fp = open(filename, mode)
self._closefp = True
self._mode = mode_code
elif fileobj is not None and filename is None:
self._fp = fileobj
elif hasattr(filename, "read") or hasattr(filename, "write"):
self._fp = filename
self._mode = mode_code
else:
raise ValueError("Must give exactly one of filename and fileobj")
raise TypeError("filename must be a str or bytes object, or a file")
def close(self):
"""Flush and close the file.

View File

@ -1681,8 +1681,7 @@ class TarFile(object):
except ImportError:
raise CompressionError("lzma module is not available")
fileobj = lzma.LZMAFile(filename=name if fileobj is None else None,
mode=mode, fileobj=fileobj, preset=preset)
fileobj = lzma.LZMAFile(fileobj or name, mode, preset=preset)
try:
t = cls.taropen(name, mode, fileobj, **kwargs)

View File

@ -358,11 +358,11 @@ class TempFile:
class FileTestCase(unittest.TestCase):
def test_init(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
pass
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
pass
with LZMAFile(fileobj=BytesIO(), mode="a") as f:
with LZMAFile(BytesIO(), "a") as f:
pass
def test_init_with_filename(self):
@ -376,88 +376,84 @@ class FileTestCase(unittest.TestCase):
def test_init_bad_mode(self):
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode=(3, "x"))
LZMAFile(BytesIO(COMPRESSED_XZ), (3, "x"))
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="")
LZMAFile(BytesIO(COMPRESSED_XZ), "")
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="x")
LZMAFile(BytesIO(COMPRESSED_XZ), "x")
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="rb")
LZMAFile(BytesIO(COMPRESSED_XZ), "rb")
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="r+")
LZMAFile(BytesIO(COMPRESSED_XZ), "r+")
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="wb")
LZMAFile(BytesIO(COMPRESSED_XZ), "wb")
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="w+")
LZMAFile(BytesIO(COMPRESSED_XZ), "w+")
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="rw")
LZMAFile(BytesIO(COMPRESSED_XZ), "rw")
def test_init_bad_check(self):
with self.assertRaises(TypeError):
LZMAFile(fileobj=BytesIO(), mode="w", check=b"asd")
LZMAFile(BytesIO(), "w", check=b"asd")
# CHECK_UNKNOWN and anything above CHECK_ID_MAX should be invalid.
with self.assertRaises(LZMAError):
LZMAFile(fileobj=BytesIO(), mode="w", check=lzma.CHECK_UNKNOWN)
LZMAFile(BytesIO(), "w", check=lzma.CHECK_UNKNOWN)
with self.assertRaises(LZMAError):
LZMAFile(fileobj=BytesIO(), mode="w", check=lzma.CHECK_ID_MAX + 3)
LZMAFile(BytesIO(), "w", check=lzma.CHECK_ID_MAX + 3)
# Cannot specify a check with mode="r".
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), check=lzma.CHECK_NONE)
LZMAFile(BytesIO(COMPRESSED_XZ), check=lzma.CHECK_NONE)
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), check=lzma.CHECK_CRC32)
LZMAFile(BytesIO(COMPRESSED_XZ), check=lzma.CHECK_CRC32)
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), check=lzma.CHECK_CRC64)
LZMAFile(BytesIO(COMPRESSED_XZ), check=lzma.CHECK_CRC64)
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), check=lzma.CHECK_SHA256)
LZMAFile(BytesIO(COMPRESSED_XZ), check=lzma.CHECK_SHA256)
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), check=lzma.CHECK_UNKNOWN)
LZMAFile(BytesIO(COMPRESSED_XZ), check=lzma.CHECK_UNKNOWN)
def test_init_bad_preset(self):
with self.assertRaises(TypeError):
LZMAFile(fileobj=BytesIO(), mode="w", preset=4.39)
LZMAFile(BytesIO(), "w", preset=4.39)
with self.assertRaises(LZMAError):
LZMAFile(fileobj=BytesIO(), mode="w", preset=10)
LZMAFile(BytesIO(), "w", preset=10)
with self.assertRaises(LZMAError):
LZMAFile(fileobj=BytesIO(), mode="w", preset=23)
LZMAFile(BytesIO(), "w", preset=23)
with self.assertRaises(OverflowError):
LZMAFile(fileobj=BytesIO(), mode="w", preset=-1)
LZMAFile(BytesIO(), "w", preset=-1)
with self.assertRaises(OverflowError):
LZMAFile(fileobj=BytesIO(), mode="w", preset=-7)
LZMAFile(BytesIO(), "w", preset=-7)
with self.assertRaises(TypeError):
LZMAFile(fileobj=BytesIO(), mode="w", preset="foo")
LZMAFile(BytesIO(), "w", preset="foo")
# Cannot specify a preset with mode="r".
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), preset=3)
LZMAFile(BytesIO(COMPRESSED_XZ), preset=3)
def test_init_bad_filter_spec(self):
with self.assertRaises(TypeError):
LZMAFile(fileobj=BytesIO(), mode="w", filters=[b"wobsite"])
LZMAFile(BytesIO(), "w", filters=[b"wobsite"])
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(), mode="w", filters=[{"xyzzy": 3}])
LZMAFile(BytesIO(), "w", filters=[{"xyzzy": 3}])
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(), mode="w", filters=[{"id": 98765}])
LZMAFile(BytesIO(), "w", filters=[{"id": 98765}])
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(), mode="w",
LZMAFile(BytesIO(), "w",
filters=[{"id": lzma.FILTER_LZMA2, "foo": 0}])
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(), mode="w",
LZMAFile(BytesIO(), "w",
filters=[{"id": lzma.FILTER_DELTA, "foo": 0}])
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(), mode="w",
LZMAFile(BytesIO(), "w",
filters=[{"id": lzma.FILTER_X86, "foo": 0}])
def test_init_with_preset_and_filters(self):
with self.assertRaises(ValueError):
LZMAFile(fileobj=BytesIO(), mode="w", format=lzma.FORMAT_RAW,
preset=6, filters=FILTERS_RAW_1)
def test_init_with_filename_and_fileobj(self):
with self.assertRaises(ValueError):
LZMAFile("/dev/null", fileobj=BytesIO())
LZMAFile(BytesIO(), "w", format=lzma.FORMAT_RAW,
preset=6, filters=FILTERS_RAW_1)
def test_close(self):
with BytesIO(COMPRESSED_XZ) as src:
f = LZMAFile(fileobj=src)
f = LZMAFile(src)
f.close()
# LZMAFile.close() should not close the underlying file object.
self.assertFalse(src.closed)
@ -476,7 +472,7 @@ class FileTestCase(unittest.TestCase):
f.close()
def test_closed(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
try:
self.assertFalse(f.closed)
f.read()
@ -485,7 +481,7 @@ class FileTestCase(unittest.TestCase):
f.close()
self.assertTrue(f.closed)
f = LZMAFile(fileobj=BytesIO(), mode="w")
f = LZMAFile(BytesIO(), "w")
try:
self.assertFalse(f.closed)
finally:
@ -493,7 +489,7 @@ class FileTestCase(unittest.TestCase):
self.assertTrue(f.closed)
def test_fileno(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
try:
self.assertRaises(UnsupportedOperation, f.fileno)
finally:
@ -509,7 +505,7 @@ class FileTestCase(unittest.TestCase):
self.assertRaises(ValueError, f.fileno)
def test_seekable(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
try:
self.assertTrue(f.seekable())
f.read()
@ -518,7 +514,7 @@ class FileTestCase(unittest.TestCase):
f.close()
self.assertRaises(ValueError, f.seekable)
f = LZMAFile(fileobj=BytesIO(), mode="w")
f = LZMAFile(BytesIO(), "w")
try:
self.assertFalse(f.seekable())
finally:
@ -527,7 +523,7 @@ class FileTestCase(unittest.TestCase):
src = BytesIO(COMPRESSED_XZ)
src.seekable = lambda: False
f = LZMAFile(fileobj=src)
f = LZMAFile(src)
try:
self.assertFalse(f.seekable())
finally:
@ -535,7 +531,7 @@ class FileTestCase(unittest.TestCase):
self.assertRaises(ValueError, f.seekable)
def test_readable(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
try:
self.assertTrue(f.readable())
f.read()
@ -544,7 +540,7 @@ class FileTestCase(unittest.TestCase):
f.close()
self.assertRaises(ValueError, f.readable)
f = LZMAFile(fileobj=BytesIO(), mode="w")
f = LZMAFile(BytesIO(), "w")
try:
self.assertFalse(f.readable())
finally:
@ -552,7 +548,7 @@ class FileTestCase(unittest.TestCase):
self.assertRaises(ValueError, f.readable)
def test_writable(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
try:
self.assertFalse(f.writable())
f.read()
@ -561,7 +557,7 @@ class FileTestCase(unittest.TestCase):
f.close()
self.assertRaises(ValueError, f.writable)
f = LZMAFile(fileobj=BytesIO(), mode="w")
f = LZMAFile(BytesIO(), "w")
try:
self.assertTrue(f.writable())
finally:
@ -569,50 +565,46 @@ class FileTestCase(unittest.TestCase):
self.assertRaises(ValueError, f.writable)
def test_read(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_ALONE)) as f:
with LZMAFile(BytesIO(COMPRESSED_ALONE)) as f:
self.assertEqual(f.read(), INPUT)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ),
format=lzma.FORMAT_XZ) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ), format=lzma.FORMAT_XZ) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_ALONE),
format=lzma.FORMAT_ALONE) as f:
with LZMAFile(BytesIO(COMPRESSED_ALONE), format=lzma.FORMAT_ALONE) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_RAW_1),
with LZMAFile(BytesIO(COMPRESSED_RAW_1),
format=lzma.FORMAT_RAW, filters=FILTERS_RAW_1) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_RAW_2),
with LZMAFile(BytesIO(COMPRESSED_RAW_2),
format=lzma.FORMAT_RAW, filters=FILTERS_RAW_2) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_RAW_3),
with LZMAFile(BytesIO(COMPRESSED_RAW_3),
format=lzma.FORMAT_RAW, filters=FILTERS_RAW_3) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_RAW_4),
with LZMAFile(BytesIO(COMPRESSED_RAW_4),
format=lzma.FORMAT_RAW, filters=FILTERS_RAW_4) as f:
self.assertEqual(f.read(), INPUT)
self.assertEqual(f.read(), b"")
def test_read_0(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertEqual(f.read(0), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_ALONE)) as f:
with LZMAFile(BytesIO(COMPRESSED_ALONE)) as f:
self.assertEqual(f.read(0), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ),
format=lzma.FORMAT_XZ) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ), format=lzma.FORMAT_XZ) as f:
self.assertEqual(f.read(0), b"")
with LZMAFile(fileobj=BytesIO(COMPRESSED_ALONE),
format=lzma.FORMAT_ALONE) as f:
with LZMAFile(BytesIO(COMPRESSED_ALONE), format=lzma.FORMAT_ALONE) as f:
self.assertEqual(f.read(0), b"")
def test_read_10(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
chunks = []
while True:
result = f.read(10)
@ -623,11 +615,11 @@ class FileTestCase(unittest.TestCase):
self.assertEqual(b"".join(chunks), INPUT)
def test_read_multistream(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ * 5)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ * 5)) as f:
self.assertEqual(f.read(), INPUT * 5)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ + COMPRESSED_ALONE)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ + COMPRESSED_ALONE)) as f:
self.assertEqual(f.read(), INPUT * 2)
with LZMAFile(fileobj=BytesIO(COMPRESSED_RAW_3 * 4),
with LZMAFile(BytesIO(COMPRESSED_RAW_3 * 4),
format=lzma.FORMAT_RAW, filters=FILTERS_RAW_3) as f:
self.assertEqual(f.read(), INPUT * 4)
@ -637,7 +629,7 @@ class FileTestCase(unittest.TestCase):
saved_buffer_size = lzma._BUFFER_SIZE
lzma._BUFFER_SIZE = len(COMPRESSED_XZ)
try:
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ * 5)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ * 5)) as f:
self.assertEqual(f.read(), INPUT * 5)
finally:
lzma._BUFFER_SIZE = saved_buffer_size
@ -649,20 +641,20 @@ class FileTestCase(unittest.TestCase):
self.assertEqual(f.read(), b"")
def test_read_incomplete(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ[:128])) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ[:128])) as f:
self.assertRaises(EOFError, f.read)
def test_read_bad_args(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
f.close()
self.assertRaises(ValueError, f.read)
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
self.assertRaises(ValueError, f.read)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertRaises(TypeError, f.read, None)
def test_read1(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
blocks = []
while True:
result = f.read1()
@ -673,11 +665,11 @@ class FileTestCase(unittest.TestCase):
self.assertEqual(f.read1(), b"")
def test_read1_0(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertEqual(f.read1(0), b"")
def test_read1_10(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
blocks = []
while True:
result = f.read1(10)
@ -688,7 +680,7 @@ class FileTestCase(unittest.TestCase):
self.assertEqual(f.read1(), b"")
def test_read1_multistream(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ * 5)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ * 5)) as f:
blocks = []
while True:
result = f.read1()
@ -699,78 +691,76 @@ class FileTestCase(unittest.TestCase):
self.assertEqual(f.read1(), b"")
def test_read1_bad_args(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
f.close()
self.assertRaises(ValueError, f.read1)
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
self.assertRaises(ValueError, f.read1)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertRaises(TypeError, f.read1, None)
def test_peek(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
result = f.peek()
self.assertGreater(len(result), 0)
self.assertTrue(INPUT.startswith(result))
self.assertEqual(f.read(), INPUT)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
result = f.peek(10)
self.assertGreater(len(result), 0)
self.assertTrue(INPUT.startswith(result))
self.assertEqual(f.read(), INPUT)
def test_peek_bad_args(self):
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
self.assertRaises(ValueError, f.peek)
def test_iterator(self):
with BytesIO(INPUT) as f:
lines = f.readlines()
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertListEqual(list(iter(f)), lines)
with LZMAFile(fileobj=BytesIO(COMPRESSED_ALONE)) as f:
with LZMAFile(BytesIO(COMPRESSED_ALONE)) as f:
self.assertListEqual(list(iter(f)), lines)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ),
format=lzma.FORMAT_XZ) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ), format=lzma.FORMAT_XZ) as f:
self.assertListEqual(list(iter(f)), lines)
with LZMAFile(fileobj=BytesIO(COMPRESSED_ALONE),
format=lzma.FORMAT_ALONE) as f:
with LZMAFile(BytesIO(COMPRESSED_ALONE), format=lzma.FORMAT_ALONE) as f:
self.assertListEqual(list(iter(f)), lines)
with LZMAFile(fileobj=BytesIO(COMPRESSED_RAW_2),
with LZMAFile(BytesIO(COMPRESSED_RAW_2),
format=lzma.FORMAT_RAW, filters=FILTERS_RAW_2) as f:
self.assertListEqual(list(iter(f)), lines)
def test_readline(self):
with BytesIO(INPUT) as f:
lines = f.readlines()
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
for line in lines:
self.assertEqual(f.readline(), line)
def test_readlines(self):
with BytesIO(INPUT) as f:
lines = f.readlines()
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertListEqual(f.readlines(), lines)
def test_write(self):
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w") as f:
with LZMAFile(dst, "w") as f:
f.write(INPUT)
expected = lzma.compress(INPUT)
self.assertEqual(dst.getvalue(), expected)
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w", format=lzma.FORMAT_XZ) as f:
with LZMAFile(dst, "w", format=lzma.FORMAT_XZ) as f:
f.write(INPUT)
expected = lzma.compress(INPUT, format=lzma.FORMAT_XZ)
self.assertEqual(dst.getvalue(), expected)
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w", format=lzma.FORMAT_ALONE) as f:
with LZMAFile(dst, "w", format=lzma.FORMAT_ALONE) as f:
f.write(INPUT)
expected = lzma.compress(INPUT, format=lzma.FORMAT_ALONE)
self.assertEqual(dst.getvalue(), expected)
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w", format=lzma.FORMAT_RAW,
with LZMAFile(dst, "w", format=lzma.FORMAT_RAW,
filters=FILTERS_RAW_2) as f:
f.write(INPUT)
expected = lzma.compress(INPUT, format=lzma.FORMAT_RAW,
@ -779,7 +769,7 @@ class FileTestCase(unittest.TestCase):
def test_write_10(self):
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w") as f:
with LZMAFile(dst, "w") as f:
for start in range(0, len(INPUT), 10):
f.write(INPUT[start:start+10])
expected = lzma.compress(INPUT)
@ -791,11 +781,11 @@ class FileTestCase(unittest.TestCase):
part3 = INPUT[1536:]
expected = b"".join(lzma.compress(x) for x in (part1, part2, part3))
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w") as f:
with LZMAFile(dst, "w") as f:
f.write(part1)
with LZMAFile(fileobj=dst, mode="a") as f:
with LZMAFile(dst, "a") as f:
f.write(part2)
with LZMAFile(fileobj=dst, mode="a") as f:
with LZMAFile(dst, "a") as f:
f.write(part3)
self.assertEqual(dst.getvalue(), expected)
@ -827,12 +817,12 @@ class FileTestCase(unittest.TestCase):
unlink(TESTFN)
def test_write_bad_args(self):
f = LZMAFile(fileobj=BytesIO(), mode="w")
f = LZMAFile(BytesIO(), "w")
f.close()
self.assertRaises(ValueError, f.write, b"foo")
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ), mode="r") as f:
with LZMAFile(BytesIO(COMPRESSED_XZ), "r") as f:
self.assertRaises(ValueError, f.write, b"bar")
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
self.assertRaises(TypeError, f.write, None)
self.assertRaises(TypeError, f.write, "text")
self.assertRaises(TypeError, f.write, 789)
@ -841,75 +831,75 @@ class FileTestCase(unittest.TestCase):
with BytesIO(INPUT) as f:
lines = f.readlines()
with BytesIO() as dst:
with LZMAFile(fileobj=dst, mode="w") as f:
with LZMAFile(dst, "w") as f:
f.writelines(lines)
expected = lzma.compress(INPUT)
self.assertEqual(dst.getvalue(), expected)
def test_seek_forward(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.seek(555)
self.assertEqual(f.read(), INPUT[555:])
def test_seek_forward_across_streams(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ * 2)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ * 2)) as f:
f.seek(len(INPUT) + 123)
self.assertEqual(f.read(), INPUT[123:])
def test_seek_forward_relative_to_current(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.read(100)
f.seek(1236, 1)
self.assertEqual(f.read(), INPUT[1336:])
def test_seek_forward_relative_to_end(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.seek(-555, 2)
self.assertEqual(f.read(), INPUT[-555:])
def test_seek_backward(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.read(1001)
f.seek(211)
self.assertEqual(f.read(), INPUT[211:])
def test_seek_backward_across_streams(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ * 2)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ * 2)) as f:
f.read(len(INPUT) + 333)
f.seek(737)
self.assertEqual(f.read(), INPUT[737:] + INPUT)
def test_seek_backward_relative_to_end(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.seek(-150, 2)
self.assertEqual(f.read(), INPUT[-150:])
def test_seek_past_end(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.seek(len(INPUT) + 9001)
self.assertEqual(f.tell(), len(INPUT))
self.assertEqual(f.read(), b"")
def test_seek_past_start(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
f.seek(-88)
self.assertEqual(f.tell(), 0)
self.assertEqual(f.read(), INPUT)
def test_seek_bad_args(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
f.close()
self.assertRaises(ValueError, f.seek, 0)
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
self.assertRaises(ValueError, f.seek, 0)
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
self.assertRaises(ValueError, f.seek, 0, 3)
self.assertRaises(ValueError, f.seek, 9, ())
self.assertRaises(TypeError, f.seek, None)
self.assertRaises(TypeError, f.seek, b"derp")
def test_tell(self):
with LZMAFile(fileobj=BytesIO(COMPRESSED_XZ)) as f:
with LZMAFile(BytesIO(COMPRESSED_XZ)) as f:
pos = 0
while True:
self.assertEqual(f.tell(), pos)
@ -918,14 +908,14 @@ class FileTestCase(unittest.TestCase):
break
pos += len(result)
self.assertEqual(f.tell(), len(INPUT))
with LZMAFile(fileobj=BytesIO(), mode="w") as f:
with LZMAFile(BytesIO(), "w") as f:
for pos in range(0, len(INPUT), 144):
self.assertEqual(f.tell(), pos)
f.write(INPUT[pos:pos+144])
self.assertEqual(f.tell(), len(INPUT))
def test_tell_bad_args(self):
f = LZMAFile(fileobj=BytesIO(COMPRESSED_XZ))
f = LZMAFile(BytesIO(COMPRESSED_XZ))
f.close()
self.assertRaises(ValueError, f.tell)

View File

@ -18,8 +18,8 @@ Library
- The bz2 module now contains an open() function, allowing compressed files to
conveniently be opened in text mode as well as binary mode.
- BZ2File.__init__() now accepts a file object as its first argument, rather
than requiring a separate "fileobj" argument.
- BZ2File.__init__() and LZMAFile.__init__() now accept a file object as their
first argument, rather than requiring a separate "fileobj" argument.
- gzip.open() now accepts file objects as well as filenames.