Rewrite test_socketserver as unittest, written for GHOP by Benjamin Petersen.

This commit is contained in:
Georg Brandl 2008-02-02 11:05:00 +00:00
parent bbc4fc2933
commit 61fdd71ad4
1 changed files with 176 additions and 145 deletions

View File

@ -1,20 +1,32 @@
# Test suite for SocketServer.py
"""
Test suite for SocketServer.py.
"""
from test import test_support
from test.test_support import (verbose, verify, TESTFN, TestSkipped,
reap_children)
test_support.requires('network')
from SocketServer import *
import os
import socket
import errno
import imp
import select
import time
import threading
import os
from functools import wraps
import unittest
import SocketServer
import test.test_support
from test.test_support import reap_children, verbose, TestSkipped
from test.test_support import TESTFN as TEST_FILE
test.test_support.requires("network")
NREQ = 3
DELAY = 0.5
TEST_STR = "hello world\n"
HOST = "localhost"
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
class MyMixinHandler:
def handle(self):
@ -23,22 +35,6 @@ class MyMixinHandler:
time.sleep(DELAY)
self.wfile.write(line)
class MyStreamHandler(MyMixinHandler, StreamRequestHandler):
pass
class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler):
pass
class MyMixinServer:
def serve_a_few(self):
for i in range(NREQ):
self.handle_request()
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
teststring = "hello world\n"
def receive(sock, n, timeout=20):
r, w, x = select.select([sock], [], [], timeout)
@ -47,26 +43,33 @@ def receive(sock, n, timeout=20):
else:
raise RuntimeError, "timed out on %r" % (sock,)
def testdgram(proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(teststring, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
def teststream(proto, addr):
s = socket.socket(proto, socket.SOCK_STREAM)
s.connect(addr)
s.sendall(teststring)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
s.close()
class MyStreamHandler(MyMixinHandler, SocketServer.StreamRequestHandler):
pass
class MyDatagramHandler(MyMixinHandler,
SocketServer.DatagramRequestHandler):
pass
class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
SocketServer.UnixStreamServer):
pass
class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
SocketServer.UnixDatagramServer):
pass
class MyMixinServer:
def serve_a_few(self):
for i in range(NREQ):
self.handle_request()
def handle_error(self, request, client_address):
self.close_request(request)
self.server_close()
raise
class ServerThread(threading.Thread):
def __init__(self, addr, svrcls, hdlrcls):
@ -75,6 +78,7 @@ class ServerThread(threading.Thread):
self.__svrcls = svrcls
self.__hdlrcls = hdlrcls
self.ready = threading.Event()
def run(self):
class svrcls(MyMixinServer, self.__svrcls):
pass
@ -93,64 +97,8 @@ class ServerThread(threading.Thread):
svr.serve_a_few()
if verbose: print "thread: done"
seed = 0
def pickport():
global seed
seed += 1
return 10000 + (os.getpid() % 1000)*10 + seed
host = "localhost"
testfiles = []
def pickaddr(proto):
if proto == socket.AF_INET:
return (host, pickport())
else:
fn = TESTFN + str(pickport())
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
# backslashes as directory separators
if fn[1] == ':':
fn = fn[2:]
if fn[0] in (os.sep, os.altsep):
fn = fn[1:]
fn = os.path.join('\socket', fn)
if os.sep == '/':
fn = fn.replace(os.sep, os.altsep)
else:
fn = fn.replace(os.altsep, os.sep)
testfiles.append(fn)
return fn
def cleanup():
for fn in testfiles:
try:
os.remove(fn)
except os.error:
pass
testfiles[:] = []
def testloop(proto, servers, hdlrcls, testfunc):
for svrcls in servers:
addr = pickaddr(proto)
if verbose:
print "ADDR =", addr
print "CLASS =", svrcls
t = ServerThread(addr, svrcls, hdlrcls)
if verbose: print "server created"
t.start()
if verbose: print "server running"
for i in range(NREQ):
t.ready.wait(10*DELAY)
if not t.ready.isSet():
raise RuntimeError("Server not ready within a reasonable time")
if verbose: print "test client", i
testfunc(proto, addr)
if verbose: print "waiting for server"
t.join()
if verbose: print "done"
class ForgivingTCPServer(TCPServer):
class ForgivingTCPServer(SocketServer.TCPServer):
# prevent errors if another process is using the port we want
def server_bind(self):
host, default_port = self.server_address
@ -160,64 +108,147 @@ class ForgivingTCPServer(TCPServer):
for port in [default_port, 3434, 8798, 23833]:
try:
self.server_address = host, port
TCPServer.server_bind(self)
SocketServer.TCPServer.server_bind(self)
break
except socket.error, (err, msg):
if err != errno.EADDRINUSE:
raise
print >>sys.__stderr__, \
' WARNING: failed to listen on port %d, trying another' % port
print >> sys.__stderr__, \
"WARNING: failed to listen on port %d, trying another: " % port
tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
tcpservers.append(ForkingTCPServer)
udpservers = [UDPServer, ThreadingUDPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
udpservers.append(ForkingUDPServer)
if not hasattr(socket, 'AF_UNIX'):
streamservers = []
dgramservers = []
else:
class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass
streamservers = [UnixStreamServer, ThreadingUnixStreamServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
streamservers.append(ForkingUnixStreamServer)
class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass
dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
dgramservers.append(ForkingUnixDatagramServer)
class SocketServerTest(unittest.TestCase):
"""Test all socket servers."""
def sloppy_cleanup():
# See http://python.org/sf/1540386
# We need to reap children here otherwise a child from one server
# can be left running for the next server and cause a test failure.
time.sleep(DELAY)
reap_children()
def setUp(self):
self.port_seed = 0
self.test_files = []
def tearDown(self):
reap_children()
for fn in self.test_files:
try:
os.remove(fn)
except os.error:
pass
self.test_files[:] = []
def pickport(self):
self.port_seed += 1
return 10000 + (os.getpid() % 1000)*10 + self.port_seed
def pickaddr(self, proto):
if proto == socket.AF_INET:
return (HOST, self.pickport())
else:
fn = TEST_FILE + str(self.pickport())
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
# backslashes as directory separators
if fn[1] == ':':
fn = fn[2:]
if fn[0] in (os.sep, os.altsep):
fn = fn[1:]
fn = os.path.join('\socket', fn)
if os.sep == '/':
fn = fn.replace(os.sep, os.altsep)
else:
fn = fn.replace(os.altsep, os.sep)
self.test_files.append(fn)
return fn
def run_servers(self, proto, servers, hdlrcls, testfunc):
for svrcls in servers:
addr = self.pickaddr(proto)
if verbose:
print "ADDR =", addr
print "CLASS =", svrcls
t = ServerThread(addr, svrcls, hdlrcls)
if verbose: print "server created"
t.start()
if verbose: print "server running"
for i in range(NREQ):
t.ready.wait(10*DELAY)
self.assert_(t.ready.isSet(),
"Server not ready within a reasonable time")
if verbose: print "test client", i
testfunc(proto, addr)
if verbose: print "waiting for server"
t.join()
if verbose: print "done"
def stream_examine(self, proto, addr):
s = socket.socket(proto, socket.SOCK_STREAM)
s.connect(addr)
s.sendall(TEST_STR)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
self.assertEquals(buf, TEST_STR)
s.close()
def dgram_examine(self, proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(TEST_STR, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
self.assertEquals(buf, TEST_STR)
s.close()
def test_TCPServers(self):
# Test SocketServer.TCPServer
servers = [ForgivingTCPServer, SocketServer.ThreadingTCPServer]
if HAVE_FORKING:
servers.append(SocketServer.ForkingTCPServer)
self.run_servers(socket.AF_INET, servers,
MyStreamHandler, self.stream_examine)
def test_UDPServers(self):
# Test SocketServer.UPDServer
servers = [SocketServer.UDPServer,
SocketServer.ThreadingUDPServer]
if HAVE_FORKING:
servers.append(SocketServer.ForkingUDPServer)
self.run_servers(socket.AF_INET, servers, MyDatagramHandler,
self.dgram_examine)
def test_stream_servers(self):
# Test SocketServer's stream servers
if not HAVE_UNIX_SOCKETS:
return
servers = [SocketServer.UnixStreamServer,
SocketServer.ThreadingUnixStreamServer]
if HAVE_FORKING:
servers.append(ForkingUnixStreamServer)
self.run_servers(socket.AF_UNIX, servers, MyStreamHandler,
self.stream_examine)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
# def test_dgram_servers(self):
# # Test SocketServer.UnixDatagramServer
# if not HAVE_UNIX_SOCKETS:
# return
# servers = [SocketServer.UnixDatagramServer,
# SocketServer.ThreadingUnixDatagramServer]
# if HAVE_FORKING:
# servers.append(ForkingUnixDatagramServer)
# self.run_servers(socket.AF_UNIX, servers, MyDatagramHandler,
# self.dgram_examine)
def testall():
testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream)
sloppy_cleanup()
testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram)
if hasattr(socket, 'AF_UNIX'):
sloppy_cleanup()
testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
def test_main():
import imp
if imp.lock_held():
# If the import lock is held, the threads will hang.
# If the import lock is held, the threads will hang
raise TestSkipped("can't run when import lock is held")
reap_children()
try:
testall()
finally:
cleanup()
reap_children()
test.test_support.run_unittest(SocketServerTest)
if __name__ == "__main__":
test_main()