mirror of https://github.com/python/cpython
gh-115961: Improve tests for compressed file-like objects (GH-115963)
* Increase coverage for compressed file-like objects initialized with a file name, an open file object, a file object opened by file descriptor, and a file-like object without name and mode attributes (io.BytesIO) * Increase coverage for name, fileno(), mode, readable(), writable(), seekable() in different modes and states * No longer skip tests with bytes names * Test objects implementing the path protocol, not just pathlib.Path.
This commit is contained in:
parent
02beb9f020
commit
e72576c48b
|
@ -3,19 +3,19 @@ from test.support import bigmemtest, _4G
|
|||
|
||||
import array
|
||||
import unittest
|
||||
import io
|
||||
from io import BytesIO, DEFAULT_BUFFER_SIZE
|
||||
import os
|
||||
import pickle
|
||||
import glob
|
||||
import tempfile
|
||||
import pathlib
|
||||
import random
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
from test.support import import_helper
|
||||
from test.support import threading_helper
|
||||
from test.support.os_helper import unlink
|
||||
from test.support.os_helper import unlink, FakePath
|
||||
import _compression
|
||||
import sys
|
||||
|
||||
|
@ -537,12 +537,136 @@ class BZ2FileTest(BaseTest):
|
|||
with BZ2File(self.filename) as bz2f:
|
||||
self.assertEqual(bz2f.read(), data1 + data2)
|
||||
|
||||
def testOpenFilename(self):
|
||||
with BZ2File(self.filename, "wb") as f:
|
||||
f.write(b'content')
|
||||
self.assertIsInstance(f.fileno(), int)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
with BZ2File(self.filename, "ab") as f:
|
||||
f.write(b'appendix')
|
||||
self.assertIsInstance(f.fileno(), int)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
with BZ2File(self.filename, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'contentappendix')
|
||||
self.assertIsInstance(f.fileno(), int)
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
with self.assertRaises(ValueError):
|
||||
f.fileno()
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
def testOpenFileWithName(self):
|
||||
with open(self.filename, 'wb') as raw:
|
||||
with BZ2File(raw, 'wb') as f:
|
||||
f.write(b'content')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
with open(self.filename, 'ab') as raw:
|
||||
with BZ2File(raw, 'ab') as f:
|
||||
f.write(b'appendix')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
with open(self.filename, 'rb') as raw:
|
||||
with BZ2File(raw, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'contentappendix')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
with self.assertRaises(ValueError):
|
||||
f.fileno()
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
def testOpenFileWithoutName(self):
|
||||
bio = BytesIO()
|
||||
with BZ2File(bio, 'wb') as f:
|
||||
f.write(b'content')
|
||||
self.assertRaises(io.UnsupportedOperation, f.fileno)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
|
||||
with BZ2File(bio, 'ab') as f:
|
||||
f.write(b'appendix')
|
||||
self.assertRaises(io.UnsupportedOperation, f.fileno)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
|
||||
bio.seek(0)
|
||||
with BZ2File(bio, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'contentappendix')
|
||||
self.assertRaises(io.UnsupportedOperation, f.fileno)
|
||||
with self.assertRaises(ValueError):
|
||||
f.fileno()
|
||||
|
||||
def testOpenFileWithIntName(self):
|
||||
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
|
||||
with open(fd, 'wb') as raw:
|
||||
with BZ2File(raw, 'wb') as f:
|
||||
f.write(b'content')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
|
||||
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
|
||||
with open(fd, 'ab') as raw:
|
||||
with BZ2File(raw, 'ab') as f:
|
||||
f.write(b'appendix')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
|
||||
fd = os.open(self.filename, os.O_RDONLY)
|
||||
with open(fd, 'rb') as raw:
|
||||
with BZ2File(raw, 'rb') as f:
|
||||
self.assertEqual(f.read(), b'contentappendix')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
with self.assertRaises(ValueError):
|
||||
f.fileno()
|
||||
|
||||
def testOpenBytesFilename(self):
|
||||
str_filename = self.filename
|
||||
try:
|
||||
bytes_filename = str_filename.encode("ascii")
|
||||
except UnicodeEncodeError:
|
||||
self.skipTest("Temporary file name needs to be ASCII")
|
||||
bytes_filename = os.fsencode(str_filename)
|
||||
with BZ2File(bytes_filename, "wb") as f:
|
||||
f.write(self.DATA)
|
||||
with BZ2File(bytes_filename, "rb") as f:
|
||||
|
@ -552,7 +676,7 @@ class BZ2FileTest(BaseTest):
|
|||
self.assertEqual(f.read(), self.DATA)
|
||||
|
||||
def testOpenPathLikeFilename(self):
|
||||
filename = pathlib.Path(self.filename)
|
||||
filename = FakePath(self.filename)
|
||||
with BZ2File(filename, "wb") as f:
|
||||
f.write(self.DATA)
|
||||
with BZ2File(filename, "rb") as f:
|
||||
|
|
|
@ -5,7 +5,6 @@ import array
|
|||
import functools
|
||||
import io
|
||||
import os
|
||||
import pathlib
|
||||
import struct
|
||||
import sys
|
||||
import unittest
|
||||
|
@ -79,16 +78,18 @@ class TestGzip(BaseTest):
|
|||
f.close()
|
||||
|
||||
def test_write_read_with_pathlike_file(self):
|
||||
filename = pathlib.Path(self.filename)
|
||||
filename = os_helper.FakePath(self.filename)
|
||||
with gzip.GzipFile(filename, 'w') as f:
|
||||
f.write(data1 * 50)
|
||||
self.assertIsInstance(f.name, str)
|
||||
self.assertEqual(f.name, self.filename)
|
||||
with gzip.GzipFile(filename, 'a') as f:
|
||||
f.write(data1)
|
||||
with gzip.GzipFile(filename) as f:
|
||||
d = f.read()
|
||||
self.assertEqual(d, data1 * 51)
|
||||
self.assertIsInstance(f.name, str)
|
||||
self.assertEqual(f.name, self.filename)
|
||||
|
||||
# The following test_write_xy methods test that write accepts
|
||||
# the corresponding bytes-like object type as input
|
||||
|
@ -472,13 +473,118 @@ class TestGzip(BaseTest):
|
|||
with io.TextIOWrapper(f, encoding="ascii") as t:
|
||||
self.assertEqual(t.readlines(), lines)
|
||||
|
||||
def test_fileobj_with_name(self):
|
||||
with open(self.filename, "xb") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="x") as f:
|
||||
f.write(b'one')
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
|
||||
with open(self.filename, "wb") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="w") as f:
|
||||
f.write(b'two')
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
|
||||
with open(self.filename, "ab") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="a") as f:
|
||||
f.write(b'three')
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
|
||||
with open(self.filename, "rb") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="r") as f:
|
||||
self.assertEqual(f.read(), b'twothree')
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.mode, gzip.READ)
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, raw.name)
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.READ)
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
|
||||
def test_fileobj_from_fdopen(self):
|
||||
# Issue #13781: Opening a GzipFile for writing fails when using a
|
||||
# fileobj created with os.fdopen().
|
||||
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
|
||||
with os.fdopen(fd, "wb") as f:
|
||||
with gzip.GzipFile(fileobj=f, mode="w") as g:
|
||||
pass
|
||||
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
|
||||
with os.fdopen(fd, "xb") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="x") as f:
|
||||
f.write(b'one')
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
|
||||
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
|
||||
with os.fdopen(fd, "wb") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="w") as f:
|
||||
f.write(b'two')
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
|
||||
fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
|
||||
with os.fdopen(fd, "ab") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="a") as f:
|
||||
f.write(b'three')
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
|
||||
fd = os.open(self.filename, os.O_RDONLY)
|
||||
with os.fdopen(fd, "rb") as raw:
|
||||
with gzip.GzipFile(fileobj=raw, mode="r") as f:
|
||||
self.assertEqual(f.read(), b'twothree')
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
|
||||
def test_fileobj_mode(self):
|
||||
gzip.GzipFile(self.filename, "wb").close()
|
||||
|
@ -508,17 +614,69 @@ class TestGzip(BaseTest):
|
|||
|
||||
def test_bytes_filename(self):
|
||||
str_filename = self.filename
|
||||
try:
|
||||
bytes_filename = str_filename.encode("ascii")
|
||||
except UnicodeEncodeError:
|
||||
self.skipTest("Temporary file name needs to be ASCII")
|
||||
bytes_filename = os.fsencode(str_filename)
|
||||
with gzip.GzipFile(bytes_filename, "wb") as f:
|
||||
f.write(data1 * 50)
|
||||
self.assertEqual(f.name, bytes_filename)
|
||||
with gzip.GzipFile(bytes_filename, "rb") as f:
|
||||
self.assertEqual(f.read(), data1 * 50)
|
||||
self.assertEqual(f.name, bytes_filename)
|
||||
# Sanity check that we are actually operating on the right file.
|
||||
with gzip.GzipFile(str_filename, "rb") as f:
|
||||
self.assertEqual(f.read(), data1 * 50)
|
||||
self.assertEqual(f.name, str_filename)
|
||||
|
||||
def test_fileobj_without_name(self):
|
||||
bio = io.BytesIO()
|
||||
with gzip.GzipFile(fileobj=bio, mode='wb') as f:
|
||||
f.write(data1 * 50)
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(io.UnsupportedOperation, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.WRITE)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), True)
|
||||
|
||||
bio.seek(0)
|
||||
with gzip.GzipFile(fileobj=bio, mode='rb') as f:
|
||||
self.assertEqual(f.read(), data1 * 50)
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(io.UnsupportedOperation, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.READ)
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertEqual(f.name, '')
|
||||
self.assertRaises(AttributeError, f.fileno)
|
||||
self.assertEqual(f.mode, gzip.READ)
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
|
||||
def test_fileobj_and_filename(self):
|
||||
filename2 = self.filename + 'new'
|
||||
with (open(self.filename, 'wb') as fileobj,
|
||||
gzip.GzipFile(fileobj=fileobj, filename=filename2, mode='wb') as f):
|
||||
f.write(data1 * 50)
|
||||
self.assertEqual(f.name, filename2)
|
||||
with (open(self.filename, 'rb') as fileobj,
|
||||
gzip.GzipFile(fileobj=fileobj, filename=filename2, mode='rb') as f):
|
||||
self.assertEqual(f.read(), data1 * 50)
|
||||
self.assertEqual(f.name, filename2)
|
||||
# Sanity check that we are actually operating on the right file.
|
||||
with gzip.GzipFile(self.filename, 'rb') as f:
|
||||
self.assertEqual(f.read(), data1 * 50)
|
||||
self.assertEqual(f.name, self.filename)
|
||||
|
||||
def test_decompress_limited(self):
|
||||
"""Decompressed data buffering should be limited"""
|
||||
|
@ -707,13 +865,16 @@ class TestOpen(BaseTest):
|
|||
self.assertEqual(file_data, uncompressed)
|
||||
|
||||
def test_pathlike_file(self):
|
||||
filename = pathlib.Path(self.filename)
|
||||
filename = os_helper.FakePath(self.filename)
|
||||
with gzip.open(filename, "wb") as f:
|
||||
f.write(data1 * 50)
|
||||
self.assertEqual(f.name, self.filename)
|
||||
with gzip.open(filename, "ab") as f:
|
||||
f.write(data1)
|
||||
self.assertEqual(f.name, self.filename)
|
||||
with gzip.open(filename) as f:
|
||||
self.assertEqual(f.read(), data1 * 51)
|
||||
self.assertEqual(f.name, self.filename)
|
||||
|
||||
def test_implicit_binary_modes(self):
|
||||
# Test implicit binary modes (no "b" or "t" in mode string).
|
||||
|
|
|
@ -2,7 +2,6 @@ import _compression
|
|||
import array
|
||||
from io import BytesIO, UnsupportedOperation, DEFAULT_BUFFER_SIZE
|
||||
import os
|
||||
import pathlib
|
||||
import pickle
|
||||
import random
|
||||
import sys
|
||||
|
@ -12,7 +11,7 @@ import unittest
|
|||
from test.support import _4G, bigmemtest
|
||||
from test.support.import_helper import import_module
|
||||
from test.support.os_helper import (
|
||||
TESTFN, unlink
|
||||
TESTFN, unlink, FakePath
|
||||
)
|
||||
|
||||
lzma = import_module("lzma")
|
||||
|
@ -548,7 +547,7 @@ class FileTestCase(unittest.TestCase):
|
|||
pass
|
||||
|
||||
def test_init_with_PathLike_filename(self):
|
||||
filename = pathlib.Path(TESTFN)
|
||||
filename = FakePath(TESTFN)
|
||||
with TempFile(filename, COMPRESSED_XZ):
|
||||
with LZMAFile(filename) as f:
|
||||
self.assertEqual(f.read(), INPUT)
|
||||
|
@ -585,11 +584,10 @@ class FileTestCase(unittest.TestCase):
|
|||
self.addCleanup(unlink, TESTFN)
|
||||
for mode in ("x", "xb"):
|
||||
unlink(TESTFN)
|
||||
with LZMAFile(TESTFN, mode):
|
||||
with LZMAFile(TESTFN, mode) as f:
|
||||
pass
|
||||
with self.assertRaises(FileExistsError):
|
||||
with LZMAFile(TESTFN, mode):
|
||||
pass
|
||||
LZMAFile(TESTFN, mode)
|
||||
|
||||
def test_init_bad_mode(self):
|
||||
with self.assertRaises(ValueError):
|
||||
|
@ -867,17 +865,59 @@ class FileTestCase(unittest.TestCase):
|
|||
with LZMAFile(TESTFN) as f:
|
||||
self.assertEqual(f.read(), INPUT)
|
||||
self.assertEqual(f.read(), b"")
|
||||
self.assertIsInstance(f.fileno(), int)
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
def test_read_from_file_with_bytes_filename(self):
|
||||
try:
|
||||
bytes_filename = TESTFN.encode("ascii")
|
||||
except UnicodeEncodeError:
|
||||
self.skipTest("Temporary file name needs to be ASCII")
|
||||
bytes_filename = os.fsencode(TESTFN)
|
||||
with TempFile(TESTFN, COMPRESSED_XZ):
|
||||
with LZMAFile(bytes_filename) as f:
|
||||
self.assertEqual(f.read(), INPUT)
|
||||
self.assertEqual(f.read(), b"")
|
||||
|
||||
def test_read_from_fileobj(self):
|
||||
with TempFile(TESTFN, COMPRESSED_XZ):
|
||||
with open(TESTFN, 'rb') as raw:
|
||||
with LZMAFile(raw) as f:
|
||||
self.assertEqual(f.read(), INPUT)
|
||||
self.assertEqual(f.read(), b"")
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
def test_read_from_fileobj_with_int_name(self):
|
||||
with TempFile(TESTFN, COMPRESSED_XZ):
|
||||
fd = os.open(TESTFN, os.O_RDONLY)
|
||||
with open(fd, 'rb') as raw:
|
||||
with LZMAFile(raw) as f:
|
||||
self.assertEqual(f.read(), INPUT)
|
||||
self.assertEqual(f.read(), b"")
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), True)
|
||||
self.assertIs(f.writable(), False)
|
||||
self.assertIs(f.seekable(), True)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
def test_read_incomplete(self):
|
||||
with LZMAFile(BytesIO(COMPRESSED_XZ[:128])) as f:
|
||||
self.assertRaises(EOFError, f.read)
|
||||
|
@ -1051,6 +1091,17 @@ class FileTestCase(unittest.TestCase):
|
|||
try:
|
||||
with LZMAFile(TESTFN, "w") as f:
|
||||
f.write(INPUT)
|
||||
self.assertIsInstance(f.fileno(), int)
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
expected = lzma.compress(INPUT)
|
||||
with open(TESTFN, "rb") as f:
|
||||
self.assertEqual(f.read(), expected)
|
||||
|
@ -1058,10 +1109,7 @@ class FileTestCase(unittest.TestCase):
|
|||
unlink(TESTFN)
|
||||
|
||||
def test_write_to_file_with_bytes_filename(self):
|
||||
try:
|
||||
bytes_filename = TESTFN.encode("ascii")
|
||||
except UnicodeEncodeError:
|
||||
self.skipTest("Temporary file name needs to be ASCII")
|
||||
bytes_filename = os.fsencode(TESTFN)
|
||||
try:
|
||||
with LZMAFile(bytes_filename, "w") as f:
|
||||
f.write(INPUT)
|
||||
|
@ -1071,6 +1119,51 @@ class FileTestCase(unittest.TestCase):
|
|||
finally:
|
||||
unlink(TESTFN)
|
||||
|
||||
def test_write_to_fileobj(self):
|
||||
try:
|
||||
with open(TESTFN, "wb") as raw:
|
||||
with LZMAFile(raw, "w") as f:
|
||||
f.write(INPUT)
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
expected = lzma.compress(INPUT)
|
||||
with open(TESTFN, "rb") as f:
|
||||
self.assertEqual(f.read(), expected)
|
||||
finally:
|
||||
unlink(TESTFN)
|
||||
|
||||
def test_write_to_fileobj_with_int_name(self):
|
||||
try:
|
||||
fd = os.open(TESTFN, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
|
||||
with open(fd, 'wb') as raw:
|
||||
with LZMAFile(raw, "w") as f:
|
||||
f.write(INPUT)
|
||||
self.assertEqual(f.fileno(), raw.fileno())
|
||||
self.assertIs(f.readable(), False)
|
||||
self.assertIs(f.writable(), True)
|
||||
self.assertIs(f.seekable(), False)
|
||||
self.assertIs(f.closed, False)
|
||||
self.assertIs(f.closed, True)
|
||||
self.assertRaises(ValueError, f.fileno)
|
||||
self.assertRaises(ValueError, f.readable)
|
||||
self.assertRaises(ValueError, f.writable)
|
||||
self.assertRaises(ValueError, f.seekable)
|
||||
|
||||
expected = lzma.compress(INPUT)
|
||||
with open(TESTFN, "rb") as f:
|
||||
self.assertEqual(f.read(), expected)
|
||||
finally:
|
||||
unlink(TESTFN)
|
||||
|
||||
def test_write_append_to_file(self):
|
||||
part1 = INPUT[:1024]
|
||||
part2 = INPUT[1024:1536]
|
||||
|
@ -1276,7 +1369,7 @@ class OpenTestCase(unittest.TestCase):
|
|||
self.assertEqual(f.read(), INPUT * 2)
|
||||
|
||||
def test_with_pathlike_filename(self):
|
||||
filename = pathlib.Path(TESTFN)
|
||||
filename = FakePath(TESTFN)
|
||||
with TempFile(filename):
|
||||
with lzma.open(filename, "wb") as f:
|
||||
f.write(INPUT)
|
||||
|
|
|
@ -507,14 +507,32 @@ class CommonReadTest(ReadTest):
|
|||
with tarfile.open(support.findfile('recursion.tar', subdir='archivetestdata')):
|
||||
pass
|
||||
|
||||
def test_extractfile_name(self):
|
||||
def test_extractfile_attrs(self):
|
||||
# gh-74468: TarFile.name must name a file, not a parent archive.
|
||||
file = self.tar.getmember('ustar/regtype')
|
||||
with self.tar.extractfile(file) as fobj:
|
||||
self.assertEqual(fobj.name, 'ustar/regtype')
|
||||
self.assertRaises(AttributeError, fobj.fileno)
|
||||
self.assertIs(fobj.readable(), True)
|
||||
self.assertIs(fobj.writable(), False)
|
||||
if self.is_stream:
|
||||
self.assertRaises(AttributeError, fobj.seekable)
|
||||
else:
|
||||
self.assertIs(fobj.seekable(), True)
|
||||
self.assertIs(fobj.closed, False)
|
||||
self.assertIs(fobj.closed, True)
|
||||
self.assertEqual(fobj.name, 'ustar/regtype')
|
||||
self.assertRaises(AttributeError, fobj.fileno)
|
||||
self.assertIs(fobj.readable(), True)
|
||||
self.assertIs(fobj.writable(), False)
|
||||
if self.is_stream:
|
||||
self.assertRaises(AttributeError, fobj.seekable)
|
||||
else:
|
||||
self.assertIs(fobj.seekable(), True)
|
||||
|
||||
|
||||
class MiscReadTestBase(CommonReadTest):
|
||||
is_stream = False
|
||||
def requires_name_attribute(self):
|
||||
pass
|
||||
|
||||
|
@ -807,6 +825,7 @@ class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
|
|||
class StreamReadTest(CommonReadTest, unittest.TestCase):
|
||||
|
||||
prefix="r|"
|
||||
is_stream = True
|
||||
|
||||
def test_read_through(self):
|
||||
# Issue #11224: A poorly designed _FileInFile.read() method
|
||||
|
|
|
@ -4,7 +4,6 @@ import importlib.util
|
|||
import io
|
||||
import itertools
|
||||
import os
|
||||
import pathlib
|
||||
import posixpath
|
||||
import struct
|
||||
import subprocess
|
||||
|
@ -25,7 +24,7 @@ from test.support import (
|
|||
captured_stdout, captured_stderr, requires_subprocess
|
||||
)
|
||||
from test.support.os_helper import (
|
||||
TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count
|
||||
TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count, FakePath
|
||||
)
|
||||
|
||||
|
||||
|
@ -160,7 +159,7 @@ class AbstractTestsWithSourceFile:
|
|||
self.zip_open_test(f, self.compression)
|
||||
|
||||
def test_open_with_pathlike(self):
|
||||
path = pathlib.Path(TESTFN2)
|
||||
path = FakePath(TESTFN2)
|
||||
self.zip_open_test(path, self.compression)
|
||||
with zipfile.ZipFile(path, "r", self.compression) as zipfp:
|
||||
self.assertIsInstance(zipfp.filename, str)
|
||||
|
@ -447,6 +446,27 @@ class AbstractTestsWithSourceFile:
|
|||
self.assertEqual(zipfp.read('file1'), b'data1')
|
||||
self.assertEqual(zipfp.read('file2'), b'data2')
|
||||
|
||||
def test_zipextfile_attrs(self):
|
||||
fname = "somefile.txt"
|
||||
with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
|
||||
zipfp.writestr(fname, "bogus")
|
||||
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
|
||||
with zipfp.open(fname) as fid:
|
||||
self.assertEqual(fid.name, fname)
|
||||
self.assertRaises(io.UnsupportedOperation, fid.fileno)
|
||||
self.assertEqual(fid.mode, 'r')
|
||||
self.assertIs(fid.readable(), True)
|
||||
self.assertIs(fid.writable(), False)
|
||||
self.assertIs(fid.seekable(), True)
|
||||
self.assertIs(fid.closed, False)
|
||||
self.assertIs(fid.closed, True)
|
||||
self.assertEqual(fid.name, fname)
|
||||
self.assertEqual(fid.mode, 'r')
|
||||
self.assertRaises(io.UnsupportedOperation, fid.fileno)
|
||||
self.assertRaises(ValueError, fid.readable)
|
||||
self.assertIs(fid.writable(), False)
|
||||
self.assertRaises(ValueError, fid.seekable)
|
||||
|
||||
def tearDown(self):
|
||||
unlink(TESTFN)
|
||||
|
@ -578,17 +598,16 @@ class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
|
|||
|
||||
def test_io_on_closed_zipextfile(self):
|
||||
fname = "somefile.txt"
|
||||
with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
|
||||
with zipfile.ZipFile(TESTFN2, mode="w", compression=self.compression) as zipfp:
|
||||
zipfp.writestr(fname, "bogus")
|
||||
|
||||
with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
|
||||
with zipfp.open(fname) as fid:
|
||||
fid.close()
|
||||
self.assertIs(fid.closed, True)
|
||||
self.assertRaises(ValueError, fid.read)
|
||||
self.assertRaises(ValueError, fid.seek, 0)
|
||||
self.assertRaises(ValueError, fid.tell)
|
||||
self.assertRaises(ValueError, fid.readable)
|
||||
self.assertRaises(ValueError, fid.seekable)
|
||||
|
||||
def test_write_to_readonly(self):
|
||||
"""Check that trying to call write() on a readonly ZipFile object
|
||||
|
@ -1285,6 +1304,21 @@ class AbstractWriterTests:
|
|||
self.assertEqual(data.write(q), LENGTH)
|
||||
self.assertEqual(zip.getinfo('data').file_size, LENGTH)
|
||||
|
||||
def test_zipwritefile_attrs(self):
|
||||
fname = "somefile.txt"
|
||||
with zipfile.ZipFile(TESTFN2, mode="w", compression=self.compression) as zipfp:
|
||||
with zipfp.open(fname, 'w') as fid:
|
||||
self.assertRaises(io.UnsupportedOperation, fid.fileno)
|
||||
self.assertIs(fid.readable(), False)
|
||||
self.assertIs(fid.writable(), True)
|
||||
self.assertIs(fid.seekable(), False)
|
||||
self.assertIs(fid.closed, False)
|
||||
self.assertIs(fid.closed, True)
|
||||
self.assertRaises(io.UnsupportedOperation, fid.fileno)
|
||||
self.assertIs(fid.readable(), False)
|
||||
self.assertIs(fid.writable(), True)
|
||||
self.assertIs(fid.seekable(), False)
|
||||
|
||||
class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
|
||||
compression = zipfile.ZIP_STORED
|
||||
|
||||
|
@ -1487,7 +1521,7 @@ class PyZipFileTests(unittest.TestCase):
|
|||
fp.write("print(42)\n")
|
||||
|
||||
with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
|
||||
zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
|
||||
zipfp.writepy(FakePath(os.path.join(TESTFN2, "mod1.py")))
|
||||
names = zipfp.namelist()
|
||||
self.assertCompiledIn('mod1.py', names)
|
||||
finally:
|
||||
|
@ -1545,7 +1579,7 @@ class ExtractTests(unittest.TestCase):
|
|||
|
||||
def test_extract_with_target_pathlike(self):
|
||||
with temp_dir() as extdir:
|
||||
self._test_extract_with_target(pathlib.Path(extdir))
|
||||
self._test_extract_with_target(FakePath(extdir))
|
||||
|
||||
def test_extract_all(self):
|
||||
with temp_cwd():
|
||||
|
@ -1580,7 +1614,7 @@ class ExtractTests(unittest.TestCase):
|
|||
|
||||
def test_extract_all_with_target_pathlike(self):
|
||||
with temp_dir() as extdir:
|
||||
self._test_extract_all_with_target(pathlib.Path(extdir))
|
||||
self._test_extract_all_with_target(FakePath(extdir))
|
||||
|
||||
def check_file(self, filename, content):
|
||||
self.assertTrue(os.path.isfile(filename))
|
||||
|
@ -1893,7 +1927,7 @@ class OtherTests(unittest.TestCase):
|
|||
fp.write("this is not a legal zip file\n")
|
||||
self.assertFalse(zipfile.is_zipfile(TESTFN))
|
||||
# - passing a path-like object
|
||||
self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
|
||||
self.assertFalse(zipfile.is_zipfile(FakePath(TESTFN)))
|
||||
# - passing a file object
|
||||
with open(TESTFN, "rb") as fp:
|
||||
self.assertFalse(zipfile.is_zipfile(fp))
|
||||
|
@ -3013,7 +3047,7 @@ class ZipInfoTests(unittest.TestCase):
|
|||
self.assertEqual(zi.file_size, os.path.getsize(__file__))
|
||||
|
||||
def test_from_file_pathlike(self):
|
||||
zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
|
||||
zi = zipfile.ZipInfo.from_file(FakePath(__file__))
|
||||
self.assertEqual(posixpath.basename(zi.filename), 'test_core.py')
|
||||
self.assertFalse(zi.is_dir())
|
||||
self.assertEqual(zi.file_size, os.path.getsize(__file__))
|
||||
|
|
Loading…
Reference in New Issue