Merged revisions 85403 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r85403 | brian.curtin | 2010-10-12 21:56:26 -0500 (Tue, 12 Oct 2010) | 2 lines

  Implement #7944. Use `with` throughout the test suite.
........
This commit is contained in:
Brian Curtin 2010-10-14 02:01:12 +00:00
parent 04d8b220e1
commit 6ec1eb8e73
1 changed files with 97 additions and 122 deletions

View File

@ -54,68 +54,68 @@ class BZ2FileTest(BaseTest):
os.unlink(self.filename) os.unlink(self.filename)
def createTempFile(self, crlf=0): def createTempFile(self, crlf=0):
f = open(self.filename, "wb") with open(self.filename, "wb") as f:
if crlf: if crlf:
data = self.DATA_CRLF data = self.DATA_CRLF
else: else:
data = self.DATA data = self.DATA
f.write(data) f.write(data)
f.close()
def testRead(self): def testRead(self):
# "Test BZ2File.read()" # "Test BZ2File.read()"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
self.assertRaises(TypeError, bz2f.read, None) self.assertRaises(TypeError, bz2f.read, None)
self.assertEqual(bz2f.read(), self.TEXT) self.assertEqual(bz2f.read(), self.TEXT)
bz2f.close()
def testRead0(self):
# Test BBZ2File.read(0)"
self.createTempFile()
with BZ2File(self.filename) as bz2f:
self.assertRaises(TypeError, bz2f.read, None)
self.assertEqual(bz2f.read(0), "")
def testReadChunk10(self): def testReadChunk10(self):
# "Test BZ2File.read() in chunks of 10 bytes" # "Test BZ2File.read() in chunks of 10 bytes"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
text = '' text = ''
while 1: while 1:
str = bz2f.read(10) str = bz2f.read(10)
if not str: if not str:
break break
text += str text += str
self.assertEqual(text, text) self.assertEqual(text, text)
bz2f.close()
def testRead100(self): def testRead100(self):
# "Test BZ2File.read(100)" # "Test BZ2File.read(100)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
self.assertEqual(bz2f.read(100), self.TEXT[:100]) self.assertEqual(bz2f.read(100), self.TEXT[:100])
bz2f.close()
def testReadLine(self): def testReadLine(self):
# "Test BZ2File.readline()" # "Test BZ2File.readline()"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
self.assertRaises(TypeError, bz2f.readline, None) self.assertRaises(TypeError, bz2f.readline, None)
sio = StringIO(self.TEXT) sio = StringIO(self.TEXT)
for line in sio.readlines(): for line in sio.readlines():
self.assertEqual(bz2f.readline(), line) self.assertEqual(bz2f.readline(), line)
bz2f.close()
def testReadLines(self): def testReadLines(self):
# "Test BZ2File.readlines()" # "Test BZ2File.readlines()"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
self.assertRaises(TypeError, bz2f.readlines, None) self.assertRaises(TypeError, bz2f.readlines, None)
sio = StringIO(self.TEXT) sio = StringIO(self.TEXT)
self.assertEqual(bz2f.readlines(), sio.readlines()) self.assertEqual(bz2f.readlines(), sio.readlines())
bz2f.close()
def testIterator(self): def testIterator(self):
# "Test iter(BZ2File)" # "Test iter(BZ2File)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
sio = StringIO(self.TEXT) sio = StringIO(self.TEXT)
self.assertEqual(list(iter(bz2f)), sio.readlines()) self.assertEqual(list(iter(bz2f)), sio.readlines())
bz2f.close()
def testClosedIteratorDeadlock(self): def testClosedIteratorDeadlock(self):
# "Test that iteration on a closed bz2file releases the lock." # "Test that iteration on a closed bz2file releases the lock."
@ -154,104 +154,91 @@ class BZ2FileTest(BaseTest):
def testWrite(self): def testWrite(self):
# "Test BZ2File.write()" # "Test BZ2File.write()"
bz2f = BZ2File(self.filename, "w") with BZ2File(self.filename, "w") as bz2f:
self.assertRaises(TypeError, bz2f.write) self.assertRaises(TypeError, bz2f.write)
bz2f.write(self.TEXT) bz2f.write(self.TEXT)
bz2f.close() with open(self.filename, 'rb') as f:
f = open(self.filename, 'rb') self.assertEqual(self.decompress(f.read()), self.TEXT)
self.assertEqual(self.decompress(f.read()), self.TEXT)
f.close()
def testWriteChunks10(self): def testWriteChunks10(self):
# "Test BZ2File.write() with chunks of 10 bytes" # "Test BZ2File.write() with chunks of 10 bytes"
bz2f = BZ2File(self.filename, "w") with BZ2File(self.filename, "w") as bz2f:
n = 0 n = 0
while 1: while 1:
str = self.TEXT[n*10:(n+1)*10] str = self.TEXT[n*10:(n+1)*10]
if not str: if not str:
break break
bz2f.write(str) bz2f.write(str)
n += 1 n += 1
bz2f.close() with open(self.filename, 'rb') as f:
f = open(self.filename, 'rb') self.assertEqual(self.decompress(f.read()), self.TEXT)
self.assertEqual(self.decompress(f.read()), self.TEXT)
f.close()
def testWriteLines(self): def testWriteLines(self):
# "Test BZ2File.writelines()" # "Test BZ2File.writelines()"
bz2f = BZ2File(self.filename, "w") with BZ2File(self.filename, "w") as bz2f:
self.assertRaises(TypeError, bz2f.writelines) self.assertRaises(TypeError, bz2f.writelines)
sio = StringIO(self.TEXT) sio = StringIO(self.TEXT)
bz2f.writelines(sio.readlines()) bz2f.writelines(sio.readlines())
bz2f.close()
# patch #1535500 # patch #1535500
self.assertRaises(ValueError, bz2f.writelines, ["a"]) self.assertRaises(ValueError, bz2f.writelines, ["a"])
f = open(self.filename, 'rb') with open(self.filename, 'rb') as f:
self.assertEqual(self.decompress(f.read()), self.TEXT) self.assertEqual(self.decompress(f.read()), self.TEXT)
f.close()
def testWriteMethodsOnReadOnlyFile(self): def testWriteMethodsOnReadOnlyFile(self):
bz2f = BZ2File(self.filename, "w") with BZ2File(self.filename, "w") as bz2f:
bz2f.write("abc") bz2f.write("abc")
bz2f.close()
bz2f = BZ2File(self.filename, "r") with BZ2File(self.filename, "r") as bz2f:
self.assertRaises(IOError, bz2f.write, "a") self.assertRaises(IOError, bz2f.write, "a")
self.assertRaises(IOError, bz2f.writelines, ["a"]) self.assertRaises(IOError, bz2f.writelines, ["a"])
def testSeekForward(self): def testSeekForward(self):
# "Test BZ2File.seek(150, 0)" # "Test BZ2File.seek(150, 0)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
self.assertRaises(TypeError, bz2f.seek) self.assertRaises(TypeError, bz2f.seek)
bz2f.seek(150) bz2f.seek(150)
self.assertEqual(bz2f.read(), self.TEXT[150:]) self.assertEqual(bz2f.read(), self.TEXT[150:])
bz2f.close()
def testSeekBackwards(self): def testSeekBackwards(self):
# "Test BZ2File.seek(-150, 1)" # "Test BZ2File.seek(-150, 1)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
bz2f.read(500) bz2f.read(500)
bz2f.seek(-150, 1) bz2f.seek(-150, 1)
self.assertEqual(bz2f.read(), self.TEXT[500-150:]) self.assertEqual(bz2f.read(), self.TEXT[500-150:])
bz2f.close()
def testSeekBackwardsFromEnd(self): def testSeekBackwardsFromEnd(self):
# "Test BZ2File.seek(-150, 2)" # "Test BZ2File.seek(-150, 2)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
bz2f.seek(-150, 2) bz2f.seek(-150, 2)
self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:]) self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])
bz2f.close()
def testSeekPostEnd(self): def testSeekPostEnd(self):
# "Test BZ2File.seek(150000)" # "Test BZ2File.seek(150000)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
bz2f.seek(150000) bz2f.seek(150000)
self.assertEqual(bz2f.tell(), len(self.TEXT)) self.assertEqual(bz2f.tell(), len(self.TEXT))
self.assertEqual(bz2f.read(), "") self.assertEqual(bz2f.read(), "")
bz2f.close()
def testSeekPostEndTwice(self): def testSeekPostEndTwice(self):
# "Test BZ2File.seek(150000) twice" # "Test BZ2File.seek(150000) twice"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
bz2f.seek(150000) bz2f.seek(150000)
bz2f.seek(150000) bz2f.seek(150000)
self.assertEqual(bz2f.tell(), len(self.TEXT)) self.assertEqual(bz2f.tell(), len(self.TEXT))
self.assertEqual(bz2f.read(), "") self.assertEqual(bz2f.read(), "")
bz2f.close()
def testSeekPreStart(self): def testSeekPreStart(self):
# "Test BZ2File.seek(-150, 0)" # "Test BZ2File.seek(-150, 0)"
self.createTempFile() self.createTempFile()
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
bz2f.seek(-150) bz2f.seek(-150)
self.assertEqual(bz2f.tell(), 0) self.assertEqual(bz2f.tell(), 0)
self.assertEqual(bz2f.read(), self.TEXT) self.assertEqual(bz2f.read(), self.TEXT)
bz2f.close()
def testOpenDel(self): def testOpenDel(self):
# "Test opening and deleting a file many times" # "Test opening and deleting a file many times"
@ -277,16 +264,13 @@ class BZ2FileTest(BaseTest):
def testBug1191043(self): def testBug1191043(self):
# readlines() for files containing no newline # readlines() for files containing no newline
data = 'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t' data = 'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
f = open(self.filename, "wb") with open(self.filename, "wb") as f:
f.write(data) f.write(data)
f.close() with BZ2File(self.filename) as bz2f:
bz2f = BZ2File(self.filename) lines = bz2f.readlines()
lines = bz2f.readlines()
bz2f.close()
self.assertEqual(lines, ['Test']) self.assertEqual(lines, ['Test'])
bz2f = BZ2File(self.filename) with BZ2File(self.filename) as bz2f:
xlines = list(bz2f.xreadlines()) xlines = list(bz2f.readlines())
bz2f.close()
self.assertEqual(xlines, ['Test']) self.assertEqual(xlines, ['Test'])
def testContextProtocol(self): def testContextProtocol(self):
@ -316,8 +300,7 @@ class BZ2FileTest(BaseTest):
# Using a BZ2File from several threads doesn't deadlock (issue #7205). # Using a BZ2File from several threads doesn't deadlock (issue #7205).
data = "1" * 2**20 data = "1" * 2**20
nthreads = 10 nthreads = 10
f = bz2.BZ2File(self.filename, 'wb') with bz2.BZ2File(self.filename, 'wb') as f:
try:
def comp(): def comp():
for i in range(5): for i in range(5):
f.write(data) f.write(data)
@ -326,27 +309,19 @@ class BZ2FileTest(BaseTest):
t.start() t.start()
for t in threads: for t in threads:
t.join() t.join()
finally:
f.close()
def testMixedIterationReads(self): def testMixedIterationReads(self):
# Issue #8397: mixed iteration and reads should be forbidden. # Issue #8397: mixed iteration and reads should be forbidden.
f = bz2.BZ2File(self.filename, 'wb') with bz2.BZ2File(self.filename, 'wb') as f:
try:
# The internal buffer size is hard-wired to 8192 bytes, we must # The internal buffer size is hard-wired to 8192 bytes, we must
# write out more than that for the test to stop half through # write out more than that for the test to stop half through
# the buffer. # the buffer.
f.write(self.TEXT * 100) f.write(self.TEXT * 100)
finally: with bz2.BZ2File(self.filename, 'rb') as f:
f.close()
f = bz2.BZ2File(self.filename, 'rb')
try:
next(f) next(f)
self.assertRaises(ValueError, f.read) self.assertRaises(ValueError, f.read)
self.assertRaises(ValueError, f.readline) self.assertRaises(ValueError, f.readline)
self.assertRaises(ValueError, f.readlines) self.assertRaises(ValueError, f.readlines)
finally:
f.close()
class BZ2CompressorTest(BaseTest): class BZ2CompressorTest(BaseTest):
def testCompress(self): def testCompress(self):