patch [ 810023 ] Fix for off-by-one bug in urllib.URLopener.retrieve

This commit is contained in:
Georg Brandl 2005-08-26 08:51:34 +00:00
parent b3f55f4a70
commit 5a650a253c
3 changed files with 102 additions and 17 deletions

View File

@ -6,6 +6,7 @@ import unittest
from test import test_support
import os
import mimetools
import tempfile
import StringIO
def hexescape(char):
@ -125,15 +126,53 @@ class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
def setUp(self):
# Create a list of temporary files. Each item in the list is a file
# name (absolute path or relative to the current working directory).
# All files in this list will be deleted in the tearDown method. Note,
# this only helps to makes sure temporary files get deleted, but it
# does nothing about trying to close files that may still be open. It
# is the responsibility of the developer to properly close files even
# when exceptional conditions occur.
self.tempFiles = []
# Create a temporary file.
self.registerFileForCleanUp(test_support.TESTFN)
self.text = 'testing urllib.urlretrieve'
FILE = file(test_support.TESTFN, 'wb')
FILE.write(self.text)
FILE.close()
try:
FILE = file(test_support.TESTFN, 'wb')
FILE.write(self.text)
FILE.close()
finally:
try: FILE.close()
except: pass
def tearDown(self):
# Delete the temporary file.
os.remove(test_support.TESTFN)
# Delete the temporary files.
for each in self.tempFiles:
try: os.remove(each)
except: pass
def constructLocalFileUrl(self, filePath):
return "file://%s" % urllib.pathname2url(os.path.abspath(filePath))
def createNewTempFile(self, data=""):
"""Creates a new temporary file containing the specified data,
registers the file for deletion during the test fixture tear down, and
returns the absolute path of the file."""
newFd, newFilePath = tempfile.mkstemp()
try:
self.registerFileForCleanUp(newFilePath)
newFile = os.fdopen(newFd, "wb")
newFile.write(data)
newFile.close()
finally:
try: newFile.close()
except: pass
return newFilePath
def registerFileForCleanUp(self, fileName):
self.tempFiles.append(fileName)
def test_basic(self):
# Make sure that a local file just gets its own location returned and
@ -147,15 +186,19 @@ class urlretrieve_FileTests(unittest.TestCase):
def test_copy(self):
# Test that setting the filename argument works.
second_temp = "%s.2" % test_support.TESTFN
result = urllib.urlretrieve("file:%s" % test_support.TESTFN, second_temp)
self.registerFileForCleanUp(second_temp)
result = urllib.urlretrieve(self.constructLocalFileUrl(
test_support.TESTFN), second_temp)
self.assertEqual(second_temp, result[0])
self.assert_(os.path.exists(second_temp), "copy of the file was not "
"made")
FILE = file(second_temp, 'rb')
try:
text = FILE.read()
finally:
FILE.close()
finally:
try: FILE.close()
except: pass
self.assertEqual(self.text, text)
def test_reporthook(self):
@ -167,8 +210,49 @@ class urlretrieve_FileTests(unittest.TestCase):
self.assertEqual(count, count_holder[0])
count_holder[0] = count_holder[0] + 1
second_temp = "%s.2" % test_support.TESTFN
urllib.urlretrieve(test_support.TESTFN, second_temp, hooktester)
os.remove(second_temp)
self.registerFileForCleanUp(second_temp)
urllib.urlretrieve(self.constructLocalFileUrl(test_support.TESTFN),
second_temp, hooktester)
def test_reporthook_0_bytes(self):
# Test on zero length file. Should call reporthook only 1 time.
report = []
def hooktester(count, block_size, total_size, _report=report):
_report.append((count, block_size, total_size))
srcFileName = self.createNewTempFile()
urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
test_support.TESTFN, hooktester)
self.assertEqual(len(report), 1)
self.assertEqual(report[0][2], 0)
def test_reporthook_5_bytes(self):
# Test on 5 byte file. Should call reporthook only 2 times (once when
# the "network connection" is established and once when the block is
# read). Since the block size is 8192 bytes, only one block read is
# required to read the entire file.
report = []
def hooktester(count, block_size, total_size, _report=report):
_report.append((count, block_size, total_size))
srcFileName = self.createNewTempFile("x" * 5)
urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
test_support.TESTFN, hooktester)
self.assertEqual(len(report), 2)
self.assertEqual(report[0][1], 8192)
self.assertEqual(report[0][2], 5)
def test_reporthook_8193_bytes(self):
# Test on 8193 byte file. Should call reporthook only 3 times (once
# when the "network connection" is established, once for the next 8192
# bytes, and once for the last byte).
report = []
def hooktester(count, block_size, total_size, _report=report):
_report.append((count, block_size, total_size))
srcFileName = self.createNewTempFile("x" * 8193)
urllib.urlretrieve(self.constructLocalFileUrl(srcFileName),
test_support.TESTFN, hooktester)
self.assertEqual(len(report), 3)
self.assertEqual(report[0][1], 8192)
self.assertEqual(report[0][2], 8193)
class QuotingTests(unittest.TestCase):
"""Tests for urllib.quote() and urllib.quote_plus()

View File

@ -234,19 +234,17 @@ class URLopener:
bs = 1024*8
size = -1
read = 0
blocknum = 1
blocknum = 0
if reporthook:
if "content-length" in headers:
size = int(headers["Content-Length"])
reporthook(0, bs, size)
block = fp.read(bs)
read += len(block)
if reporthook:
reporthook(1, bs, size)
while block:
tfp.write(block)
reporthook(blocknum, bs, size)
while 1:
block = fp.read(bs)
if block == "":
break
read += len(block)
tfp.write(block)
blocknum += 1
if reporthook:
reporthook(blocknum, bs, size)

View File

@ -204,6 +204,9 @@ Extension Modules
Library
-------
- Patch #810023: Fix off-by-one bug in urllib.urlretrieve reporthook
functionality.
- Bug #1163178: Make IDNA return an empty string when the input is empty.
- Patch #848017: Make Cookie more RFC-compliant. Use CRLF as default output