Patch #1630118: add a SpooledTemporaryFile class to tempfile.

This commit is contained in:
Collin Winter 2007-03-19 18:52:08 +00:00
parent d9dbe72056
commit a8785cc26a
5 changed files with 237 additions and 1 deletions

View File

@ -67,6 +67,23 @@ it is closed.
\versionadded[The \var{delete} parameter]{2.6}
\end{funcdesc}
\begin{funcdesc}{SpooledTemporaryFile}{\optional{max\_size=\code{0},
\optional{mode=\code{'w+b'}\optional{,
bufsize=\code{-1}\optional{,
suffix\optional{, prefix\optional{,
dir}}}}}}}
This function operates exactly as \function{TemporaryFile()} does,
except that data is spooled in memory until the file size exceeds
\var{max_size}, or until the file's \function{fileno()} method is
called, at which point the contents are written to disk and operation
proceeds as with \function{TemporaryFile()}.
The resulting file has one additional method, \function{rollover()},
which causes the file to roll over to an on-disk file regardless of
its size.
\versionadded{2.6}
\end{funcdesc}
\begin{funcdesc}{mkstemp}{\optional{suffix\optional{,
prefix\optional{, dir\optional{, text}}}}}
Creates a temporary file in the most secure manner possible. There

View File

@ -19,6 +19,7 @@ This module also provides some data items to the user:
__all__ = [
"NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
"SpooledTemporaryFile",
"mkstemp", "mkdtemp", # low level safe interfaces
"mktemp", # deprecated unsafe interface
"TMP_MAX", "gettempprefix", # constants
@ -36,6 +37,11 @@ if _os.name == 'mac':
import Carbon.Folder as _Folder
import Carbon.Folders as _Folders
try:
from cStringIO import StringIO as _StringIO
except:
from StringIO import StringIO as _StringIO
try:
import fcntl as _fcntl
except ImportError:
@ -473,3 +479,111 @@ else:
except:
_os.close(fd)
raise
class SpooledTemporaryFile:
"""Temporary file wrapper, specialized to switch from
StringIO to a real file when it exceeds a certain size or
when a fileno is needed.
"""
_rolled = False
def __init__(self, max_size=0, mode='w+b', bufsize=-1,
suffix="", prefix=template, dir=None):
self._file = _StringIO()
self._max_size = max_size
self._rolled = False
self._TemporaryFileArgs = (mode, bufsize, suffix, prefix, dir)
def _check(self, file):
if self._rolled: return
max_size = self._max_size
if max_size and file.tell() > max_size:
self.rollover()
def rollover(self):
if self._rolled: return
file = self._file
newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)
del self._TemporaryFileArgs
newfile.write(file.getvalue())
newfile.seek(file.tell(), 0)
self._rolled = True
# file protocol
def __iter__(self):
return self._file.__iter__()
def close(self):
self._file.close()
@property
def closed(self):
return self._file.closed
@property
def encoding(self):
return self._file.encoding
def fileno(self):
self.rollover()
return self._file.fileno()
def flush(self):
self._file.flush()
def isatty(self):
return self._file.isatty()
@property
def mode(self):
return self._file.mode
@property
def name(self):
return self._file.name
@property
def newlines(self):
return self._file.newlines
def next(self):
return self._file.next
def read(self, *args):
return self._file.read(*args)
def readline(self, *args):
return self._file.readline(*args)
def readlines(self, *args):
return self._file.readlines(*args)
def seek(self, *args):
self._file.seek(*args)
@property
def softspace(self):
return self._file.softspace
def tell(self):
return self._file.tell()
def truncate(self):
self._file.truncate()
def write(self, s):
file = self._file
rv = file.write(s)
self._check(file)
return rv
def writelines(self, iterable):
file = self._file
rv = file.writelines(iterable)
self._check(file)
return rv
def xreadlines(self, *args):
return self._file.xreadlines(*args)

View File

@ -81,7 +81,8 @@ class test_exports(TC):
"gettempprefix" : 1,
"gettempdir" : 1,
"tempdir" : 1,
"template" : 1
"template" : 1,
"SpooledTemporaryFile" : 1
}
unexp = []
@ -632,6 +633,107 @@ class test_NamedTemporaryFile(TC):
test_classes.append(test_NamedTemporaryFile)
class test_SpooledTemporaryFile(TC):
"""Test SpooledTemporaryFile()."""
def do_create(self, max_size=0, dir=None, pre="", suf=""):
if dir is None:
dir = tempfile.gettempdir()
try:
file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
except:
self.failOnException("SpooledTemporaryFile")
return file
def test_basic(self):
# SpooledTemporaryFile can create files
f = self.do_create()
self.failIf(f._rolled)
f = self.do_create(max_size=100, pre="a", suf=".txt")
self.failIf(f._rolled)
def test_del_on_close(self):
# A SpooledTemporaryFile is deleted when closed
dir = tempfile.mkdtemp()
try:
f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
self.failIf(f._rolled)
f.write('blat ' * 5)
self.failUnless(f._rolled)
filename = f.name
f.close()
self.failIf(os.path.exists(filename),
"SpooledTemporaryFile %s exists after close" % filename)
finally:
os.rmdir(dir)
def test_rewrite_small(self):
# A SpooledTemporaryFile can be written to multiple within the max_size
f = self.do_create(max_size=30)
self.failIf(f._rolled)
for i in range(5):
f.seek(0, 0)
f.write('x' * 20)
self.failIf(f._rolled)
def test_write_sequential(self):
# A SpooledTemporaryFile should hold exactly max_size bytes, and roll
# over afterward
f = self.do_create(max_size=30)
self.failIf(f._rolled)
f.write('x' * 20)
self.failIf(f._rolled)
f.write('x' * 10)
self.failIf(f._rolled)
f.write('x')
self.failUnless(f._rolled)
def test_sparse(self):
# A SpooledTemporaryFile that is written late in the file will extend
# when that occurs
f = self.do_create(max_size=30)
self.failIf(f._rolled)
f.seek(100, 0)
self.failIf(f._rolled)
f.write('x')
self.failUnless(f._rolled)
def test_fileno(self):
# A SpooledTemporaryFile should roll over to a real file on fileno()
f = self.do_create(max_size=30)
self.failIf(f._rolled)
self.failUnless(f.fileno() > 0)
self.failUnless(f._rolled)
def test_multiple_close(self):
# A SpooledTemporaryFile can be closed many times without error
f = tempfile.SpooledTemporaryFile()
f.write('abc\n')
f.close()
try:
f.close()
f.close()
except:
self.failOnException("close")
def test_bound_methods(self):
# It should be OK to steal a bound method from a SpooledTemporaryFile
# and use it independently; when the file rolls over, those bound
# methods should continue to function
f = self.do_create(max_size=30)
read = f.read
write = f.write
seek = f.seek
write("a" * 35)
write("b" * 35)
seek(0, 0)
self.failUnless(read(70) == 'a'*35 + 'b'*35)
test_classes.append(test_SpooledTemporaryFile)
class test_TemporaryFile(TC):
"""Test TemporaryFile()."""

View File

@ -442,6 +442,7 @@ Chad Miller
Damien Miller
Roman Milner
Dom Mitchell
Dustin J. Mitchell
Doug Moen
Paul Moore
The Dragon De Monsyne

View File

@ -187,6 +187,8 @@ Core and builtins
Library
-------
- Patch #1630118: add a SpooledTemporaryFile class to tempfile.py.
- Patch #1273829: os.walk() now has a "followlinks" parameter. If set to
True (which is not the default), it visits symlinks pointing to
directories.