# Test suite for SocketServer.py from test import test_support from test.test_support import (verbose, verify, TESTFN, TestSkipped, reap_children) test_support.requires('network') from SocketServer import * import socket import errno import select import time import threading import os NREQ = 3 DELAY = 0.5 class MyMixinHandler: def handle(self): time.sleep(DELAY) line = self.rfile.readline() time.sleep(DELAY) self.wfile.write(line) class MyStreamHandler(MyMixinHandler, StreamRequestHandler): pass class MyDatagramHandler(MyMixinHandler, DatagramRequestHandler): pass class MyMixinServer: def serve_a_few(self): for i in range(NREQ): self.handle_request() def handle_error(self, request, client_address): self.close_request(request) self.server_close() raise teststring = "hello world\n" def receive(sock, n, timeout=20): r, w, x = select.select([sock], [], [], timeout) if sock in r: return sock.recv(n) else: raise RuntimeError, "timed out on %r" % (sock,) def testdgram(proto, addr): s = socket.socket(proto, socket.SOCK_DGRAM) s.sendto(teststring, addr) buf = data = receive(s, 100) while data and '\n' not in buf: data = receive(s, 100) buf += data verify(buf == teststring) s.close() def teststream(proto, addr): s = socket.socket(proto, socket.SOCK_STREAM) s.connect(addr) s.sendall(teststring) buf = data = receive(s, 100) while data and '\n' not in buf: data = receive(s, 100) buf += data verify(buf == teststring) s.close() class ServerThread(threading.Thread): def __init__(self, addr, svrcls, hdlrcls): threading.Thread.__init__(self) self.__addr = addr self.__svrcls = svrcls self.__hdlrcls = hdlrcls def run(self): class svrcls(MyMixinServer, self.__svrcls): pass if verbose: print "thread: creating server" svr = svrcls(self.__addr, self.__hdlrcls) # pull the address out of the server in case it changed # this can happen if another process is using the port addr = getattr(svr, 'server_address') if addr: self.__addr = addr assert self.__addr == svr.socket.getsockname() if verbose: print "thread: serving three times" svr.serve_a_few() if verbose: print "thread: done" seed = 0 def pickport(): global seed seed += 1 return 10000 + (os.getpid() % 1000)*10 + seed host = "localhost" testfiles = [] def pickaddr(proto): if proto == socket.AF_INET: return (host, pickport()) else: fn = TESTFN + str(pickport()) if os.name == 'os2': # AF_UNIX socket names on OS/2 require a specific prefix # which can't include a drive letter and must also use # backslashes as directory separators if fn[1] == ':': fn = fn[2:] if fn[0] in (os.sep, os.altsep): fn = fn[1:] fn = os.path.join('\socket', fn) if os.sep == '/': fn = fn.replace(os.sep, os.altsep) else: fn = fn.replace(os.altsep, os.sep) testfiles.append(fn) return fn def cleanup(): for fn in testfiles: try: os.remove(fn) except os.error: pass testfiles[:] = [] def testloop(proto, servers, hdlrcls, testfunc): for svrcls in servers: addr = pickaddr(proto) if verbose: print "ADDR =", addr print "CLASS =", svrcls t = ServerThread(addr, svrcls, hdlrcls) if verbose: print "server created" t.start() if verbose: print "server running" for i in range(NREQ): time.sleep(DELAY) if verbose: print "test client", i testfunc(proto, addr) if verbose: print "waiting for server" t.join() if verbose: print "done" class ForgivingTCPServer(TCPServer): # prevent errors if another process is using the port we want def server_bind(self): host, default_port = self.server_address # this code shamelessly stolen from test.test_support # the ports were changed to protect the innocent import sys for port in [default_port, 3434, 8798, 23833]: try: self.server_address = host, port TCPServer.server_bind(self) break except socket.error, (err, msg): if err != errno.EADDRINUSE: raise print >>sys.__stderr__, \ ' WARNING: failed to listen on port %d, trying another' % port tcpservers = [ForgivingTCPServer, ThreadingTCPServer] if hasattr(os, 'fork') and os.name not in ('os2',): tcpservers.append(ForkingTCPServer) udpservers = [UDPServer, ThreadingUDPServer] if hasattr(os, 'fork') and os.name not in ('os2',): udpservers.append(ForkingUDPServer) if not hasattr(socket, 'AF_UNIX'): streamservers = [] dgramservers = [] else: class ForkingUnixStreamServer(ForkingMixIn, UnixStreamServer): pass streamservers = [UnixStreamServer, ThreadingUnixStreamServer] if hasattr(os, 'fork') and os.name not in ('os2',): streamservers.append(ForkingUnixStreamServer) class ForkingUnixDatagramServer(ForkingMixIn, UnixDatagramServer): pass dgramservers = [UnixDatagramServer, ThreadingUnixDatagramServer] if hasattr(os, 'fork') and os.name not in ('os2',): dgramservers.append(ForkingUnixDatagramServer) def sloppy_cleanup(): # See http://python.org/sf/1540386 # We need to reap children here otherwise a child from one server # can be left running for the next server and cause a test failure. time.sleep(DELAY) reap_children() def testall(): testloop(socket.AF_INET, tcpservers, MyStreamHandler, teststream) sloppy_cleanup() testloop(socket.AF_INET, udpservers, MyDatagramHandler, testdgram) if hasattr(socket, 'AF_UNIX'): sloppy_cleanup() testloop(socket.AF_UNIX, streamservers, MyStreamHandler, teststream) # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: ##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram) def test_main(): import imp if imp.lock_held(): # If the import lock is held, the threads will hang. raise TestSkipped("can't run when import lock is held") try: testall() finally: cleanup() reap_children() if __name__ == "__main__": test_main()