bpo-26730: Fix SpooledTemporaryFile data corruption (GH-17400)

SpooledTemporaryFile.rollback() might cause data corruption
when it is in text mode.

Co-Authored-By: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
Inada Naoki 2019-11-27 22:22:06 +09:00 committed by GitHub
parent 1bddf890e5
commit ea9835c5d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 19 deletions

View File

@ -105,8 +105,8 @@ The module defines the following user-callable items:
causes the file to roll over to an on-disk file regardless of its size. causes the file to roll over to an on-disk file regardless of its size.
The returned object is a file-like object whose :attr:`_file` attribute The returned object is a file-like object whose :attr:`_file` attribute
is either an :class:`io.BytesIO` or :class:`io.StringIO` object (depending on is either an :class:`io.BytesIO` or :class:`io.TextIOWrapper` object
whether binary or text *mode* was specified) or a true file (depending on whether binary or text *mode* was specified) or a true file
object, depending on whether :func:`rollover` has been called. This object, depending on whether :func:`rollover` has been called. This
file-like object can be used in a :keyword:`with` statement, just like file-like object can be used in a :keyword:`with` statement, just like
a normal file. a normal file.

View File

@ -633,10 +633,9 @@ class SpooledTemporaryFile:
if 'b' in mode: if 'b' in mode:
self._file = _io.BytesIO() self._file = _io.BytesIO()
else: else:
# Setting newline="\n" avoids newline translation; self._file = _io.TextIOWrapper(_io.BytesIO(),
# this is important because otherwise on Windows we'd encoding=encoding, errors=errors,
# get double newline translation upon rollover(). newline=newline)
self._file = _io.StringIO(newline="\n")
self._max_size = max_size self._max_size = max_size
self._rolled = False self._rolled = False
self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering, self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
@ -656,8 +655,12 @@ class SpooledTemporaryFile:
newfile = self._file = TemporaryFile(**self._TemporaryFileArgs) newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
del self._TemporaryFileArgs del self._TemporaryFileArgs
pos = file.tell()
if hasattr(newfile, 'buffer'):
newfile.buffer.write(file.detach().getvalue())
else:
newfile.write(file.getvalue()) newfile.write(file.getvalue())
newfile.seek(file.tell(), 0) newfile.seek(pos, 0)
self._rolled = True self._rolled = True

View File

@ -1114,7 +1114,8 @@ class TestSpooledTemporaryFile(BaseTestCase):
def test_text_mode(self): def test_text_mode(self):
# Creating a SpooledTemporaryFile with a text mode should produce # Creating a SpooledTemporaryFile with a text mode should produce
# a file object reading and writing (Unicode) text strings. # a file object reading and writing (Unicode) text strings.
f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10) f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
encoding="utf-8")
f.write("abc\n") f.write("abc\n")
f.seek(0) f.seek(0)
self.assertEqual(f.read(), "abc\n") self.assertEqual(f.read(), "abc\n")
@ -1124,9 +1125,9 @@ class TestSpooledTemporaryFile(BaseTestCase):
self.assertFalse(f._rolled) self.assertFalse(f._rolled)
self.assertEqual(f.mode, 'w+') self.assertEqual(f.mode, 'w+')
self.assertIsNone(f.name) self.assertIsNone(f.name)
self.assertIsNone(f.newlines) self.assertEqual(f.newlines, os.linesep)
self.assertIsNone(f.encoding) self.assertEqual(f.encoding, "utf-8")
self.assertIsNone(f.errors) self.assertEqual(f.errors, "strict")
f.write("xyzzy\n") f.write("xyzzy\n")
f.seek(0) f.seek(0)
@ -1139,8 +1140,8 @@ class TestSpooledTemporaryFile(BaseTestCase):
self.assertEqual(f.mode, 'w+') self.assertEqual(f.mode, 'w+')
self.assertIsNotNone(f.name) self.assertIsNotNone(f.name)
self.assertEqual(f.newlines, os.linesep) self.assertEqual(f.newlines, os.linesep)
self.assertIsNotNone(f.encoding) self.assertEqual(f.encoding, "utf-8")
self.assertIsNotNone(f.errors) self.assertEqual(f.errors, "strict")
def test_text_newline_and_encoding(self): def test_text_newline_and_encoding(self):
f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10, f = tempfile.SpooledTemporaryFile(mode='w+', max_size=10,
@ -1152,13 +1153,15 @@ class TestSpooledTemporaryFile(BaseTestCase):
self.assertFalse(f._rolled) self.assertFalse(f._rolled)
self.assertEqual(f.mode, 'w+') self.assertEqual(f.mode, 'w+')
self.assertIsNone(f.name) self.assertIsNone(f.name)
self.assertIsNone(f.newlines) self.assertIsNotNone(f.newlines)
self.assertIsNone(f.encoding) self.assertEqual(f.encoding, "utf-8")
self.assertIsNone(f.errors) self.assertEqual(f.errors, "ignore")
f.write("\u039B" * 20 + "\r\n") f.write("\u039C" * 10 + "\r\n")
f.write("\u039D" * 20)
f.seek(0) f.seek(0)
self.assertEqual(f.read(), "\u039B\r\n" + ("\u039B" * 20) + "\r\n") self.assertEqual(f.read(),
"\u039B\r\n" + ("\u039C" * 10) + "\r\n" + ("\u039D" * 20))
self.assertTrue(f._rolled) self.assertTrue(f._rolled)
self.assertEqual(f.mode, 'w+') self.assertEqual(f.mode, 'w+')
self.assertIsNotNone(f.name) self.assertIsNotNone(f.name)

View File

@ -0,0 +1,2 @@
Fix ``SpooledTemporaryFile.rollover()`` might corrupt the file when it is in
text mode. Patch by Serhiy Storchaka.