Issue #16800: tempfile.gettempdir() no longer left temporary files when

the disk is full.  Original patch by Amir Szekely.
This commit is contained in:
Serhiy Storchaka 2013-02-13 00:34:46 +02:00
parent cdc7a91dde
commit 0127de0b87
5 changed files with 93 additions and 12 deletions

View File

@ -29,6 +29,7 @@ __all__ = [
# Imports.
import io as _io
import os as _os
import errno as _errno
from random import Random as _Random
@ -193,14 +194,17 @@ def _get_default_tempdir():
name = namer.next()
filename = _os.path.join(dir, name)
try:
fd = _os.open(filename, flags, 0600)
fp = _os.fdopen(fd, 'w')
fp.write('blat')
fp.close()
_os.unlink(filename)
del fp, fd
fd = _os.open(filename, flags, 0o600)
try:
try:
fp = _io.open(fd, 'wb', buffering=0, closefd=False)
fp.write(b'blat')
finally:
_os.close(fd)
finally:
_os.unlink(filename)
return dir
except (OSError, IOError), e:
except (OSError, IOError) as e:
if e[0] != _errno.EEXIST:
break # no point trying more names in this directory
pass

View File

@ -1298,6 +1298,33 @@ def reap_children():
except:
break
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
"""Temporary swap out an attribute with a new object.
Usage:
with swap_attr(obj, "attr", 5):
...
This will set obj.attr to 5 for the duration of the with: block,
restoring the old value at the end of the block. If `attr` doesn't
exist on `obj`, it will be created and then deleted at the end of the
block.
"""
if hasattr(obj, attr):
real_val = getattr(obj, attr)
setattr(obj, attr, new_val)
try:
yield
finally:
setattr(obj, attr, real_val)
else:
setattr(obj, attr, new_val)
try:
yield
finally:
delattr(obj, attr)
def py3k_bytes(b):
"""Emulate the py3k bytes() constructor.

View File

@ -1,13 +1,16 @@
# tempfile.py unit tests.
import tempfile
import errno
import io
import os
import signal
import shutil
import sys
import re
import warnings
import unittest
from test import test_support
from test import test_support as support
warnings.filterwarnings("ignore",
category=RuntimeWarning,
@ -177,7 +180,7 @@ class test__candidate_tempdir_list(TC):
# _candidate_tempdir_list contains the expected directories
# Make sure the interesting environment variables are all set.
with test_support.EnvironmentVarGuard() as env:
with support.EnvironmentVarGuard() as env:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
@ -202,8 +205,51 @@ class test__candidate_tempdir_list(TC):
test_classes.append(test__candidate_tempdir_list)
# We test _get_default_tempdir some more by testing gettempdir.
# We test _get_default_tempdir by testing gettempdir.
class TestGetDefaultTempdir(TC):
"""Test _get_default_tempdir()."""
def test_no_files_left_behind(self):
# use a private empty directory
our_temp_directory = tempfile.mkdtemp()
try:
# force _get_default_tempdir() to consider our empty directory
def our_candidate_list():
return [our_temp_directory]
with support.swap_attr(tempfile, "_candidate_tempdir_list",
our_candidate_list):
# verify our directory is empty after _get_default_tempdir()
tempfile._get_default_tempdir()
self.assertEqual(os.listdir(our_temp_directory), [])
def raise_OSError(*args, **kwargs):
raise OSError(-1)
with support.swap_attr(io, "open", raise_OSError):
# test again with failing io.open()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
open = io.open
def bad_writer(*args, **kwargs):
fp = open(*args, **kwargs)
fp.write = raise_OSError
return fp
with support.swap_attr(io, "open", bad_writer):
# test again with failing write()
with self.assertRaises(IOError) as cm:
tempfile._get_default_tempdir()
self.assertEqual(cm.exception.errno, errno.ENOENT)
self.assertEqual(os.listdir(our_temp_directory), [])
finally:
shutil.rmtree(our_temp_directory)
test_classes.append(TestGetDefaultTempdir)
class test__get_candidate_names(TC):
@ -299,7 +345,7 @@ class test__mkstemp_inner(TC):
if not has_spawnl:
return # ugh, can't use SkipTest.
if test_support.verbose:
if support.verbose:
v="v"
else:
v="q"
@ -913,7 +959,7 @@ if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
test_classes.append(test_TemporaryFile)
def test_main():
test_support.run_unittest(*test_classes)
support.run_unittest(*test_classes)
if __name__ == "__main__":
test_main()

View File

@ -974,6 +974,7 @@ Kalle Svensson
Paul Swartz
Thenault Sylvain
Péter Szabó
Amir Szekely
Arfrever Frehtes Taifersar Arahesis
Geoff Talvola
William Tanksley

View File

@ -202,6 +202,9 @@ Core and Builtins
Library
-------
- Issue #16800: tempfile.gettempdir() no longer left temporary files when
the disk is full. Original patch by Amir Szekely.
- Issue #13555: cPickle now supports files larger than 2 GiB.
- Issue #17052: unittest discovery should use self.testLoader.