Merged revisions 81043 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r81043 | giampaolo.rodola | 2010-05-10 17:33:22 +0200 (lun, 10 mag 2010) | 1 line

  Issue #8490: adds a more solid test suite for asyncore
........
This commit is contained in:
Giampaolo Rodolà 2010-05-10 15:40:49 +00:00
parent bd576b75b7
commit 8fc1178cf8
1 changed files with 275 additions and 1 deletions

View File

@ -419,11 +419,285 @@ if hasattr(asyncore, 'file_wrapper'):
self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2)
class BaseTestHandler(asyncore.dispatcher):
def __init__(self, sock=None):
asyncore.dispatcher.__init__(self, sock)
self.flag = False
def handle_accept(self):
raise Exception("handle_accept not supposed to be called")
def handle_connect(self):
raise Exception("handle_connect not supposed to be called")
def handle_expt(self):
raise Exception("handle_expt not supposed to be called")
def handle_close(self):
raise Exception("handle_close not supposed to be called")
def handle_error(self):
raise
class TCPServer(asyncore.dispatcher):
"""A server which listens on an address and dispatches the
connection to a handler.
"""
def __init__(self, handler=BaseTestHandler, host=HOST, port=0):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, port))
self.listen(5)
self.handler = handler
@property
def address(self):
return self.socket.getsockname()[:2]
def handle_accept(self):
sock, addr = self.accept()
self.handler(sock)
def handle_error(self):
raise
class BaseClient(BaseTestHandler):
def __init__(self, address):
BaseTestHandler.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(address)
def handle_connect(self):
pass
class BaseTestAPI(unittest.TestCase):
def tearDown(self):
asyncore.close_all()
def loop_waiting_for_flag(self, instance, timeout=5):
timeout = float(timeout) / 100
count = 100
while asyncore.socket_map and count > 0:
asyncore.loop(timeout=0.01, count=1, use_poll=self.use_poll)
if instance.flag:
return
count -= 1
time.sleep(timeout)
self.fail("flag not set")
def test_handle_connect(self):
# make sure handle_connect is called on connect()
class TestClient(BaseClient):
def handle_connect(self):
self.flag = True
server = TCPServer()
client = TestClient(server.address)
self.loop_waiting_for_flag(client)
def test_handle_accept(self):
# make sure handle_accept() is called when a client connects
class TestListener(BaseTestHandler):
def __init__(self):
BaseTestHandler.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind((HOST, 0))
self.listen(5)
self.address = self.socket.getsockname()[:2]
def handle_accept(self):
self.flag = True
server = TestListener()
client = BaseClient(server.address)
self.loop_waiting_for_flag(server)
def test_handle_read(self):
# make sure handle_read is called on data received
class TestClient(BaseClient):
def handle_read(self):
self.flag = True
class TestHandler(BaseTestHandler):
def __init__(self, conn):
BaseTestHandler.__init__(self, conn)
self.send(b'x' * 1024)
server = TCPServer(TestHandler)
client = TestClient(server.address)
self.loop_waiting_for_flag(client)
def test_handle_write(self):
# make sure handle_write is called
class TestClient(BaseClient):
def handle_write(self):
self.flag = True
server = TCPServer()
client = TestClient(server.address)
self.loop_waiting_for_flag(client)
def test_handle_close(self):
# make sure handle_close is called when the other end closes
# the connection
class TestClient(BaseClient):
def handle_read(self):
# in order to make handle_close be called we are supposed
# to make at least one recv() call
self.recv(1024)
def handle_close(self):
self.flag = True
self.close()
class TestHandler(BaseTestHandler):
def __init__(self, conn):
BaseTestHandler.__init__(self, conn)
self.close()
server = TCPServer(TestHandler)
client = TestClient(server.address)
self.loop_waiting_for_flag(client)
@unittest.skipIf(sys.platform.startswith("sunos"),
"OOB support is broken on Solaris")
def test_handle_expt(self):
# Make sure handle_expt is called on OOB data received.
# Note: this might fail on some platforms as OOB data is
# tenuously supported and rarely used.
class TestClient(BaseClient):
def handle_expt(self):
self.flag = True
class TestHandler(BaseTestHandler):
def __init__(self, conn):
BaseTestHandler.__init__(self, conn)
self.socket.send(bytes(chr(244), 'latin-1'), socket.MSG_OOB)
server = TCPServer(TestHandler)
client = TestClient(server.address)
self.loop_waiting_for_flag(client)
def test_handle_error(self):
class TestClient(BaseClient):
def handle_write(self):
1.0 / 0
def handle_error(self):
self.flag = True
try:
raise
except ZeroDivisionError:
pass
else:
raise Exception("exception not raised")
server = TCPServer()
client = TestClient(server.address)
self.loop_waiting_for_flag(client)
def test_connection_attributes(self):
server = TCPServer()
client = BaseClient(server.address)
# we start disconnected
self.assertFalse(server.connected)
self.assertTrue(server.accepting)
# XXX - Solaris seems to connect() immediately even without
# starting the poller. This is something which should be
# fixed as handle_connect() gets called immediately even if
# no connection actually took place (see issue #8490).
if not sys.platform.startswith("sunos"):
self.assertFalse(client.connected)
self.assertFalse(client.accepting)
# execute some loops so that client connects to server
asyncore.loop(timeout=0.01, use_poll=self.use_poll, count=100)
self.assertFalse(server.connected)
self.assertTrue(server.accepting)
self.assertTrue(client.connected)
self.assertFalse(client.accepting)
# disconnect the client
client.close()
self.assertFalse(server.connected)
self.assertTrue(server.accepting)
self.assertFalse(client.connected)
self.assertFalse(client.accepting)
# stop serving
server.close()
self.assertFalse(server.connected)
self.assertFalse(server.accepting)
def test_create_socket(self):
s = asyncore.dispatcher()
s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.assertEqual(s.socket.family, socket.AF_INET)
self.assertEqual(s.socket.type, socket.SOCK_STREAM)
def test_bind(self):
s1 = asyncore.dispatcher()
s1.create_socket(socket.AF_INET, socket.SOCK_STREAM)
s1.bind((HOST, 0))
s1.listen(5)
port = s1.socket.getsockname()[1]
s2 = asyncore.dispatcher()
s2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# EADDRINUSE indicates the socket was correctly bound
self.assertRaises(socket.error, s2.bind, (HOST, port))
def test_set_reuse_addr(self):
sock = socket.socket()
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error:
unittest.skip("SO_REUSEADDR not supported on this platform")
else:
# if SO_REUSEADDR succeeded for sock we expect asyncore
# to do the same
s = asyncore.dispatcher(socket.socket())
self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR))
s.create_socket(socket.AF_INET, socket.SOCK_STREAM)
s.set_reuse_addr()
self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR))
finally:
sock.close()
class TestAPI_UseSelect(BaseTestAPI):
use_poll = False
class TestAPI_UsePoll(BaseTestAPI):
use_poll = True
def test_main():
tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
DispatcherWithSendTests_UsePoll]
DispatcherWithSendTests_UsePoll, TestAPI_UseSelect]
if hasattr(asyncore, 'file_wrapper'):
tests.append(FileWrapperTest)
if hasattr(select, 'poll'):
tests.append(TestAPI_UsePoll)
run_unittest(*tests)