Issue 2021: Allow NamedTemporaryFile and SpooledTemporaryFile to be used as context managers. (The NamedTemporaryFile fix should be considered for backporting to 2.5)

This commit is contained in:
Nick Coghlan 2008-02-09 15:28:09 +00:00
parent 8c6c12ca96
commit 97fac3eb0a
3 changed files with 104 additions and 8 deletions

View File

@ -51,7 +51,8 @@ The module defines the following user-callable functions:
The returned object is a true file object on POSIX platforms. On other
platforms, it is a file-like object whose :attr:`file` attribute is the
underlying true file object.
underlying true file object. This file-like object can be used in a :keyword:`with`
statement, just like a normal file.
.. function:: NamedTemporaryFile([mode='w+b'[, bufsize=-1[, suffix[, prefix[, dir[, delete]]]]]])
@ -65,7 +66,8 @@ The module defines the following user-callable functions:
If *delete* is true (the default), the file is deleted as soon as it is closed.
The returned object is always a file-like object whose :attr:`file` attribute
is the underlying true file object.
is the underlying true file object. This file-like object can be used in a :keyword:`with`
statement, just like a normal file.
.. versionadded:: 2.3
@ -85,7 +87,8 @@ The module defines the following user-callable functions:
The returned object is a file-like object whose :attr:`_file` attribute
is either a :class:`StringIO` object or a true file object, depending on
whether :func:`rollover` has been called.
whether :func:`rollover` has been called. This file-like object can be used in a
:keyword:`with` statement, just like a normal file.
.. versionadded:: 2.6

View File

@ -370,6 +370,7 @@ def mktemp(suffix="", prefix=template, dir=None):
raise IOError, (_errno.EEXIST, "No usable temporary filename found")
class _TemporaryFileWrapper:
"""Temporary file wrapper
@ -385,17 +386,25 @@ class _TemporaryFileWrapper:
self.delete = delete
def __getattr__(self, name):
# Attribute lookups are delegated to the underlying file
# and cached for non-numeric results
# (i.e. methods are cached, closed and friends are not)
file = self.__dict__['file']
a = getattr(file, name)
if type(a) != type(0):
if not issubclass(type(a), type(0)):
setattr(self, name, a)
return a
# The underlying __enter__ method returns the wrong object
# (self.file) so override it to return the wrapper
def __enter__(self):
self.file.__enter__()
return self
# NT provides delete-on-close as a primitive, so we don't need
# the wrapper to do anything special. We still use it so that
# file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
if _os.name != 'nt':
# Cache the unlinker so we don't get spurious errors at
# shutdown when the module-level "os" is None'd out. Note
# that this must be referenced as self.unlink, because the
@ -413,6 +422,14 @@ class _TemporaryFileWrapper:
def __del__(self):
self.close()
# Need to trap __exit__ as well to ensure the file gets
# deleted when used in a with statement
def __exit__(self, exc, value, tb):
result = self.file.__exit__(exc, value, tb)
self.close()
return result
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
prefix=template, dir=None, delete=True):
"""Create and return a temporary file.
@ -511,6 +528,20 @@ class SpooledTemporaryFile:
self._rolled = True
# The method caching trick from NamedTemporaryFile
# won't work here, because _file may change from a
# _StringIO instance to a real file. So we list
# all the methods directly.
# Context management protocol
def __enter__(self):
if self._file.closed:
raise ValueError("Cannot enter context with closed file")
return self
def __exit__(self, exc, value, tb):
self._file.close()
# file protocol
def __iter__(self):
return self._file.__iter__()

View File

@ -1,5 +1,5 @@
# tempfile.py unit tests.
from __future__ import with_statement
import tempfile
import os
import sys
@ -619,7 +619,6 @@ class test_NamedTemporaryFile(TC):
def test_multiple_close(self):
# A NamedTemporaryFile can be closed many times without error
f = tempfile.NamedTemporaryFile()
f.write('abc\n')
f.close()
@ -629,6 +628,16 @@ class test_NamedTemporaryFile(TC):
except:
self.failOnException("close")
def test_context_manager(self):
# A NamedTemporaryFile can be used as a context manager
with tempfile.NamedTemporaryFile() as f:
self.failUnless(os.path.exists(f.name))
self.failIf(os.path.exists(f.name))
def use_closed():
with f:
pass
self.failUnlessRaises(ValueError, use_closed)
# How to test the mode and bufsize parameters?
test_classes.append(test_NamedTemporaryFile)
@ -707,10 +716,23 @@ class test_SpooledTemporaryFile(TC):
self.failUnless(f.fileno() > 0)
self.failUnless(f._rolled)
def test_multiple_close(self):
def test_multiple_close_before_rollover(self):
# A SpooledTemporaryFile can be closed many times without error
f = tempfile.SpooledTemporaryFile()
f.write('abc\n')
self.failIf(f._rolled)
f.close()
try:
f.close()
f.close()
except:
self.failOnException("close")
def test_multiple_close_after_rollover(self):
# A SpooledTemporaryFile can be closed many times without error
f = tempfile.SpooledTemporaryFile(max_size=1)
f.write('abc\n')
self.failUnless(f._rolled)
f.close()
try:
f.close()
@ -732,6 +754,46 @@ class test_SpooledTemporaryFile(TC):
seek(0, 0)
self.failUnless(read(70) == 'a'*35 + 'b'*35)
def test_context_manager_before_rollover(self):
# A SpooledTemporaryFile can be used as a context manager
with tempfile.SpooledTemporaryFile(max_size=1) as f:
self.failIf(f._rolled)
self.failIf(f.closed)
self.failUnless(f.closed)
def use_closed():
with f:
pass
self.failUnlessRaises(ValueError, use_closed)
def test_context_manager_during_rollover(self):
# A SpooledTemporaryFile can be used as a context manager
with tempfile.SpooledTemporaryFile(max_size=1) as f:
self.failIf(f._rolled)
f.write('abc\n')
f.flush()
self.failUnless(f._rolled)
self.failIf(f.closed)
self.failUnless(f.closed)
def use_closed():
with f:
pass
self.failUnlessRaises(ValueError, use_closed)
def test_context_manager_after_rollover(self):
# A SpooledTemporaryFile can be used as a context manager
f = tempfile.SpooledTemporaryFile(max_size=1)
f.write('abc\n')
f.flush()
self.failUnless(f._rolled)
with f:
self.failIf(f.closed)
self.failUnless(f.closed)
def use_closed():
with f:
pass
self.failUnlessRaises(ValueError, use_closed)
test_classes.append(test_SpooledTemporaryFile)