(Merge 3.4) Closes #21582: Cleanup test_asyncore. Patch written by diana.

- Use support.captured_stderr() where appropriate
- Removes some "from test.support import xxx" import and uses support.xxx
  instead.
This commit is contained in:
Victor Stinner 2014-06-27 22:47:41 +02:00
commit c3bc856f42
1 changed files with 17 additions and 36 deletions

View File

@ -5,14 +5,11 @@ import os
import socket import socket
import sys import sys
import time import time
import warnings
import errno import errno
import struct import struct
from test import support from test import support
from test.support import TESTFN, run_unittest, unlink, HOST, HOSTv6
from io import BytesIO from io import BytesIO
from io import StringIO
try: try:
import threading import threading
@ -94,7 +91,7 @@ def bind_af_aware(sock, addr):
"""Helper function to bind a socket according to its family.""" """Helper function to bind a socket according to its family."""
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist. # Make sure the path doesn't exist.
unlink(addr) support.unlink(addr)
sock.bind(addr) sock.bind(addr)
@ -257,40 +254,29 @@ class DispatcherTests(unittest.TestCase):
d = asyncore.dispatcher() d = asyncore.dispatcher()
# capture output of dispatcher.log() (to stderr) # capture output of dispatcher.log() (to stderr)
fp = StringIO()
stderr = sys.stderr
l1 = "Lovely spam! Wonderful spam!" l1 = "Lovely spam! Wonderful spam!"
l2 = "I don't like spam!" l2 = "I don't like spam!"
try: with support.captured_stderr() as stderr:
sys.stderr = fp
d.log(l1) d.log(l1)
d.log(l2) d.log(l2)
finally:
sys.stderr = stderr
lines = fp.getvalue().splitlines() lines = stderr.getvalue().splitlines()
self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2]) self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
def test_log_info(self): def test_log_info(self):
d = asyncore.dispatcher() d = asyncore.dispatcher()
# capture output of dispatcher.log_info() (to stdout via print) # capture output of dispatcher.log_info() (to stdout via print)
fp = StringIO()
stdout = sys.stdout
l1 = "Have you got anything without spam?" l1 = "Have you got anything without spam?"
l2 = "Why can't she have egg bacon spam and sausage?" l2 = "Why can't she have egg bacon spam and sausage?"
l3 = "THAT'S got spam in it!" l3 = "THAT'S got spam in it!"
try: with support.captured_stdout() as stdout:
sys.stdout = fp
d.log_info(l1, 'EGGS') d.log_info(l1, 'EGGS')
d.log_info(l2) d.log_info(l2)
d.log_info(l3, 'SPAM') d.log_info(l3, 'SPAM')
finally:
sys.stdout = stdout
lines = fp.getvalue().splitlines() lines = stdout.getvalue().splitlines()
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
self.assertEqual(lines, expected) self.assertEqual(lines, expected)
def test_unhandled(self): def test_unhandled(self):
@ -298,18 +284,13 @@ class DispatcherTests(unittest.TestCase):
d.ignore_log_types = () d.ignore_log_types = ()
# capture output of dispatcher.log_info() (to stdout via print) # capture output of dispatcher.log_info() (to stdout via print)
fp = StringIO() with support.captured_stdout() as stdout:
stdout = sys.stdout
try:
sys.stdout = fp
d.handle_expt() d.handle_expt()
d.handle_read() d.handle_read()
d.handle_write() d.handle_write()
d.handle_connect() d.handle_connect()
finally:
sys.stdout = stdout
lines = fp.getvalue().splitlines() lines = stdout.getvalue().splitlines()
expected = ['warning: unhandled incoming priority event', expected = ['warning: unhandled incoming priority event',
'warning: unhandled read event', 'warning: unhandled read event',
'warning: unhandled write event', 'warning: unhandled write event',
@ -360,7 +341,7 @@ class DispatcherWithSendTests(unittest.TestCase):
data = b"Suppose there isn't a 16-ton weight?" data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread() d = dispatcherwithsend_noread()
d.create_socket() d.create_socket()
d.connect((HOST, port)) d.connect((support.HOST, port))
# give time for socket to connect # give time for socket to connect
time.sleep(0.1) time.sleep(0.1)
@ -388,14 +369,14 @@ class DispatcherWithSendTests(unittest.TestCase):
class FileWrapperTest(unittest.TestCase): class FileWrapperTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.d = b"It's not dead, it's sleeping!" self.d = b"It's not dead, it's sleeping!"
with open(TESTFN, 'wb') as file: with open(support.TESTFN, 'wb') as file:
file.write(self.d) file.write(self.d)
def tearDown(self): def tearDown(self):
unlink(TESTFN) support.unlink(support.TESTFN)
def test_recv(self): def test_recv(self):
fd = os.open(TESTFN, os.O_RDONLY) fd = os.open(support.TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd) w = asyncore.file_wrapper(fd)
os.close(fd) os.close(fd)
@ -409,20 +390,20 @@ class FileWrapperTest(unittest.TestCase):
def test_send(self): def test_send(self):
d1 = b"Come again?" d1 = b"Come again?"
d2 = b"I want to buy some cheese." d2 = b"I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd) w = asyncore.file_wrapper(fd)
os.close(fd) os.close(fd)
w.write(d1) w.write(d1)
w.send(d2) w.send(d2)
w.close() w.close()
with open(TESTFN, 'rb') as file: with open(support.TESTFN, 'rb') as file:
self.assertEqual(file.read(), self.d + d1 + d2) self.assertEqual(file.read(), self.d + d1 + d2)
@unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
'asyncore.file_dispatcher required') 'asyncore.file_dispatcher required')
def test_dispatcher(self): def test_dispatcher(self):
fd = os.open(TESTFN, os.O_RDONLY) fd = os.open(support.TESTFN, os.O_RDONLY)
data = [] data = []
class FileDispatcher(asyncore.file_dispatcher): class FileDispatcher(asyncore.file_dispatcher):
def handle_read(self): def handle_read(self):
@ -793,12 +774,12 @@ class BaseTestAPI:
class TestAPI_UseIPv4Sockets(BaseTestAPI): class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET family = socket.AF_INET
addr = (HOST, 0) addr = (support.HOST, 0)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI): class TestAPI_UseIPv6Sockets(BaseTestAPI):
family = socket.AF_INET6 family = socket.AF_INET6
addr = (HOSTv6, 0) addr = (support.HOSTv6, 0)
@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI): class TestAPI_UseUnixSockets(BaseTestAPI):
@ -807,7 +788,7 @@ class TestAPI_UseUnixSockets(BaseTestAPI):
addr = support.TESTFN addr = support.TESTFN
def tearDown(self): def tearDown(self):
unlink(self.addr) support.unlink(self.addr)
BaseTestAPI.tearDown(self) BaseTestAPI.tearDown(self)
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase): class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):