bpo-38334: Fix seeking backward on an encrypted zipfile.ZipExtFile. (GH-16937)

Test by Daniel Hillier.
This commit is contained in:
Serhiy Storchaka 2019-10-27 10:22:14 +02:00 committed by GitHub
parent a8fb9327fb
commit 5c32af7522
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 26 deletions

View File

@ -1934,6 +1934,44 @@ class DecryptionTests(unittest.TestCase):
self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
# Regression test for bpo-38334: seek() and tell() on an encrypted archive
# member must stay consistent, including when seeking backward.
def test_seek_tell(self):
self.zip.setpassword(b"python")
txt = self.plain
test_word = b'encryption'
# NOTE(review): assumes self.plain contains b'encryption'; find() would
# return -1 otherwise -- confirm against the fixture.
bloc = txt.find(test_word)
bloc_len = len(test_word)
with self.zip.open("test.txt", "r") as fp:
# Absolute seek forward, relative seek back to the start, then relative
# seek forward again; tell() must agree after every step.
fp.seek(bloc, os.SEEK_SET)
self.assertEqual(fp.tell(), bloc)
fp.seek(-bloc, os.SEEK_CUR)
self.assertEqual(fp.tell(), 0)
fp.seek(bloc, os.SEEK_CUR)
self.assertEqual(fp.tell(), bloc)
# Data read after the backward/forward seeks must match the plaintext.
self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
# Make sure that the second read after seeking back beyond
# _readbuffer returns the same content (i.e. rewind to the start of
# the file to read forward to the required position).
old_read_size = fp.MIN_READ_SIZE
fp.MIN_READ_SIZE = 1
# Empty the read buffer and reset its offset -- presumably so the next
# seek cannot be satisfied from already-buffered data.
fp._readbuffer = b''
fp._offset = 0
fp.seek(0, os.SEEK_SET)
self.assertEqual(fp.tell(), 0)
fp.seek(bloc, os.SEEK_CUR)
self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
fp.MIN_READ_SIZE = old_read_size
# Seeking to the end must report the full plaintext length.
fp.seek(0, os.SEEK_END)
self.assertEqual(fp.tell(), len(txt))
fp.seek(0, os.SEEK_SET)
self.assertEqual(fp.tell(), 0)
# Read the file completely to definitely call any eof integrity
# checks (crc) and make sure they still pass.
fp.read()
class AbstractTestsWithRandomBinaryFiles: class AbstractTestsWithRandomBinaryFiles:
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):

View File

@ -792,10 +792,10 @@ class ZipExtFile(io.BufferedIOBase):
# Chunk size to read during seek # Chunk size to read during seek
MAX_SEEK_READ = 1 << 24 MAX_SEEK_READ = 1 << 24
def __init__(self, fileobj, mode, zipinfo, decrypter=None, def __init__(self, fileobj, mode, zipinfo, pwd=None,
close_fileobj=False): close_fileobj=False):
self._fileobj = fileobj self._fileobj = fileobj
self._decrypter = decrypter self._pwd = pwd
self._close_fileobj = close_fileobj self._close_fileobj = close_fileobj
self._compress_type = zipinfo.compress_type self._compress_type = zipinfo.compress_type
@ -810,11 +810,6 @@ class ZipExtFile(io.BufferedIOBase):
self.newlines = None self.newlines = None
# Adjust read size for encrypted files since the first 12 bytes
# are for the encryption/password information.
if self._decrypter is not None:
self._compress_left -= 12
self.mode = mode self.mode = mode
self.name = zipinfo.filename self.name = zipinfo.filename
@ -835,6 +830,30 @@ class ZipExtFile(io.BufferedIOBase):
except AttributeError: except AttributeError:
pass pass
self._decrypter = None
if pwd:
if zipinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zipinfo._raw_time >> 8) & 0xff
else:
# compare against the CRC otherwise
check_byte = (zipinfo.CRC >> 24) & 0xff
h = self._init_decrypter()
if h != check_byte:
raise RuntimeError("Bad password for file %r" % zipinfo.orig_filename)
def _init_decrypter(self):
    """Create a fresh decrypter and consume the encryption header.

    Builds a new ``_ZipDecrypter`` from the stored password, reads the
    12-byte encryption header from the underlying file object, and
    reduces the remaining compressed byte count accordingly.

    Returns the 12th decrypted header byte, which a caller can compare
    against its expected check byte to validate the password.
    """
    header_size = 12
    decrypter = self._decrypter = _ZipDecrypter(self._pwd)
    # The cipher stream starts with a 12-byte encryption header used to
    # strengthen the algorithm. Its first 11 bytes are completely
    # random; the 12th holds the MSB of the CRC, or the MSB of the file
    # time depending on the header type, and is used to check the
    # correctness of the password.
    encrypted_header = self._fileobj.read(header_size)
    self._compress_left -= header_size
    decrypted_header = decrypter(encrypted_header)
    return decrypted_header[header_size - 1]
def __repr__(self): def __repr__(self):
result = ['<%s.%s' % (self.__class__.__module__, result = ['<%s.%s' % (self.__class__.__module__,
self.__class__.__qualname__)] self.__class__.__qualname__)]
@ -1061,6 +1080,8 @@ class ZipExtFile(io.BufferedIOBase):
self._decompressor = _get_decompressor(self._compress_type) self._decompressor = _get_decompressor(self._compress_type)
self._eof = False self._eof = False
read_offset = new_pos read_offset = new_pos
if self._decrypter is not None:
self._init_decrypter()
while read_offset > 0: while read_offset > 0:
read_len = min(self.MAX_SEEK_READ, read_offset) read_len = min(self.MAX_SEEK_READ, read_offset)
@ -1524,32 +1545,16 @@ class ZipFile:
# check for encrypted flag & handle password # check for encrypted flag & handle password
is_encrypted = zinfo.flag_bits & 0x1 is_encrypted = zinfo.flag_bits & 0x1
zd = None
if is_encrypted: if is_encrypted:
if not pwd: if not pwd:
pwd = self.pwd pwd = self.pwd
if not pwd: if not pwd:
raise RuntimeError("File %r is encrypted, password " raise RuntimeError("File %r is encrypted, password "
"required for extraction" % name) "required for extraction" % name)
zd = _ZipDecrypter(pwd)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# or the MSB of the file time depending on the header type
# and is used to check the correctness of the password.
header = zef_file.read(12)
h = zd(header[0:12])
if zinfo.flag_bits & 0x8:
# compare against the file type from extended local headers
check_byte = (zinfo._raw_time >> 8) & 0xff
else: else:
# compare against the CRC otherwise pwd = None
check_byte = (zinfo.CRC >> 24) & 0xff
if h[11] != check_byte:
raise RuntimeError("Bad password for file %r" % name)
return ZipExtFile(zef_file, mode, zinfo, zd, True) return ZipExtFile(zef_file, mode, zinfo, pwd, True)
except: except:
zef_file.close() zef_file.close()
raise raise

View File

@ -0,0 +1 @@
Fixed seeking backward on an encrypted :class:`zipfile.ZipExtFile`.