Check-in of the most essential parts of SF 589982 (tempfile.py

rewrite, by Zack Weinberg).  This replaces most code in tempfile.py
(please review!!!) and adds extensive unit tests for it.

This will cause some warnings in the test suite; I'll check those in
soon, and also the docs.
This commit is contained in:
Guido van Rossum 2002-08-09 16:14:33 +00:00
parent 0f5f0b8057
commit 0e54871f82
2 changed files with 1097 additions and 229 deletions

View File

@ -1,178 +1,361 @@
"""Temporary files and filenames."""
"""Temporary files.
# XXX This tries to be not UNIX specific, but I don't know beans about
# how to choose a temp directory or filename on MS-DOS or other
# systems so it may have to be changed...
This module provides generic, low- and high-level interfaces for
creating temporary files and directories. The interfaces listed
as "safe" just below can be used without fear of race conditions.
Those listed as "unsafe" cannot, and are provided for backward
compatibility only.
import os
This module also provides some data items to the user:
__all__ = ["mktemp", "TemporaryFile", "tempdir", "gettempprefix"]
TMP_MAX - maximum number of names that will be tried before
giving up.
template - the default prefix for all temporary names.
You may change this to control the default prefix.
tempdir - If this is set to a string before the first use of
any routine from this module, it will be considered as
another candidate location to store temporary files.
"""
# Parameters that the caller may set to override the defaults
tempdir = None
template = None
__all__ = [
"NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
"mkstemp", "mkdtemp", # low level safe interfaces
"mktemp", # deprecated unsafe interface
"TMP_MAX", "gettempprefix", # constants
"tempdir", "gettempdir"
]
def gettempdir():
"""Function to calculate the directory to use."""
global tempdir
if tempdir is not None:
return tempdir
# _gettempdir_inner deduces whether a candidate temp dir is usable by
# trying to create a file in it, and write to it. If that succeeds,
# great, it closes the file and unlinks it. There's a race, though:
# the *name* of the test file it tries is the same across all threads
# under most OSes (Linux is an exception), and letting multiple threads
# all try to open, write to, close, and unlink a single file can cause
# a variety of bogus errors (e.g., you cannot unlink a file under
# Windows if anyone has it open, and two threads cannot create the
# same file in O_EXCL mode under Unix). The simplest cure is to serialize
# calls to _gettempdir_inner. This isn't a real expense, because the
# first thread to succeed sets the global tempdir, and all subsequent
# calls to gettempdir() reuse that without trying _gettempdir_inner.
_tempdir_lock.acquire()
try:
return _gettempdir_inner()
finally:
_tempdir_lock.release()
# Imports.
def _gettempdir_inner():
"""Function to calculate the directory to use."""
global tempdir
if tempdir is not None:
return tempdir
try:
pwd = os.getcwd()
except (AttributeError, os.error):
pwd = os.curdir
attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd]
if os.name == 'nt':
attempdirs.insert(0, 'C:\\TEMP')
attempdirs.insert(0, '\\TEMP')
elif os.name == 'mac':
import macfs, MACFS
try:
refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk,
MACFS.kTemporaryFolderType, 1)
dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname()
attempdirs.insert(0, dirname)
except macfs.error:
import os as _os
import errno as _errno
from random import Random as _Random
if _os.name == 'mac':
import macfs as _macfs
import MACFS as _MACFS
try:
import fcntl as _fcntl
def _set_cloexec(fd):
flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
if flags >= 0:
# flags read successfully, modify
flags |= _fcntl.FD_CLOEXEC
_fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
except (ImportError, AttributeError):
def _set_cloexec(fd):
pass
try:
import thread as _thread
_allocate_lock = _thread.allocate_lock
except (ImportError, AttributeError):
class _allocate_lock:
def acquire(self):
pass
elif os.name == 'riscos':
scrapdir = os.getenv('Wimp$ScrapDir')
if scrapdir:
attempdirs.insert(0, scrapdir)
for envname in 'TMPDIR', 'TEMP', 'TMP':
if envname in os.environ:
attempdirs.insert(0, os.environ[envname])
testfile = gettempprefix() + 'test'
for dir in attempdirs:
release = acquire
_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
if hasattr(_os, 'O_NOINHERIT'): _text_openflags |= _os.O_NOINHERIT
if hasattr(_os, 'O_NOFOLLOW'): _text_openflags |= _os.O_NOFOLLOW
_bin_openflags = _text_openflags
if hasattr(_os, 'O_BINARY'): _bin_openflags |= _os.O_BINARY
if hasattr(_os, 'TMP_MAX'):
TMP_MAX = _os.TMP_MAX
else:
TMP_MAX = 10000
if _os.name == 'nt':
template = '~t' # cater to eight-letter limit
else:
template = "tmp"
tempdir = None
# Internal routines.
_once_lock = _allocate_lock()
def _once(var, initializer):
"""Wrapper to execute an initialization operation just once,
even if multiple threads reach the same point at the same time.
var is the name (as a string) of the variable to be entered into
the current global namespace.
initializer is a callable which will return the appropriate initial
value for variable. It will be called only if variable is not
present in the global namespace, or its current value is None.
Do not call _once from inside an initializer routine, it will deadlock.
"""
vars = globals()
lock = _once_lock
# Check first outside the lock.
if var in vars and vars[var] is not None:
return
try:
lock.acquire()
# Check again inside the lock.
if var in vars and vars[var] is not None:
return
vars[var] = initializer()
finally:
lock.release()
class _RandomNameSequence:
"""An instance of _RandomNameSequence generates an endless
sequence of unpredictable strings which can safely be incorporated
into file names. Each string is six characters long. Multiple
threads can safely use the same instance at the same time.
_RandomNameSequence is an iterator."""
characters = ( "abcdefghijklmnopqrstuvwxyz"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "0123456789-_")
def __init__(self):
self.mutex = _allocate_lock()
self.rng = _Random()
self.normcase = _os.path.normcase
def __iter__(self):
return self
def next(self):
m = self.mutex
c = self.characters
r = self.rng
try:
filename = os.path.join(dir, testfile)
if os.name == 'posix':
try:
fd = os.open(filename,
os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700)
except OSError:
pass
else:
fp = os.fdopen(fd, 'w')
fp.write('blat')
fp.close()
os.unlink(filename)
del fp, fd
tempdir = dir
break
else:
fp = open(filename, 'w')
m.acquire()
letters = ''.join([r.choice(c), r.choice(c), r.choice(c),
r.choice(c), r.choice(c), r.choice(c)])
finally:
m.release()
return self.normcase(letters)
def _candidate_tempdir_list():
"""Generate a list of candidate temporary directories which
_get_default_tempdir will try."""
dirlist = []
# First, try the environment.
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = _os.getenv(envname)
if dirname: dirlist.append(dirname)
# Failing that, try OS-specific locations.
if _os.name == 'mac':
try:
refnum, dirid = _macfs.FindFolder(_MACFS.kOnSystemDisk,
_MACFS.kTemporaryFolderType, 1)
dirname = _macfs.FSSpec((refnum, dirid, '')).as_pathname()
dirlist.append(dirname)
except _macfs.error:
pass
elif _os.name == 'riscos':
dirname = _os.getenv('Wimp$ScrapDir')
if dirname: dirlist.append(dirname)
elif _os.name == 'nt':
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
else:
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
# As a last resort, the current directory.
try:
dirlist.append(_os.getcwd())
except (AttributeError, _os.error):
dirlist.append(_os.curdir)
return dirlist
def _get_default_tempdir():
"""Calculate the default directory to use for temporary files.
This routine should be called through '_once' (see above) as we
do not want multiple threads attempting this calculation simultaneously.
We determine whether or not a candidate temp dir is usable by
trying to create and write to a file in that directory. If this
is successful, the test file is deleted. To prevent denial of
service, the name of the test file must be randomized."""
namer = _RandomNameSequence()
dirlist = _candidate_tempdir_list()
flags = _text_openflags
for dir in dirlist:
if dir != _os.curdir:
dir = _os.path.normcase(_os.path.abspath(dir))
# Try only a few names per directory.
for seq in xrange(100):
name = namer.next()
filename = _os.path.join(dir, name)
try:
fd = _os.open(filename, flags, 0600)
fp = _os.fdopen(fd, 'w')
fp.write('blat')
fp.close()
os.unlink(filename)
tempdir = dir
break
except IOError:
pass
if tempdir is None:
msg = "Can't find a usable temporary directory amongst " + `attempdirs`
raise IOError, msg
return tempdir
_os.unlink(filename)
del fp, fd
return dir
except (OSError, IOError), e:
if e[0] != _errno.EEXIST:
break # no point trying more names in this directory
pass
raise IOError, (_errno.ENOENT,
("No usable temporary directory found in %s" % dirlist))
def _get_candidate_names():
"""Common setup sequence for all user-callable interfaces."""
_once('_name_sequence', _RandomNameSequence)
return _name_sequence
# template caches the result of gettempprefix, for speed, when possible.
# XXX unclear why this isn't "_template"; left it "template" for backward
# compatibility.
if os.name == "posix":
# We don't try to cache the template on posix: the pid may change on us
# between calls due to a fork, and on Linux the pid changes even for
# another thread in the same process. Since any attempt to keep the
# cache in synch would have to call os.getpid() anyway in order to make
# sure the pid hasn't changed between calls, a cache wouldn't save any
# time. In addition, a cache is difficult to keep correct with the pid
# changing willy-nilly, and earlier attempts proved buggy (races).
template = None
def _mkstemp_inner(dir, pre, suf, flags):
"""Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""
# Else the pid never changes, so gettempprefix always returns the same
# string.
elif os.name == "nt":
template = '~' + `os.getpid()` + '-'
elif os.name in ('mac', 'riscos'):
template = 'Python-Tmp-'
else:
template = 'tmp' # XXX might choose a better one
names = _get_candidate_names()
for seq in xrange(TMP_MAX):
name = names.next()
file = _os.path.join(dir, pre + name + suf)
try:
fd = _os.open(file, flags, 0600)
_set_cloexec(fd)
return (fd, file)
except OSError, e:
if e.errno == _errno.EEXIST:
continue # try again
raise
raise IOError, (_errno.EEXIST, "No usable temporary file name found")
# User visible interfaces.
def gettempprefix():
"""Function to calculate a prefix of the filename to use.
"""Accessor for tempdir.template."""
return template
This incorporates the current process id on systems that support such a
notion, so that concurrent processes don't generate the same prefix.
def gettempdir():
"""Accessor for tempdir.tempdir."""
_once('tempdir', _get_default_tempdir)
return tempdir
def mkstemp(suffix="", prefix=template, dir=gettempdir(), binary=1):
"""mkstemp([suffix, [prefix, [dir, [binary]]]])
User-callable function to create and return a unique temporary
file. The return value is a pair (fd, name) where fd is the
file descriptor returned by os.open, and name is the filename.
If 'suffix' is specified, the file name will end with that suffix,
otherwise there will be no suffix.
If 'prefix' is specified, the file name will begin with that prefix,
otherwise a default prefix is used.
If 'dir' is specified, the file will be created in that directory,
otherwise a default directory is used.
If 'binary' is specified and false, the file is opened in binary
mode. Otherwise, the file is opened in text mode. On some
operating systems, this makes no difference.
The file is readable and writable only by the creating user ID.
If the operating system uses permission bits to indicate whether a
file is executable, the file is executable by no one. The file
descriptor is not inherited by children of this process.
Caller is responsible for deleting the file when done with it.
"""
global template
if template is None:
return '@' + `os.getpid()` + '.'
if binary:
flags = _bin_openflags
else:
return template
flags = _text_openflags
return _mkstemp_inner(dir, prefix, suffix, flags)
def mktemp(suffix=""):
"""User-callable function to return a unique temporary file name."""
dir = gettempdir()
pre = gettempprefix()
while 1:
i = _counter.get_next()
file = os.path.join(dir, pre + str(i) + suffix)
if not os.path.exists(file):
def mkdtemp(suffix="", prefix=template, dir=gettempdir()):
"""mkdtemp([suffix, [prefix, [dir]]])
User-callable function to create and return a unique temporary
directory. The return value is the pathname of the directory.
Arguments are as for mkstemp, except that the 'binary' argument is
not accepted.
The directory is readable, writable, and searchable only by the
creating user.
Caller is responsible for deleting the directory when done with it.
"""
names = _get_candidate_names()
for seq in xrange(TMP_MAX):
name = names.next()
file = _os.path.join(dir, prefix + name + suffix)
try:
_os.mkdir(file, 0700)
return file
except OSError, e:
if e.errno == _errno.EEXIST:
continue # try again
raise
raise IOError, (_errno.EEXIST, "No usable temporary directory name found")
def mktemp(suffix="", prefix=template, dir=gettempdir()):
"""mktemp([suffix, [prefix, [dir]]])
User-callable function to return a unique temporary file name. The
file is not created.
Arguments are as for mkstemp, except that the 'binary' argument is
not accepted.
This function is unsafe and should not be used. The file name
refers to a file that did not exist at some point, but by the time
you get around to creating it, someone else may have beaten you to
the punch.
"""
from warnings import warn as _warn
_warn("mktemp is a potential security risk to your program",
RuntimeWarning, stacklevel=2)
names = _get_candidate_names()
for seq in xrange(TMP_MAX):
name = names.next()
file = _os.path.join(dir, prefix + name + suffix)
if not _os.path.exists(file):
return file
raise IOError, (_errno.EEXIST, "No usable temporary filename found")
class TemporaryFileWrapper:
class _TemporaryFileWrapper:
"""Temporary file wrapper
This class provides a wrapper around files opened for temporary use.
In particular, it seeks to automatically remove the file when it is
no longer needed.
This class provides a wrapper around files opened for
temporary use. In particular, it seeks to automatically
remove the file when it is no longer needed.
"""
# Cache the unlinker so we don't get spurious errors at shutdown
# when the module-level "os" is None'd out. Note that this must
# be referenced as self.unlink, because the name TemporaryFileWrapper
# may also get None'd out before __del__ is called.
unlink = os.unlink
def __init__(self, file, path):
def __init__(self, file, name):
self.file = file
self.path = path
self.name = name
self.close_called = 0
def close(self):
if not self.close_called:
self.close_called = 1
self.file.close()
self.unlink(self.path)
def __del__(self):
self.close()
def __getattr__(self, name):
file = self.__dict__['file']
a = getattr(file, name)
@ -180,93 +363,81 @@ class TemporaryFileWrapper:
setattr(self, name, a)
return a
try:
import fcntl as _fcntl
def _set_cloexec(fd, flag=_fcntl.FD_CLOEXEC):
flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
if flags >= 0:
# flags read successfully, modify
flags |= flag
_fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
except (ImportError, AttributeError):
def _set_cloexec(fd):
pass
# NT provides delete-on-close as a primitive, so we don't need
# the wrapper to do anything special. We still use it so that
# file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
if _os.name != 'nt':
def TemporaryFile(mode='w+b', bufsize=-1, suffix=""):
"""Create and return a temporary file (opened read-write by default)."""
name = mktemp(suffix)
if os.name == 'posix':
# Unix -- be very careful
fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
_set_cloexec(fd)
try:
os.unlink(name)
return os.fdopen(fd, mode, bufsize)
except:
os.close(fd)
raise
elif os.name == 'nt':
# Windows -- can't unlink an open file, but O_TEMPORARY creates a
# file that "deletes itself" when the last handle is closed.
# O_NOINHERIT ensures processes created via spawn() don't get a
# handle to this too. That would be a security hole, and, on my
# Win98SE box, when an O_TEMPORARY file is inherited by a spawned
# process, the fd in the spawned process seems to lack the
# O_TEMPORARY flag, so the file doesn't go away by magic then if the
# spawning process closes it first.
flags = (os.O_RDWR | os.O_CREAT | os.O_EXCL |
os.O_TEMPORARY | os.O_NOINHERIT)
if 'b' in mode:
flags |= os.O_BINARY
fd = os.open(name, flags, 0700)
return os.fdopen(fd, mode, bufsize)
else:
# Assume we can't unlink a file that's still open, or arrange for
# an automagically self-deleting file -- use wrapper.
file = open(name, mode, bufsize)
return TemporaryFileWrapper(file, name)
# Cache the unlinker so we don't get spurious errors at
# shutdown when the module-level "os" is None'd out. Note
# that this must be referenced as self.unlink, because the
# name TemporaryFileWrapper may also get None'd out before
# __del__ is called.
unlink = _os.unlink
# In order to generate unique names, mktemp() uses _counter.get_next().
# This returns a unique integer on each call, in a threadsafe way (i.e.,
# multiple threads will never see the same integer). The integer will
# usually be a Python int, but if _counter.get_next() is called often
# enough, it will become a Python long.
# Note that the only names that survive this next block of code
# are "_counter" and "_tempdir_lock".
def close(self):
if not self.close_called:
self.close_called = 1
self.file.close()
self.unlink(self.name)
class _ThreadSafeCounter:
def __init__(self, mutex, initialvalue=0):
self.mutex = mutex
self.i = initialvalue
def __del__(self):
self.close()
def get_next(self):
self.mutex.acquire()
result = self.i
try:
newi = result + 1
except OverflowError:
newi = long(result) + 1
self.i = newi
self.mutex.release()
return result
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
prefix=template, dir=gettempdir()):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'dir' -- as for mkstemp.
'mode' -- the mode argument to os.fdopen (default "w+b").
'bufsize' -- the buffer size argument to os.fdopen (default -1).
The file is created as mkstemp() would do it.
try:
import thread
Returns a file object; the name of the file is accessible as
file.name. The file will be automatically deleted when it is
closed.
"""
except ImportError:
class _DummyMutex:
def acquire(self):
pass
bin = 'b' in mode
if bin: flags = _bin_openflags
else: flags = _text_openflags
release = acquire
# Setting O_TEMPORARY in the flags causes the OS to delete
# the file when it is closed. This is only supported by Windows.
if _os.name == 'nt':
flags |= _os.O_TEMPORARY
_counter = _ThreadSafeCounter(_DummyMutex())
_tempdir_lock = _DummyMutex()
del _DummyMutex
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
file = _os.fdopen(fd, mode, bufsize)
return _TemporaryFileWrapper(file, name)
if _os.name != 'posix':
# On non-POSIX systems, assume that we cannot unlink a file while
# it is open.
TemporaryFile = NamedTemporaryFile
else:
_counter = _ThreadSafeCounter(thread.allocate_lock())
_tempdir_lock = thread.allocate_lock()
del thread
def TemporaryFile(mode='w+b', bufsize=-1, suffix="",
prefix=template, dir=gettempdir()):
"""Create and return a temporary file.
Arguments:
'prefix', 'suffix', 'directory' -- as for mkstemp.
'mode' -- the mode argument to os.fdopen (default "w+b").
'bufsize' -- the buffer size argument to os.fdopen (default -1).
The file is created as mkstemp() would do it.
del _ThreadSafeCounter
Returns a file object. The file has no name, and will cease to
exist when it is closed.
"""
bin = 'b' in mode
if bin: flags = _bin_openflags
else: flags = _text_openflags
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
try:
_os.unlink(name)
return _os.fdopen(fd, mode, bufsize)
except:
_os.close(fd)
raise

View File

@ -1,10 +1,707 @@
# SF bug #476138: tempfile behavior across platforms
# Ensure that a temp file can be closed any number of times without error.
# tempfile.py unit tests.
import tempfile
import os
import sys
import re
import errno
import warnings
f = tempfile.TemporaryFile("w+b")
f.write('abc\n')
f.close()
f.close()
f.close()
import unittest
from test import test_support
if hasattr(os, 'stat'):
import stat
has_stat = 1
else:
has_stat = 0
has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
# This is organized as one test for each chunk of code in tempfile.py,
# in order of their appearance in the file. Testing which requires
# threads is not done here.
# Common functionality.
class TC(unittest.TestCase):
str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
def failOnException(self, what, ei=None):
if ei is None:
ei = sys.exc_info()
self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
def nameCheck(self, name, dir, pre, suf):
(ndir, nbase) = os.path.split(name)
npre = nbase[:len(pre)]
nsuf = nbase[len(nbase)-len(suf):]
self.assertEqual(ndir, dir,
"file '%s' not in directory '%s'" % (name, dir))
self.assertEqual(npre, pre,
"file '%s' does not begin with '%s'" % (nbase, pre))
self.assertEqual(nsuf, suf,
"file '%s' does not end with '%s'" % (nbase, suf))
nbase = nbase[len(pre):len(nbase)-len(suf)]
self.assert_(self.str_check.match(nbase),
"random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
% nbase)
test_classes = []
class test_exports(TC):
def test_exports(self):
"""There are no surprising symbols in the tempfile module"""
dict = tempfile.__dict__
expected = {
"NamedTemporaryFile" : 1,
"TemporaryFile" : 1,
"mkstemp" : 1,
"mkdtemp" : 1,
"mktemp" : 1,
"TMP_MAX" : 1,
"gettempprefix" : 1,
"gettempdir" : 1,
"tempdir" : 1,
"template" : 1
}
unexp = []
for key in dict:
if key[0] != '_' and key not in expected:
unexp.append(key)
self.failUnless(len(unexp) == 0,
"unexpected keys: %s" % unexp)
test_classes.append(test_exports)
class test__once(TC):
"""Test the internal function _once."""
def setUp(self):
tempfile.once_var = None
self.already_called = 0
def tearDown(self):
del tempfile.once_var
def callMeOnce(self):
self.failIf(self.already_called, "callMeOnce called twice")
self.already_called = 1
return 24
def do_once(self):
tempfile._once('once_var', self.callMeOnce)
def test_once_initializes(self):
"""_once initializes its argument"""
self.do_once()
self.assertEqual(tempfile.once_var, 24,
"once_var=%d, not 24" % tempfile.once_var)
self.assertEqual(self.already_called, 1,
"already_called=%d, not 1" % self.already_called)
def test_once_means_once(self):
"""_once calls the callback just once"""
self.do_once()
self.do_once()
self.do_once()
self.do_once()
def test_once_namespace_safe(self):
"""_once does not modify anything but its argument"""
env_copy = tempfile.__dict__.copy()
self.do_once()
env = tempfile.__dict__
a = env.keys()
a.sort()
b = env_copy.keys()
b.sort()
self.failIf(len(a) != len(b))
for i in xrange(len(a)):
self.failIf(a[i] != b[i])
key = a[i]
if key != 'once_var':
self.failIf(env[key] != env_copy[key])
test_classes.append(test__once)
class test__RandomNameSequence(TC):
"""Test the internal iterator object _RandomNameSequence."""
def setUp(self):
self.r = tempfile._RandomNameSequence()
def test_get_six_char_str(self):
"""_RandomNameSequence returns a six-character string"""
s = self.r.next()
self.nameCheck(s, '', '', '')
def test_many(self):
"""_RandomNameSequence returns no duplicate strings (stochastic)"""
dict = {}
r = self.r
for i in xrange(1000):
s = r.next()
self.nameCheck(s, '', '', '')
self.failIf(s in dict)
dict[s] = 1
def test_supports_iter(self):
"""_RandomNameSequence supports the iterator protocol"""
i = 0
r = self.r
try:
for s in r:
i += 1
if i == 20:
break
except:
failOnException("iteration")
test_classes.append(test__RandomNameSequence)
class test__candidate_tempdir_list(TC):
"""Test the internal function _candidate_tempdir_list."""
def test_nonempty_list(self):
"""_candidate_tempdir_list returns a nonempty list of strings"""
cand = tempfile._candidate_tempdir_list()
self.failIf(len(cand) == 0)
for c in cand:
self.assert_(isinstance(c, basestring),
"%s is not a string" % c)
def test_wanted_dirs(self):
"""_candidate_tempdir_list contains the expected directories"""
# Make sure the interesting environment variables are all set.
added = []
try:
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname:
os.environ[envname] = os.path.abspath(envname)
added.append(envname)
cand = tempfile._candidate_tempdir_list()
for envname in 'TMPDIR', 'TEMP', 'TMP':
dirname = os.getenv(envname)
if not dirname: raise ValueError
self.assert_(dirname in cand)
try:
dirname = os.getcwd()
except (AttributeError, os.error):
dirname = os.curdir
self.assert_(dirname in cand)
# Not practical to try to verify the presence of OS-specific
# paths in this list.
finally:
for p in added:
del os.environ[p]
test_classes.append(test__candidate_tempdir_list)
# We test _get_default_tempdir by testing gettempdir.
class test__get_candidate_names(TC):
"""Test the internal function _get_candidate_names."""
def test_retval(self):
"""_get_candidate_names returns a _RandomNameSequence object"""
obj = tempfile._get_candidate_names()
self.assert_(isinstance(obj, tempfile._RandomNameSequence))
def test_same_thing(self):
"""_get_candidate_names always returns the same object"""
a = tempfile._get_candidate_names()
b = tempfile._get_candidate_names()
self.assert_(a is b)
test_classes.append(test__get_candidate_names)
class test__mkstemp_inner(TC):
"""Test the internal function _mkstemp_inner."""
class mkstemped:
_bflags = tempfile._bin_openflags
_tflags = tempfile._text_openflags
_close = os.close
_unlink = os.unlink
def __init__(self, dir, pre, suf, bin):
if bin: flags = self._bflags
else: flags = self._tflags
(self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
def write(self, str):
os.write(self.fd, str)
def __del__(self):
self._close(self.fd)
self._unlink(self.name)
def do_create(self, dir=None, pre="", suf="", bin=1):
if dir is None:
dir = tempfile.gettempdir()
try:
file = self.mkstemped(dir, pre, suf, bin)
except:
self.failOnException("_mkstemp_inner")
self.nameCheck(file.name, dir, pre, suf)
return file
def test_basic(self):
"""_mkstemp_inner can create files"""
self.do_create().write("blat")
self.do_create(pre="a").write("blat")
self.do_create(suf="b").write("blat")
self.do_create(pre="a", suf="b").write("blat")
self.do_create(pre="aa", suf=".txt").write("blat")
def test_basic_many(self):
"""_mkstemp_inner can create many files (stochastic)"""
extant = range(1000)
for i in extant:
extant[i] = self.do_create(pre="aa")
def test_choose_directory(self):
"""_mkstemp_inner can create files in a user-selected directory"""
dir = tempfile.mkdtemp()
try:
self.do_create(dir=dir).write("blat")
finally:
os.rmdir(dir)
def test_file_mode(self):
"""_mkstemp_inner creates files with the proper mode"""
if not has_stat:
return # ugh, can't use TestSkipped.
file = self.do_create()
mode = stat.S_IMODE(os.stat(file.name).st_mode)
self.assertEqual(mode, 0600)
def test_noinherit(self):
"""_mkstemp_inner file handles are not inherited by child processes"""
# FIXME: Find a way to test this on Windows.
if os.name != 'posix':
return # ugh, can't use TestSkipped.
file = self.do_create()
# We have to exec something, so that FD_CLOEXEC will take
# effect. The sanest thing to try is /bin/sh; we can easily
# instruct it to attempt to write to the fd and report success
# or failure. Unfortunately, sh syntax does not permit use of
# fds numerically larger than 9; abandon this test if so.
if file.fd > 9:
raise test_support.TestSkipped, 'cannot test with fd %d' % file.fd
pid = os.fork()
if pid:
status = os.wait()[1]
self.failUnless(os.WIFEXITED(status),
"child process did not exit (status %d)" % status)
# We want the child to have exited _un_successfully, indicating
# failure to write to the closed fd.
self.failUnless(os.WEXITSTATUS(status) != 0,
"child process exited successfully")
else:
try:
# Throw away stderr.
nul = os.open('/dev/null', os.O_RDWR)
os.dup2(nul, 2)
os.execv('/bin/sh', ['sh', '-c', 'echo blat >&%d' % file.fd])
except:
os._exit(0)
def test_textmode(self):
"""_mkstemp_inner can create files in text mode"""
if not has_textmode:
return # ugh, can't use TestSkipped.
self.do_create(bin=0).write("blat\n")
# XXX should test that the file really is a text file
test_classes.append(test__mkstemp_inner)
class test_gettempprefix(TC):
"""Test gettempprefix()."""
def test_sane_template(self):
"""gettempprefix returns a nonempty prefix string"""
p = tempfile.gettempprefix()
self.assert_(isinstance(p, basestring))
self.assert_(len(p) > 0)
def test_usable_template(self):
"""gettempprefix returns a usable prefix string"""
# Create a temp directory, avoiding use of the prefix.
# Then attempt to create a file whose name is
# prefix + 'xxxxxx.xxx' in that directory.
p = tempfile.gettempprefix() + "xxxxxx.xxx"
d = tempfile.mkdtemp(prefix="")
try:
p = os.path.join(d, p)
try:
fd = os.open(p, os.O_RDWR | os.O_CREAT)
except:
self.failOnException("os.open")
os.close(fd)
os.unlink(p)
finally:
os.rmdir(d)
test_classes.append(test_gettempprefix)
class test_gettempdir(TC):
"""Test gettempdir()."""
def test_directory_exists(self):
"""gettempdir returns a directory which exists"""
dir = tempfile.gettempdir()
self.assert_(os.path.isabs(dir) or dir == os.curdir,
"%s is not an absolute path" % dir)
self.assert_(os.path.isdir(dir),
"%s is not a directory" % dir)
def test_directory_writable(self):
"""gettempdir returns a directory writable by the user"""
# sneaky: just instantiate a NamedTemporaryFile, which
# defaults to writing into the directory returned by
# gettempdir.
try:
file = tempfile.NamedTemporaryFile()
file.write("blat")
file.close()
except:
self.failOnException("create file in %s" % tempfile.gettempdir())
def test_same_thing(self):
"""gettempdir always returns the same object"""
a = tempfile.gettempdir()
b = tempfile.gettempdir()
self.assert_(a is b)
test_classes.append(test_gettempdir)
class test_mkstemp(TC):
"""Test mkstemp()."""
def do_create(self, dir=None, pre="", suf="", ):
if dir is None:
dir = tempfile.gettempdir()
try:
(fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
except:
self.failOnException("mkstemp")
try:
self.nameCheck(name, dir, pre, suf)
finally:
os.close(fd)
os.unlink(name)
def test_basic(self):
"""mkstemp can create files"""
self.do_create()
self.do_create(pre="a")
self.do_create(suf="b")
self.do_create(pre="a", suf="b")
self.do_create(pre="aa", suf=".txt")
def test_choose_directory(self):
"""mkstemp can create directories in a user-selected directory"""
dir = tempfile.mkdtemp()
try:
self.do_create(dir=dir)
finally:
os.rmdir(dir)
test_classes.append(test_mkstemp)
class test_mkdtemp(TC):
"""Test mkdtemp()."""
def do_create(self, dir=None, pre="", suf=""):
if dir is None:
dir = tempfile.gettempdir()
try:
name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
except:
self.failOnException("mkdtemp")
try:
self.nameCheck(name, dir, pre, suf)
return name
except:
os.rmdir(name)
raise
def test_basic(self):
"""mkdtemp can create directories"""
os.rmdir(self.do_create())
os.rmdir(self.do_create(pre="a"))
os.rmdir(self.do_create(suf="b"))
os.rmdir(self.do_create(pre="a", suf="b"))
os.rmdir(self.do_create(pre="aa", suf=".txt"))
def test_basic_many(self):
"""mkdtemp can create many directories (stochastic)"""
extant = range(1000)
try:
for i in extant:
extant[i] = self.do_create(pre="aa")
finally:
for i in extant:
if(isinstance(i, basestring)):
os.rmdir(i)
def test_choose_directory(self):
"""mkdtemp can create directories in a user-selected directory"""
dir = tempfile.mkdtemp()
try:
os.rmdir(self.do_create(dir=dir))
finally:
os.rmdir(dir)
def test_mode(self):
"""mkdtemp creates directories with the proper mode"""
if not has_stat:
return # ugh, can't use TestSkipped.
dir = self.do_create()
try:
mode = stat.S_IMODE(os.stat(dir).st_mode)
self.assertEqual(mode, 0700)
finally:
os.rmdir(dir)
test_classes.append(test_mkdtemp)
class test_mktemp(TC):
"""Test mktemp()."""
# For safety, all use of mktemp must occur in a private directory.
# We must also suppress the RuntimeWarning it generates.
def setUp(self):
self.dir = tempfile.mkdtemp()
warnings.filterwarnings("ignore",
category=RuntimeWarning,
message="mktemp")
def tearDown(self):
if self.dir:
os.rmdir(self.dir)
self.dir = None
# XXX This clobbers any -W options.
warnings.resetwarnings()
class mktemped:
_unlink = os.unlink
_bflags = tempfile._bin_openflags
def __init__(self, dir, pre, suf):
self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
# Create the file. This will raise an exception if it's
# mysteriously appeared in the meanwhile.
os.close(os.open(self.name, self._bflags, 0600))
def __del__(self):
self._unlink(self.name)
def do_create(self, pre="", suf=""):
try:
file = self.mktemped(self.dir, pre, suf)
except:
self.failOnException("mktemp")
self.nameCheck(file.name, self.dir, pre, suf)
return file
def test_basic(self):
"""mktemp can choose usable file names"""
self.do_create()
self.do_create(pre="a")
self.do_create(suf="b")
self.do_create(pre="a", suf="b")
self.do_create(pre="aa", suf=".txt")
def test_many(self):
"""mktemp can choose many usable file names (stochastic)"""
extant = range(1000)
for i in extant:
extant[i] = self.do_create(pre="aa")
def test_warning(self):
"""mktemp issues a warning when used"""
warnings.filterwarnings("error",
category=RuntimeWarning,
message="mktemp")
self.assertRaises(RuntimeWarning,
tempfile.mktemp, (), { 'dir': self.dir })
test_classes.append(test_mktemp)
# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
class test_NamedTemporaryFile(TC):
"""Test NamedTemporaryFile()."""
def do_create(self, dir=None, pre="", suf=""):
if dir is None:
dir = tempfile.gettempdir()
try:
file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf)
except:
self.failOnException("NamedTemporaryFile")
self.nameCheck(file.name, dir, pre, suf)
return file
def test_basic(self):
"""NamedTemporaryFile can create files"""
self.do_create()
self.do_create(pre="a")
self.do_create(suf="b")
self.do_create(pre="a", suf="b")
self.do_create(pre="aa", suf=".txt")
def test_creates_named(self):
"""NamedTemporaryFile creates files with names"""
f = tempfile.NamedTemporaryFile()
self.failUnless(os.path.exists(f.name),
"NamedTemporaryFile %s does not exist" % f.name)
def test_del_on_close(self):
"""A NamedTemporaryFile is deleted when closed"""
dir = tempfile.mkdtemp()
try:
f = tempfile.NamedTemporaryFile(dir=dir)
f.write('blat')
f.close()
self.failIf(os.path.exists(f.name),
"NamedTemporaryFile %s exists after close" % f.name)
finally:
os.rmdir(dir)
def test_multiple_close(self):
"""A NamedTemporaryFile can be closed many times without error"""
f = tempfile.NamedTemporaryFile()
f.write('abc\n')
f.close()
try:
f.close()
f.close()
except:
self.failOnException("close")
# How to test the mode and bufsize parameters?
test_classes.append(test_NamedTemporaryFile)
class test_TemporaryFile(TC):
"""Test TemporaryFile()."""
def test_basic(self):
"""TemporaryFile can create files"""
# No point in testing the name params - the file has no name.
try:
tempfile.TemporaryFile()
except:
self.failOnException("TemporaryFile")
def test_has_no_name(self):
"""TemporaryFile creates files with no names (on this system)"""
dir = tempfile.mkdtemp()
f = tempfile.TemporaryFile(dir=dir)
f.write('blat')
# Sneaky: because this file has no name, it should not prevent
# us from removing the directory it was created in.
try:
os.rmdir(dir)
except:
ei = sys.exc_info()
# cleanup
f.close()
os.rmdir(dir)
self.failOnException("rmdir", ei)
def test_multiple_close(self):
"""A TemporaryFile can be closed many times without error"""
f = tempfile.TemporaryFile()
f.write('abc\n')
f.close()
try:
f.close()
f.close()
except:
self.failOnException("close")
# How to test the mode and bufsize parameters?
class dummy_test_TemporaryFile(TC):
def test_dummy(self):
"""TemporaryFile and NamedTemporaryFile are the same (on this system)"""
pass
if tempfile.NamedTemporaryFile is tempfile.TemporaryFile:
test_classes.append(dummy_test_TemporaryFile)
else:
test_classes.append(test_TemporaryFile)
def test_main():
suite = unittest.TestSuite()
for c in test_classes:
suite.addTest(unittest.makeSuite(c))
test_support.run_suite(suite)
if __name__ == "__main__":
test_main()