From 252d40ef1ee627148ae12ab9cbf812cfeeb27655 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 27 Jun 2014 22:44:40 +0200 Subject: [PATCH] Closes #21582: Cleanup test_asyncore. Patch written by diana. - Use support.captured_stderr() where appropriate - Removes some "from test.support import xxx" import and uses support.xxx instead. --- Lib/test/test_asyncore.py | 54 +++++++++++++-------------------------- 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index 084d2472952..7defe653877 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -5,14 +5,12 @@ import os import socket import sys import time -import warnings import errno import struct +import warnings from test import support -from test.support import TESTFN, run_unittest, unlink, HOST, HOSTv6 from io import BytesIO -from io import StringIO try: import threading @@ -94,7 +92,7 @@ def bind_af_aware(sock, addr): """Helper function to bind a socket according to its family.""" if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: # Make sure the path doesn't exist. - unlink(addr) + support.unlink(addr) sock.bind(addr) @@ -257,40 +255,29 @@ class DispatcherTests(unittest.TestCase): d = asyncore.dispatcher() # capture output of dispatcher.log() (to stderr) - fp = StringIO() - stderr = sys.stderr l1 = "Lovely spam! Wonderful spam!" l2 = "I don't like spam!" - try: - sys.stderr = fp + with support.captured_stderr() as stderr: d.log(l1) d.log(l2) - finally: - sys.stderr = stderr - lines = fp.getvalue().splitlines() + lines = stderr.getvalue().splitlines() self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2]) def test_log_info(self): d = asyncore.dispatcher() # capture output of dispatcher.log_info() (to stdout via print) - fp = StringIO() - stdout = sys.stdout l1 = "Have you got anything without spam?" l2 = "Why can't she have egg bacon spam and sausage?" l3 = "THAT'S got spam in it!" - try: - sys.stdout = fp + with support.captured_stdout() as stdout: d.log_info(l1, 'EGGS') d.log_info(l2) d.log_info(l3, 'SPAM') - finally: - sys.stdout = stdout - lines = fp.getvalue().splitlines() + lines = stdout.getvalue().splitlines() expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] - self.assertEqual(lines, expected) def test_unhandled(self): @@ -298,18 +285,13 @@ class DispatcherTests(unittest.TestCase): d.ignore_log_types = () # capture output of dispatcher.log_info() (to stdout via print) - fp = StringIO() - stdout = sys.stdout - try: - sys.stdout = fp + with support.captured_stdout() as stdout: d.handle_expt() d.handle_read() d.handle_write() d.handle_connect() - finally: - sys.stdout = stdout - lines = fp.getvalue().splitlines() + lines = stdout.getvalue().splitlines() expected = ['warning: unhandled incoming priority event', 'warning: unhandled read event', 'warning: unhandled write event', @@ -378,7 +360,7 @@ class DispatcherWithSendTests(unittest.TestCase): data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() d.create_socket() - d.connect((HOST, port)) + d.connect((support.HOST, port)) # give time for socket to connect time.sleep(0.1) @@ -410,14 +392,14 @@ class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): class FileWrapperTest(unittest.TestCase): def setUp(self): self.d = b"It's not dead, it's sleeping!" - with open(TESTFN, 'wb') as file: + with open(support.TESTFN, 'wb') as file: file.write(self.d) def tearDown(self): - unlink(TESTFN) + support.unlink(support.TESTFN) def test_recv(self): - fd = os.open(TESTFN, os.O_RDONLY) + fd = os.open(support.TESTFN, os.O_RDONLY) w = asyncore.file_wrapper(fd) os.close(fd) @@ -431,20 +413,20 @@ class FileWrapperTest(unittest.TestCase): def test_send(self): d1 = b"Come again?" d2 = b"I want to buy some cheese." - fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) + fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND) w = asyncore.file_wrapper(fd) os.close(fd) w.write(d1) w.send(d2) w.close() - with open(TESTFN, 'rb') as file: + with open(support.TESTFN, 'rb') as file: self.assertEqual(file.read(), self.d + d1 + d2) @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), 'asyncore.file_dispatcher required') def test_dispatcher(self): - fd = os.open(TESTFN, os.O_RDONLY) + fd = os.open(support.TESTFN, os.O_RDONLY) data = [] class FileDispatcher(asyncore.file_dispatcher): def handle_read(self): @@ -815,12 +797,12 @@ class BaseTestAPI: class TestAPI_UseIPv4Sockets(BaseTestAPI): family = socket.AF_INET - addr = (HOST, 0) + addr = (support.HOST, 0) @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') class TestAPI_UseIPv6Sockets(BaseTestAPI): family = socket.AF_INET6 - addr = (HOSTv6, 0) + addr = (support.HOSTv6, 0) @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') class TestAPI_UseUnixSockets(BaseTestAPI): @@ -829,7 +811,7 @@ class TestAPI_UseUnixSockets(BaseTestAPI): addr = support.TESTFN def tearDown(self): - unlink(self.addr) + support.unlink(self.addr) BaseTestAPI.tearDown(self) class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):