Closes #21582: Cleanup test_asyncore. Patch written by diana.

- Use support.captured_stderr() where appropriate
- Removes some "from test.support import xxx" import and uses support.xxx
  instead.
This commit is contained in:
Victor Stinner 2014-06-27 22:44:40 +02:00
parent 680241ec99
commit 252d40ef1e
1 changed files with 18 additions and 36 deletions

View File

@ -5,14 +5,12 @@ import os
import socket
import sys
import time
import warnings
import errno
import struct
import warnings
from test import support
from test.support import TESTFN, run_unittest, unlink, HOST, HOSTv6
from io import BytesIO
from io import StringIO
try:
import threading
@ -94,7 +92,7 @@ def bind_af_aware(sock, addr):
"""Helper function to bind a socket according to its family."""
if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX:
# Make sure the path doesn't exist.
unlink(addr)
support.unlink(addr)
sock.bind(addr)
@ -257,40 +255,29 @@ class DispatcherTests(unittest.TestCase):
d = asyncore.dispatcher()
# capture output of dispatcher.log() (to stderr)
fp = StringIO()
stderr = sys.stderr
l1 = "Lovely spam! Wonderful spam!"
l2 = "I don't like spam!"
try:
sys.stderr = fp
with support.captured_stderr() as stderr:
d.log(l1)
d.log(l2)
finally:
sys.stderr = stderr
lines = fp.getvalue().splitlines()
lines = stderr.getvalue().splitlines()
self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2])
def test_log_info(self):
d = asyncore.dispatcher()
# capture output of dispatcher.log_info() (to stdout via print)
fp = StringIO()
stdout = sys.stdout
l1 = "Have you got anything without spam?"
l2 = "Why can't she have egg bacon spam and sausage?"
l3 = "THAT'S got spam in it!"
try:
sys.stdout = fp
with support.captured_stdout() as stdout:
d.log_info(l1, 'EGGS')
d.log_info(l2)
d.log_info(l3, 'SPAM')
finally:
sys.stdout = stdout
lines = fp.getvalue().splitlines()
lines = stdout.getvalue().splitlines()
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
self.assertEqual(lines, expected)
def test_unhandled(self):
@ -298,18 +285,13 @@ class DispatcherTests(unittest.TestCase):
d.ignore_log_types = ()
# capture output of dispatcher.log_info() (to stdout via print)
fp = StringIO()
stdout = sys.stdout
try:
sys.stdout = fp
with support.captured_stdout() as stdout:
d.handle_expt()
d.handle_read()
d.handle_write()
d.handle_connect()
finally:
sys.stdout = stdout
lines = fp.getvalue().splitlines()
lines = stdout.getvalue().splitlines()
expected = ['warning: unhandled incoming priority event',
'warning: unhandled read event',
'warning: unhandled write event',
@ -378,7 +360,7 @@ class DispatcherWithSendTests(unittest.TestCase):
data = b"Suppose there isn't a 16-ton weight?"
d = dispatcherwithsend_noread()
d.create_socket()
d.connect((HOST, port))
d.connect((support.HOST, port))
# give time for socket to connect
time.sleep(0.1)
@ -410,14 +392,14 @@ class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
class FileWrapperTest(unittest.TestCase):
def setUp(self):
self.d = b"It's not dead, it's sleeping!"
with open(TESTFN, 'wb') as file:
with open(support.TESTFN, 'wb') as file:
file.write(self.d)
def tearDown(self):
unlink(TESTFN)
support.unlink(support.TESTFN)
def test_recv(self):
fd = os.open(TESTFN, os.O_RDONLY)
fd = os.open(support.TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd)
os.close(fd)
@ -431,20 +413,20 @@ class FileWrapperTest(unittest.TestCase):
def test_send(self):
d1 = b"Come again?"
d2 = b"I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd)
os.close(fd)
w.write(d1)
w.send(d2)
w.close()
with open(TESTFN, 'rb') as file:
with open(support.TESTFN, 'rb') as file:
self.assertEqual(file.read(), self.d + d1 + d2)
@unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'),
'asyncore.file_dispatcher required')
def test_dispatcher(self):
fd = os.open(TESTFN, os.O_RDONLY)
fd = os.open(support.TESTFN, os.O_RDONLY)
data = []
class FileDispatcher(asyncore.file_dispatcher):
def handle_read(self):
@ -815,12 +797,12 @@ class BaseTestAPI:
class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET
addr = (HOST, 0)
addr = (support.HOST, 0)
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required')
class TestAPI_UseIPv6Sockets(BaseTestAPI):
family = socket.AF_INET6
addr = (HOSTv6, 0)
addr = (support.HOSTv6, 0)
@unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required')
class TestAPI_UseUnixSockets(BaseTestAPI):
@ -829,7 +811,7 @@ class TestAPI_UseUnixSockets(BaseTestAPI):
addr = support.TESTFN
def tearDown(self):
unlink(self.addr)
support.unlink(self.addr)
BaseTestAPI.tearDown(self)
class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):