diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py index dd094489da0..b66eea2282d 100644 --- a/Lib/test/test_socketserver.py +++ b/Lib/test/test_socketserver.py @@ -1,20 +1,32 @@ -# Test suite for SocketServer.py +""" +Test suite for SocketServer.py. +""" -from test import test_support -from test.test_support import (verbose, verify, TESTFN, TestSkipped, - reap_children) -test_support.requires('network') - -from SocketServer import * +import os import socket import errno +import imp import select import time import threading -import os +from functools import wraps +import unittest +import SocketServer + +import test.test_support +from test.test_support import reap_children, verbose, TestSkipped +from test.test_support import TESTFN as TEST_FILE + +test.test_support.requires("network") NREQ = 3 DELAY = 0.5 +TEST_STR = "hello world\n" +HOST = "localhost" + +HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") +HAVE_FORKING = hasattr(os, "fork") and os.name != "os2" + class MyMixinHandler: def handle(self): @@ -23,22 +35,6 @@ class MyMixinHandler: time.sleep(DELAY) self.wfile.write(line) -class MyStreamHandler(MyMixinHandler, StreamRequestHandler): - pass - -class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler): - pass - -class MyMixinServer: - def serve_a_few(self): - for i in range(NREQ): - self.handle_request() - def handle_error(self, request, client_address): - self.close_request(request) - self.server_close() - raise - -teststring = "hello world\n" def receive(sock, n, timeout=20): r, w, x = select.select([sock], [], [], timeout) @@ -47,26 +43,33 @@ def receive(sock, n, timeout=20): else: raise RuntimeError, "timed out on %r" % (sock,) -def testdgram(proto, addr): - s = socket.socket(proto, socket.SOCK_DGRAM) - s.sendto(teststring, addr) - buf = data = receive(s, 100) - while data and '\n' not in buf: - data = receive(s, 100) - buf += data - verify(buf == teststring) - s.close() -def teststream(proto, addr): - s = socket.socket(proto, socket.SOCK_STREAM) - s.connect(addr) - s.sendall(teststring) - buf = data = receive(s, 100) - while data and '\n' not in buf: - data = receive(s, 100) - buf += data - verify(buf == teststring) - s.close() +class MyStreamHandler(MyMixinHandler, SocketServer.StreamRequestHandler): + pass + +class MyDatagramHandler(MyMixinHandler, + SocketServer.DatagramRequestHandler): + pass + +class ForkingUnixStreamServer(SocketServer.ForkingMixIn, + SocketServer.UnixStreamServer): + pass + +class ForkingUnixDatagramServer(SocketServer.ForkingMixIn, + SocketServer.UnixDatagramServer): + pass + + +class MyMixinServer: + def serve_a_few(self): + for i in range(NREQ): + self.handle_request() + + def handle_error(self, request, client_address): + self.close_request(request) + self.server_close() + raise + class ServerThread(threading.Thread): def __init__(self, addr, svrcls, hdlrcls): @@ -75,6 +78,7 @@ class ServerThread(threading.Thread): self.__svrcls = svrcls self.__hdlrcls = hdlrcls self.ready = threading.Event() + def run(self): class svrcls(MyMixinServer, self.__svrcls): pass @@ -93,64 +97,8 @@ class ServerThread(threading.Thread): svr.serve_a_few() if verbose: print "thread: done" -seed = 0 -def pickport(): - global seed - seed += 1 - return 10000 + (os.getpid() % 1000)*10 + seed -host = "localhost" -testfiles = [] -def pickaddr(proto): - if proto == socket.AF_INET: - return (host, pickport()) - else: - fn = TESTFN + str(pickport()) - if os.name == 'os2': - # AF_UNIX socket names on OS/2 require a specific prefix - # which can't include a drive letter and must also use - # backslashes as directory separators - if fn[1] == ':': - fn = fn[2:] - if fn[0] in (os.sep, os.altsep): - fn = fn[1:] - fn = os.path.join('\socket', fn) - if os.sep == '/': - fn = fn.replace(os.sep, os.altsep) - else: - fn = fn.replace(os.altsep, os.sep) - testfiles.append(fn) - return fn - -def cleanup(): - for fn in testfiles: - try: - os.remove(fn) - except os.error: - pass - testfiles[:] = [] - -def testloop(proto, servers, hdlrcls, testfunc): - for svrcls in servers: - addr = pickaddr(proto) - if verbose: - print "ADDR =", addr - print "CLASS =", svrcls - t = ServerThread(addr, svrcls, hdlrcls) - if verbose: print "server created" - t.start() - if verbose: print "server running" - for i in range(NREQ): - t.ready.wait(10*DELAY) - if not t.ready.isSet(): - raise RuntimeError("Server not ready within a reasonable time") - if verbose: print "test client", i - testfunc(proto, addr) - if verbose: print "waiting for server" - t.join() - if verbose: print "done" - -class ForgivingTCPServer(TCPServer): +class ForgivingTCPServer(SocketServer.TCPServer): # prevent errors if another process is using the port we want def server_bind(self): host, default_port = self.server_address @@ -160,64 +108,147 @@ class ForgivingTCPServer(TCPServer): for port in [default_port, 3434, 8798, 23833]: try: self.server_address = host, port - TCPServer.server_bind(self) + SocketServer.TCPServer.server_bind(self) break except socket.error, (err, msg): if err != errno.EADDRINUSE: raise - print >>sys.__stderr__, \ - ' WARNING: failed to listen on port %d, trying another' % port + print >> sys.__stderr__, \ + "WARNING: failed to listen on port %d, trying another: " % port -tcpservers = [ForgivingTCPServer, ThreadingTCPServer] -if hasattr(os, 'fork') and os.name not in ('os2',): - tcpservers.append(ForkingTCPServer) -udpservers = [UDPServer, ThreadingUDPServer] -if hasattr(os, 'fork') and os.name not in ('os2',): - udpservers.append(ForkingUDPServer) -if not hasattr(socket, 'AF_UNIX'): - streamservers = [] - dgramservers = [] -else: - class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass - streamservers = [UnixStreamServer, ThreadingUnixStreamServer] - if hasattr(os, 'fork') and os.name not in ('os2',): - streamservers.append(ForkingUnixStreamServer) - class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass - dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer] - if hasattr(os, 'fork') and os.name not in ('os2',): - dgramservers.append(ForkingUnixDatagramServer) +class SocketServerTest(unittest.TestCase): + """Test all socket servers.""" -def sloppy_cleanup(): - # See http://python.org/sf/1540386 - # We need to reap children here otherwise a child from one server - # can be left running for the next server and cause a test failure. - time.sleep(DELAY) - reap_children() + def setUp(self): + self.port_seed = 0 + self.test_files = [] + + def tearDown(self): + reap_children() + + for fn in self.test_files: + try: + os.remove(fn) + except os.error: + pass + self.test_files[:] = [] + + def pickport(self): + self.port_seed += 1 + return 10000 + (os.getpid() % 1000)*10 + self.port_seed + + def pickaddr(self, proto): + if proto == socket.AF_INET: + return (HOST, self.pickport()) + else: + fn = TEST_FILE + str(self.pickport()) + if os.name == 'os2': + # AF_UNIX socket names on OS/2 require a specific prefix + # which can't include a drive letter and must also use + # backslashes as directory separators + if fn[1] == ':': + fn = fn[2:] + if fn[0] in (os.sep, os.altsep): + fn = fn[1:] + fn = os.path.join('\socket', fn) + if os.sep == '/': + fn = fn.replace(os.sep, os.altsep) + else: + fn = fn.replace(os.altsep, os.sep) + self.test_files.append(fn) + return fn + + def run_servers(self, proto, servers, hdlrcls, testfunc): + for svrcls in servers: + addr = self.pickaddr(proto) + if verbose: + print "ADDR =", addr + print "CLASS =", svrcls + t = ServerThread(addr, svrcls, hdlrcls) + if verbose: print "server created" + t.start() + if verbose: print "server running" + for i in range(NREQ): + t.ready.wait(10*DELAY) + self.assert_(t.ready.isSet(), + "Server not ready within a reasonable time") + if verbose: print "test client", i + testfunc(proto, addr) + if verbose: print "waiting for server" + t.join() + if verbose: print "done" + + def stream_examine(self, proto, addr): + s = socket.socket(proto, socket.SOCK_STREAM) + s.connect(addr) + s.sendall(TEST_STR) + buf = data = receive(s, 100) + while data and '\n' not in buf: + data = receive(s, 100) + buf += data + self.assertEquals(buf, TEST_STR) + s.close() + + def dgram_examine(self, proto, addr): + s = socket.socket(proto, socket.SOCK_DGRAM) + s.sendto(TEST_STR, addr) + buf = data = receive(s, 100) + while data and '\n' not in buf: + data = receive(s, 100) + buf += data + self.assertEquals(buf, TEST_STR) + s.close() + + def test_TCPServers(self): + # Test SocketServer.TCPServer + servers = [ForgivingTCPServer, SocketServer.ThreadingTCPServer] + if HAVE_FORKING: + servers.append(SocketServer.ForkingTCPServer) + self.run_servers(socket.AF_INET, servers, + MyStreamHandler, self.stream_examine) + + def test_UDPServers(self): + # Test SocketServer.UPDServer + servers = [SocketServer.UDPServer, + SocketServer.ThreadingUDPServer] + if HAVE_FORKING: + servers.append(SocketServer.ForkingUDPServer) + self.run_servers(socket.AF_INET, servers, MyDatagramHandler, + self.dgram_examine) + + def test_stream_servers(self): + # Test SocketServer's stream servers + if not HAVE_UNIX_SOCKETS: + return + servers = [SocketServer.UnixStreamServer, + SocketServer.ThreadingUnixStreamServer] + if HAVE_FORKING: + servers.append(ForkingUnixStreamServer) + self.run_servers(socket.AF_UNIX, servers, MyStreamHandler, + self.stream_examine) + + # Alas, on Linux (at least) recvfrom() doesn't return a meaningful + # client address so this cannot work: + + # def test_dgram_servers(self): + # # Test SocketServer.UnixDatagramServer + # if not HAVE_UNIX_SOCKETS: + # return + # servers = [SocketServer.UnixDatagramServer, + # SocketServer.ThreadingUnixDatagramServer] + # if HAVE_FORKING: + # servers.append(ForkingUnixDatagramServer) + # self.run_servers(socket.AF_UNIX, servers, MyDatagramHandler, + # self.dgram_examine) -def testall(): - testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream) - sloppy_cleanup() - testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram) - if hasattr(socket, 'AF_UNIX'): - sloppy_cleanup() - testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream) - # Alas, on Linux (at least) recvfrom() doesn't return a meaningful - # client address so this cannot work: - ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram) def test_main(): - import imp if imp.lock_held(): - # If the import lock is held, the threads will hang. + # If the import lock is held, the threads will hang raise TestSkipped("can't run when import lock is held") - reap_children() - try: - testall() - finally: - cleanup() - reap_children() + test.test_support.run_unittest(SocketServerTest) if __name__ == "__main__": test_main()