- Issue #2550: The approach used by client/server code for obtaining ports

to listen on in network-oriented tests has been refined in an effort to
  facilitate running multiple instances of the entire regression test suite
  in parallel without issue.  test_support.bind_port() has been fixed such
  that it will always return a unique port -- which wasn't always the case
  with the previous implementation, especially if socket options had been
  set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT).  The
  new implementation of bind_port() will actually raise an exception if it
  is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
  SO_REUSEPORT socket option set.  Furthermore, if available, bind_port()
  will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
  This currently only applies to Windows.  This option prevents any other
  sockets from binding to the host/port we've bound to, thus removing the
  possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
  that occurs when a second SOCK_STREAM socket binds and accepts to a
  host/port that's already been bound by another socket.  The optional
  preferred port parameter to bind_port() has been removed.  Under no
  circumstances should tests be hard coding ports!

  test_support.find_unused_port() has also been introduced, which will pass
  a temporary socket object to bind_port() in order to obtain an unused port.
  The temporary socket object is then closed and deleted, and the port is
  returned.  This method should only be used for obtaining an unused port
  in order to pass to an external program (i.e. the -accept [port] argument
  to openssl's s_server mode) or as a parameter to a server-oriented class
  that doesn't give you direct access to the underlying socket used.

  Finally, test_support.HOST has been introduced, which should be used for
  the host argument of any relevant socket calls (i.e. bind and connect).

  The following tests were updated to following the new conventions:
    test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
    test_poplib, test_ftplib, test_telnetlib, test_socketserver,
    test_asynchat and test_socket_ssl.

  It is now possible for multiple instances of the regression test suite to
  run in parallel without issue.
This commit is contained in:
Trent Nelson 2008-04-08 23:47:30 +00:00
parent 02f33b43dc
commit e41b0061dd
13 changed files with 713 additions and 648 deletions

View File

@ -6,8 +6,7 @@ import unittest
import sys import sys
from test import test_support from test import test_support
HOST = "127.0.0.1" HOST = test_support.HOST
PORT = 54322
SERVER_QUIT = 'QUIT\n' SERVER_QUIT = 'QUIT\n'
class echo_server(threading.Thread): class echo_server(threading.Thread):
@ -18,15 +17,13 @@ class echo_server(threading.Thread):
def __init__(self, event): def __init__(self, event):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.event = event self.event = event
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = test_support.bind_port(self.sock)
def run(self): def run(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.listen(1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
global PORT
PORT = test_support.bind_port(sock, HOST, PORT)
sock.listen(1)
self.event.set() self.event.set()
conn, client = sock.accept() conn, client = self.sock.accept()
self.buffer = "" self.buffer = ""
# collect data until quit message is seen # collect data until quit message is seen
while SERVER_QUIT not in self.buffer: while SERVER_QUIT not in self.buffer:
@ -50,15 +47,15 @@ class echo_server(threading.Thread):
pass pass
conn.close() conn.close()
sock.close() self.sock.close()
class echo_client(asynchat.async_chat): class echo_client(asynchat.async_chat):
def __init__(self, terminator): def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self) asynchat.async_chat.__init__(self)
self.contents = [] self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, PORT)) self.connect((HOST, server_port))
self.set_terminator(terminator) self.set_terminator(terminator)
self.buffer = '' self.buffer = ''
@ -106,7 +103,7 @@ class TestAsynchat(unittest.TestCase):
event.wait() event.wait()
event.clear() event.clear()
time.sleep(0.01) # Give server time to start accepting. time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term) c = echo_client(term, s.port)
c.push("hello ") c.push("hello ")
c.push("world%s" % term) c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term) c.push("I'm not dead yet!%s" % term)
@ -138,7 +135,7 @@ class TestAsynchat(unittest.TestCase):
def numeric_terminator_check(self, termlen): def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes # Try reading a fixed number of bytes
s, event = start_echo_server() s, event = start_echo_server()
c = echo_client(termlen) c = echo_client(termlen, s.port)
data = "hello world, I'm not dead yet!\n" data = "hello world, I'm not dead yet!\n"
c.push(data) c.push(data)
c.push(SERVER_QUIT) c.push(SERVER_QUIT)
@ -159,7 +156,7 @@ class TestAsynchat(unittest.TestCase):
def test_none_terminator(self): def test_none_terminator(self):
# Try reading a fixed number of bytes # Try reading a fixed number of bytes
s, event = start_echo_server() s, event = start_echo_server()
c = echo_client(None) c = echo_client(None, s.port)
data = "hello world, I'm not dead yet!\n" data = "hello world, I'm not dead yet!\n"
c.push(data) c.push(data)
c.push(SERVER_QUIT) c.push(SERVER_QUIT)
@ -171,7 +168,7 @@ class TestAsynchat(unittest.TestCase):
def test_simple_producer(self): def test_simple_producer(self):
s, event = start_echo_server() s, event = start_echo_server()
c = echo_client('\n') c = echo_client('\n', s.port)
data = "hello world\nI'm not dead yet!\n" data = "hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8) p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p) c.push_with_producer(p)
@ -182,7 +179,7 @@ class TestAsynchat(unittest.TestCase):
def test_string_producer(self): def test_string_producer(self):
s, event = start_echo_server() s, event = start_echo_server()
c = echo_client('\n') c = echo_client('\n', s.port)
data = "hello world\nI'm not dead yet!\n" data = "hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT) c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
@ -193,7 +190,7 @@ class TestAsynchat(unittest.TestCase):
def test_empty_line(self): def test_empty_line(self):
# checks that empty lines are handled correctly # checks that empty lines are handled correctly
s, event = start_echo_server() s, event = start_echo_server()
c = echo_client('\n') c = echo_client('\n', s.port)
c.push("hello world\n\nI'm not dead yet!\n") c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT) c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
@ -203,7 +200,7 @@ class TestAsynchat(unittest.TestCase):
def test_close_when_done(self): def test_close_when_done(self):
s, event = start_echo_server() s, event = start_echo_server()
c = echo_client('\n') c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n") c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT) c.push(SERVER_QUIT)
c.close_when_done() c.close_when_done()

View File

@ -1,421 +1,412 @@
import asyncore import asyncore
import unittest import unittest
import select import select
import os import os
import socket import socket
import threading import threading
import sys import sys
import time import time
from test import test_support from test import test_support
from test.test_support import TESTFN, run_unittest, unlink from test.test_support import TESTFN, run_unittest, unlink
from StringIO import StringIO from StringIO import StringIO
HOST = "127.0.0.1" HOST = test_support.HOST
PORT = None
class dummysocket:
class dummysocket: def __init__(self):
def __init__(self): self.closed = False
self.closed = False
def close(self):
def close(self): self.closed = True
self.closed = True
def fileno(self):
def fileno(self): return 42
return 42
class dummychannel:
class dummychannel: def __init__(self):
def __init__(self): self.socket = dummysocket()
self.socket = dummysocket()
class exitingdummy:
class exitingdummy: def __init__(self):
def __init__(self): pass
pass
def handle_read_event(self):
def handle_read_event(self): raise asyncore.ExitNow()
raise asyncore.ExitNow()
handle_write_event = handle_read_event
handle_write_event = handle_read_event handle_expt_event = handle_read_event
handle_expt_event = handle_read_event
class crashingdummy:
class crashingdummy: def __init__(self):
def __init__(self): self.error_handled = False
self.error_handled = False
def handle_read_event(self):
def handle_read_event(self): raise Exception()
raise Exception()
handle_write_event = handle_read_event
handle_write_event = handle_read_event handle_expt_event = handle_read_event
handle_expt_event = handle_read_event
def handle_error(self):
def handle_error(self): self.error_handled = True
self.error_handled = True
# used when testing senders; just collects what it gets until newline is sent
# used when testing senders; just collects what it gets until newline is sent def capture_server(evt, buf, serv):
def capture_server(evt, buf): try:
try: serv.listen(5)
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) conn, addr = serv.accept()
serv.settimeout(3) except socket.timeout:
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) pass
serv.bind(("", 0)) else:
global PORT n = 200
PORT = serv.getsockname()[1] while n > 0:
serv.listen(5) r, w, e = select.select([conn], [], [])
conn, addr = serv.accept() if r:
except socket.timeout: data = conn.recv(10)
pass # keep everything except for the newline terminator
else: buf.write(data.replace('\n', ''))
n = 200 if '\n' in data:
while n > 0: break
r, w, e = select.select([conn], [], []) n -= 1
if r: time.sleep(0.01)
data = conn.recv(10)
# keep everything except for the newline terminator conn.close()
buf.write(data.replace('\n', '')) finally:
if '\n' in data: serv.close()
break evt.set()
n -= 1
time.sleep(0.01)
class HelperFunctionTests(unittest.TestCase):
conn.close() def test_readwriteexc(self):
finally: # Check exception handling behavior of read, write and _exception
serv.close()
PORT = None # check that ExitNow exceptions in the object handler method
evt.set() # bubbles all the way up through asyncore read/write/_exception calls
tr1 = exitingdummy()
self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
class HelperFunctionTests(unittest.TestCase): self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
def test_readwriteexc(self): self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
# Check exception handling behavior of read, write and _exception
# check that an exception other than ExitNow in the object handler
# check that ExitNow exceptions in the object handler method # method causes the handle_error method to get called
# bubbles all the way up through asyncore read/write/_exception calls tr2 = crashingdummy()
tr1 = exitingdummy() asyncore.read(tr2)
self.assertRaises(asyncore.ExitNow, asyncore.read, tr1) self.assertEqual(tr2.error_handled, True)
self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1) tr2 = crashingdummy()
asyncore.write(tr2)
# check that an exception other than ExitNow in the object handler self.assertEqual(tr2.error_handled, True)
# method causes the handle_error method to get called
tr2 = crashingdummy() tr2 = crashingdummy()
asyncore.read(tr2) asyncore._exception(tr2)
self.assertEqual(tr2.error_handled, True) self.assertEqual(tr2.error_handled, True)
tr2 = crashingdummy() # asyncore.readwrite uses constants in the select module that
asyncore.write(tr2) # are not present in Windows systems (see this thread:
self.assertEqual(tr2.error_handled, True) # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
# These constants should be present as long as poll is available
tr2 = crashingdummy()
asyncore._exception(tr2) if hasattr(select, 'poll'):
self.assertEqual(tr2.error_handled, True) def test_readwrite(self):
# Check that correct methods are called by readwrite()
# asyncore.readwrite uses constants in the select module that
# are not present in Windows systems (see this thread: class testobj:
# http://mail.python.org/pipermail/python-list/2001-October/109973.html) def __init__(self):
# These constants should be present as long as poll is available self.read = False
self.write = False
if hasattr(select, 'poll'): self.expt = False
def test_readwrite(self):
# Check that correct methods are called by readwrite() def handle_read_event(self):
self.read = True
class testobj:
def __init__(self): def handle_write_event(self):
self.read = False self.write = True
self.write = False
self.expt = False def handle_expt_event(self):
self.expt = True
def handle_read_event(self):
self.read = True def handle_error(self):
self.error_handled = True
def handle_write_event(self):
self.write = True for flag in (select.POLLIN, select.POLLPRI):
tobj = testobj()
def handle_expt_event(self): self.assertEqual(tobj.read, False)
self.expt = True asyncore.readwrite(tobj, flag)
self.assertEqual(tobj.read, True)
def handle_error(self):
self.error_handled = True # check that ExitNow exceptions in the object handler method
# bubbles all the way up through asyncore readwrite call
for flag in (select.POLLIN, select.POLLPRI): tr1 = exitingdummy()
tobj = testobj() self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
self.assertEqual(tobj.read, False)
asyncore.readwrite(tobj, flag) # check that an exception other than ExitNow in the object handler
self.assertEqual(tobj.read, True) # method causes the handle_error method to get called
tr2 = crashingdummy()
# check that ExitNow exceptions in the object handler method asyncore.readwrite(tr2, flag)
# bubbles all the way up through asyncore readwrite call self.assertEqual(tr2.error_handled, True)
tr1 = exitingdummy()
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) tobj = testobj()
self.assertEqual(tobj.write, False)
# check that an exception other than ExitNow in the object handler asyncore.readwrite(tobj, select.POLLOUT)
# method causes the handle_error method to get called self.assertEqual(tobj.write, True)
tr2 = crashingdummy()
asyncore.readwrite(tr2, flag) # check that ExitNow exceptions in the object handler method
self.assertEqual(tr2.error_handled, True) # bubbles all the way up through asyncore readwrite call
tr1 = exitingdummy()
tobj = testobj() self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
self.assertEqual(tobj.write, False) select.POLLOUT)
asyncore.readwrite(tobj, select.POLLOUT)
self.assertEqual(tobj.write, True) # check that an exception other than ExitNow in the object handler
# method causes the handle_error method to get called
# check that ExitNow exceptions in the object handler method tr2 = crashingdummy()
# bubbles all the way up through asyncore readwrite call asyncore.readwrite(tr2, select.POLLOUT)
tr1 = exitingdummy() self.assertEqual(tr2.error_handled, True)
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
select.POLLOUT) for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
tobj = testobj()
# check that an exception other than ExitNow in the object handler self.assertEqual(tobj.expt, False)
# method causes the handle_error method to get called asyncore.readwrite(tobj, flag)
tr2 = crashingdummy() self.assertEqual(tobj.expt, True)
asyncore.readwrite(tr2, select.POLLOUT)
self.assertEqual(tr2.error_handled, True) # check that ExitNow exceptions in the object handler method
# bubbles all the way up through asyncore readwrite calls
for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL): tr1 = exitingdummy()
tobj = testobj() self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
self.assertEqual(tobj.expt, False)
asyncore.readwrite(tobj, flag) # check that an exception other than ExitNow in the object handler
self.assertEqual(tobj.expt, True) # method causes the handle_error method to get called
tr2 = crashingdummy()
# check that ExitNow exceptions in the object handler method asyncore.readwrite(tr2, flag)
# bubbles all the way up through asyncore readwrite calls self.assertEqual(tr2.error_handled, True)
tr1 = exitingdummy()
self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag) def test_closeall(self):
self.closeall_check(False)
# check that an exception other than ExitNow in the object handler
# method causes the handle_error method to get called def test_closeall_default(self):
tr2 = crashingdummy() self.closeall_check(True)
asyncore.readwrite(tr2, flag)
self.assertEqual(tr2.error_handled, True) def closeall_check(self, usedefault):
# Check that close_all() closes everything in a given map
def test_closeall(self):
self.closeall_check(False) l = []
testmap = {}
def test_closeall_default(self): for i in range(10):
self.closeall_check(True) c = dummychannel()
l.append(c)
def closeall_check(self, usedefault): self.assertEqual(c.socket.closed, False)
# Check that close_all() closes everything in a given map testmap[i] = c
l = [] if usedefault:
testmap = {} socketmap = asyncore.socket_map
for i in range(10): try:
c = dummychannel() asyncore.socket_map = testmap
l.append(c) asyncore.close_all()
self.assertEqual(c.socket.closed, False) finally:
testmap[i] = c testmap, asyncore.socket_map = asyncore.socket_map, socketmap
else:
if usedefault: asyncore.close_all(testmap)
socketmap = asyncore.socket_map
try: self.assertEqual(len(testmap), 0)
asyncore.socket_map = testmap
asyncore.close_all() for c in l:
finally: self.assertEqual(c.socket.closed, True)
testmap, asyncore.socket_map = asyncore.socket_map, socketmap
else: def test_compact_traceback(self):
asyncore.close_all(testmap) try:
raise Exception("I don't like spam!")
self.assertEqual(len(testmap), 0) except:
real_t, real_v, real_tb = sys.exc_info()
for c in l: r = asyncore.compact_traceback()
self.assertEqual(c.socket.closed, True) else:
self.fail("Expected exception")
def test_compact_traceback(self):
try: (f, function, line), t, v, info = r
raise Exception("I don't like spam!") self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
except: self.assertEqual(function, 'test_compact_traceback')
real_t, real_v, real_tb = sys.exc_info() self.assertEqual(t, real_t)
r = asyncore.compact_traceback() self.assertEqual(v, real_v)
else: self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
self.fail("Expected exception")
(f, function, line), t, v, info = r class DispatcherTests(unittest.TestCase):
self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py') def setUp(self):
self.assertEqual(function, 'test_compact_traceback') pass
self.assertEqual(t, real_t)
self.assertEqual(v, real_v) def tearDown(self):
self.assertEqual(info, '[%s|%s|%s]' % (f, function, line)) asyncore.close_all()
def test_basic(self):
class DispatcherTests(unittest.TestCase): d = asyncore.dispatcher()
def setUp(self): self.assertEqual(d.readable(), True)
pass self.assertEqual(d.writable(), True)
def tearDown(self): def test_repr(self):
asyncore.close_all() d = asyncore.dispatcher()
self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
def test_basic(self):
d = asyncore.dispatcher() def test_log(self):
self.assertEqual(d.readable(), True) d = asyncore.dispatcher()
self.assertEqual(d.writable(), True)
# capture output of dispatcher.log() (to stderr)
def test_repr(self): fp = StringIO()
d = asyncore.dispatcher() stderr = sys.stderr
self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d)) l1 = "Lovely spam! Wonderful spam!"
l2 = "I don't like spam!"
def test_log(self): try:
d = asyncore.dispatcher() sys.stderr = fp
d.log(l1)
# capture output of dispatcher.log() (to stderr) d.log(l2)
fp = StringIO() finally:
stderr = sys.stderr sys.stderr = stderr
l1 = "Lovely spam! Wonderful spam!"
l2 = "I don't like spam!" lines = fp.getvalue().splitlines()
try: self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
sys.stderr = fp
d.log(l1) def test_log_info(self):
d.log(l2) d = asyncore.dispatcher()
finally:
sys.stderr = stderr # capture output of dispatcher.log_info() (to stdout via print)
fp = StringIO()
lines = fp.getvalue().splitlines() stdout = sys.stdout
self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2]) l1 = "Have you got anything without spam?"
l2 = "Why can't she have egg bacon spam and sausage?"
def test_log_info(self): l3 = "THAT'S got spam in it!"
d = asyncore.dispatcher() try:
sys.stdout = fp
# capture output of dispatcher.log_info() (to stdout via print) d.log_info(l1, 'EGGS')
fp = StringIO() d.log_info(l2)
stdout = sys.stdout d.log_info(l3, 'SPAM')
l1 = "Have you got anything without spam?" finally:
l2 = "Why can't she have egg bacon spam and sausage?" sys.stdout = stdout
l3 = "THAT'S got spam in it!"
try: lines = fp.getvalue().splitlines()
sys.stdout = fp if __debug__:
d.log_info(l1, 'EGGS') expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
d.log_info(l2) else:
d.log_info(l3, 'SPAM') expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]
finally:
sys.stdout = stdout self.assertEquals(lines, expected)
lines = fp.getvalue().splitlines() def test_unhandled(self):
if __debug__: d = asyncore.dispatcher()
expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
else: # capture output of dispatcher.log_info() (to stdout via print)
expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3] fp = StringIO()
stdout = sys.stdout
self.assertEquals(lines, expected) try:
sys.stdout = fp
def test_unhandled(self): d.handle_expt()
d = asyncore.dispatcher() d.handle_read()
d.handle_write()
# capture output of dispatcher.log_info() (to stdout via print) d.handle_connect()
fp = StringIO() d.handle_accept()
stdout = sys.stdout finally:
try: sys.stdout = stdout
sys.stdout = fp
d.handle_expt() lines = fp.getvalue().splitlines()
d.handle_read() expected = ['warning: unhandled exception',
d.handle_write() 'warning: unhandled read event',
d.handle_connect() 'warning: unhandled write event',
d.handle_accept() 'warning: unhandled connect event',
finally: 'warning: unhandled accept event']
sys.stdout = stdout self.assertEquals(lines, expected)
lines = fp.getvalue().splitlines()
expected = ['warning: unhandled exception',
'warning: unhandled read event', class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
'warning: unhandled write event', def readable(self):
'warning: unhandled connect event', return False
'warning: unhandled accept event']
self.assertEquals(lines, expected) def handle_connect(self):
pass
class DispatcherWithSendTests(unittest.TestCase):
class dispatcherwithsend_noread(asyncore.dispatcher_with_send): usepoll = False
def readable(self):
return False def setUp(self):
pass
def handle_connect(self):
pass def tearDown(self):
asyncore.close_all()
class DispatcherWithSendTests(unittest.TestCase):
usepoll = False def test_send(self):
self.evt = threading.Event()
def setUp(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
pass self.sock.settimeout(3)
self.port = test_support.bind_port(self.sock)
def tearDown(self):
asyncore.close_all() cap = StringIO()
args = (self.evt, cap, self.sock)
def test_send(self): threading.Thread(target=capture_server, args=args).start()
self.evt = threading.Event()
cap = StringIO() # wait a little longer for the server to initialize (it sometimes
threading.Thread(target=capture_server, args=(self.evt,cap)).start() # refuses connections on slow machines without this wait)
time.sleep(0.2)
# wait until server thread has assigned a port number
n = 1000 data = "Suppose there isn't a 16-ton weight?"
while PORT is None and n > 0: d = dispatcherwithsend_noread()
time.sleep(0.01) d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
n -= 1 d.connect((HOST, self.port))
# wait a little longer for the server to initialize (it sometimes # give time for socket to connect
# refuses connections on slow machines without this wait) time.sleep(0.1)
time.sleep(0.2)
d.send(data)
data = "Suppose there isn't a 16-ton weight?" d.send(data)
d = dispatcherwithsend_noread() d.send('\n')
d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
d.connect((HOST, PORT)) n = 1000
while d.out_buffer and n > 0:
# give time for socket to connect asyncore.poll()
time.sleep(0.1) n -= 1
d.send(data) self.evt.wait()
d.send(data)
d.send('\n') self.assertEqual(cap.getvalue(), data*2)
n = 1000
while d.out_buffer and n > 0: class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
asyncore.poll() usepoll = True
n -= 1
if hasattr(asyncore, 'file_wrapper'):
self.evt.wait() class FileWrapperTest(unittest.TestCase):
def setUp(self):
self.assertEqual(cap.getvalue(), data*2) self.d = "It's not dead, it's sleeping!"
file(TESTFN, 'w').write(self.d)
class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests): def tearDown(self):
usepoll = True unlink(TESTFN)
if hasattr(asyncore, 'file_wrapper'): def test_recv(self):
class FileWrapperTest(unittest.TestCase): fd = os.open(TESTFN, os.O_RDONLY)
def setUp(self): w = asyncore.file_wrapper(fd)
self.d = "It's not dead, it's sleeping!"
file(TESTFN, 'w').write(self.d) self.assertEqual(w.fd, fd)
self.assertEqual(w.fileno(), fd)
def tearDown(self): self.assertEqual(w.recv(13), "It's not dead")
unlink(TESTFN) self.assertEqual(w.read(6), ", it's")
w.close()
def test_recv(self): self.assertRaises(OSError, w.read, 1)
fd = os.open(TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd) def test_send(self):
d1 = "Come again?"
self.assertEqual(w.fd, fd) d2 = "I want to buy some cheese."
self.assertEqual(w.fileno(), fd) fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
self.assertEqual(w.recv(13), "It's not dead") w = asyncore.file_wrapper(fd)
self.assertEqual(w.read(6), ", it's")
w.close() w.write(d1)
self.assertRaises(OSError, w.read, 1) w.send(d2)
w.close()
def test_send(self): self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
d1 = "Come again?"
d2 = "I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) def test_main():
w = asyncore.file_wrapper(fd) tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
DispatcherWithSendTests_UsePoll]
w.write(d1) if hasattr(asyncore, 'file_wrapper'):
w.send(d2) tests.append(FileWrapperTest)
w.close()
self.assertEqual(file(TESTFN).read(), self.d + d1 + d2) run_unittest(*tests)
if __name__ == "__main__":
def test_main(): test_main()
tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
DispatcherWithSendTests_UsePoll]
if hasattr(asyncore, 'file_wrapper'):
tests.append(FileWrapperTest)
run_unittest(*tests)
if __name__ == "__main__":
test_main()

View File

@ -6,18 +6,13 @@ import time
from unittest import TestCase from unittest import TestCase
from test import test_support from test import test_support
server_port = None HOST = test_support.HOST
# This function sets the evt 3 times: # This function sets the evt 3 times:
# 1) when the connection is ready to be accepted. # 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection # 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket # 3) when we have closed the socket
def server(evt): def server(evt, serv):
global server_port
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_port = test_support.bind_port(serv, "", 9091)
serv.listen(5) serv.listen(5)
# (1) Signal the caller that we are ready to accept the connection. # (1) Signal the caller that we are ready to accept the connection.
evt.set() evt.set()
@ -39,14 +34,16 @@ class GeneralTests(TestCase):
def setUp(self): def setUp(self):
self.evt = threading.Event() self.evt = threading.Event()
threading.Thread(target=server, args=(self.evt,)).start() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = test_support.bind_port(self.sock)
threading.Thread(target=server, args=(self.evt,self.sock)).start()
# Wait for the server to be ready. # Wait for the server to be ready.
self.evt.wait() self.evt.wait()
self.evt.clear() self.evt.clear()
ftplib.FTP.port = server_port ftplib.FTP.port = self.port
def tearDown(self): def tearDown(self):
# Wait on the closing of the socket (this shouldn't be necessary).
self.evt.wait() self.evt.wait()
def testBasic(self): def testBasic(self):
@ -54,34 +51,34 @@ class GeneralTests(TestCase):
ftplib.FTP() ftplib.FTP()
# connects # connects
ftp = ftplib.FTP("localhost") ftp = ftplib.FTP(HOST)
self.evt.wait() self.evt.wait()
ftp.sock.close() ftp.sock.close()
def testTimeoutDefault(self): def testTimeoutDefault(self):
# default # default
ftp = ftplib.FTP("localhost") ftp = ftplib.FTP(HOST)
self.assertTrue(ftp.sock.gettimeout() is None) self.assertTrue(ftp.sock.gettimeout() is None)
self.evt.wait() self.evt.wait()
ftp.sock.close() ftp.sock.close()
def testTimeoutValue(self): def testTimeoutValue(self):
# a value # a value
ftp = ftplib.FTP("localhost", timeout=30) ftp = ftplib.FTP(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30) self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait() self.evt.wait()
ftp.sock.close() ftp.sock.close()
def testTimeoutConnect(self): def testTimeoutConnect(self):
ftp = ftplib.FTP() ftp = ftplib.FTP()
ftp.connect("localhost", timeout=30) ftp.connect(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30) self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait() self.evt.wait()
ftp.sock.close() ftp.sock.close()
def testTimeoutDifferentOrder(self): def testTimeoutDifferentOrder(self):
ftp = ftplib.FTP(timeout=30) ftp = ftplib.FTP(timeout=30)
ftp.connect("localhost") ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30) self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait() self.evt.wait()
ftp.sock.close() ftp.sock.close()
@ -89,7 +86,7 @@ class GeneralTests(TestCase):
def testTimeoutDirectAccess(self): def testTimeoutDirectAccess(self):
ftp = ftplib.FTP() ftp = ftplib.FTP()
ftp.timeout = 30 ftp.timeout = 30
ftp.connect("localhost") ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30) self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait() self.evt.wait()
ftp.sock.close() ftp.sock.close()
@ -99,7 +96,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
ftp = ftplib.FTP("localhost", timeout=None) ftp = ftplib.FTP(HOST, timeout=None)
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
self.assertEqual(ftp.sock.gettimeout(), 30) self.assertEqual(ftp.sock.gettimeout(), 30)

View File

@ -6,6 +6,8 @@ from unittest import TestCase
from test import test_support from test import test_support
HOST = test_support.HOST
class FakeSocket: class FakeSocket:
def __init__(self, text, fileclass=StringIO.StringIO): def __init__(self, text, fileclass=StringIO.StringIO):
self.text = text self.text = text
@ -196,16 +198,12 @@ class OfflineTest(TestCase):
def test_responses(self): def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found") self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
PORT = 50003
HOST = "localhost"
class TimeoutTest(TestCase): class TimeoutTest(TestCase):
PORT = None
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.PORT = test_support.bind_port(self.serv)
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
self.serv.listen(5) self.serv.listen(5)
def tearDown(self): def tearDown(self):
@ -217,13 +215,13 @@ class TimeoutTest(TestCase):
HTTPConnection and into the socket. HTTPConnection and into the socket.
''' '''
# default # default
httpConn = httplib.HTTPConnection(HOST, PORT) httpConn = httplib.HTTPConnection(HOST, self.PORT)
httpConn.connect() httpConn.connect()
self.assertTrue(httpConn.sock.gettimeout() is None) self.assertTrue(httpConn.sock.gettimeout() is None)
httpConn.close() httpConn.close()
# a value # a value
httpConn = httplib.HTTPConnection(HOST, PORT, timeout=30) httpConn = httplib.HTTPConnection(HOST, self.PORT, timeout=30)
httpConn.connect() httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30) self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close() httpConn.close()
@ -232,7 +230,7 @@ class TimeoutTest(TestCase):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None) httpConn = httplib.HTTPConnection(HOST, self.PORT, timeout=None)
httpConn.connect() httpConn.connect()
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
@ -246,11 +244,12 @@ class HTTPSTimeoutTest(TestCase):
def test_attributes(self): def test_attributes(self):
# simple test to check it's storing it # simple test to check it's storing it
if hasattr(httplib, 'HTTPSConnection'): if hasattr(httplib, 'HTTPSConnection'):
h = httplib.HTTPSConnection(HOST, PORT, timeout=30) h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
self.assertEqual(h.timeout, 30) self.assertEqual(h.timeout, 30)
def test_main(verbose=None): def test_main(verbose=None):
test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTimeoutTest) test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
HTTPSTimeoutTest)
if __name__ == '__main__': if __name__ == '__main__':
test_main() test_main()

View File

@ -6,12 +6,9 @@ import time
from unittest import TestCase from unittest import TestCase
from test import test_support from test import test_support
HOST = test_support.HOST
def server(evt): def server(evt, serv):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv.bind(("", 9091))
serv.listen(5) serv.listen(5)
try: try:
conn, addr = serv.accept() conn, addr = serv.accept()
@ -28,7 +25,10 @@ class GeneralTests(TestCase):
def setUp(self): def setUp(self):
self.evt = threading.Event() self.evt = threading.Event()
threading.Thread(target=server, args=(self.evt,)).start() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = test_support.bind_port(self.sock)
threading.Thread(target=server, args=(self.evt,self.sock)).start()
time.sleep(.1) time.sleep(.1)
def tearDown(self): def tearDown(self):
@ -36,18 +36,18 @@ class GeneralTests(TestCase):
def testBasic(self): def testBasic(self):
# connects # connects
pop = poplib.POP3("localhost", 9091) pop = poplib.POP3(HOST, self.port)
pop.sock.close() pop.sock.close()
def testTimeoutDefault(self): def testTimeoutDefault(self):
# default # default
pop = poplib.POP3("localhost", 9091) pop = poplib.POP3(HOST, self.port)
self.assertTrue(pop.sock.gettimeout() is None) self.assertTrue(pop.sock.gettimeout() is None)
pop.sock.close() pop.sock.close()
def testTimeoutValue(self): def testTimeoutValue(self):
# a value # a value
pop = poplib.POP3("localhost", 9091, timeout=30) pop = poplib.POP3(HOST, self.port, timeout=30)
self.assertEqual(pop.sock.gettimeout(), 30) self.assertEqual(pop.sock.gettimeout(), 30)
pop.sock.close() pop.sock.close()
@ -56,7 +56,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
pop = poplib.POP3("localhost", 9091, timeout=None) pop = poplib.POP3(HOST, self.port, timeout=None)
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
self.assertEqual(pop.sock.gettimeout(), 30) self.assertEqual(pop.sock.gettimeout(), 30)

View File

@ -12,18 +12,9 @@ import select
from unittest import TestCase from unittest import TestCase
from test import test_support from test import test_support
# PORT is used to communicate the port number assigned to the server HOST = test_support.HOST
# to the test client
HOST = "localhost"
PORT = None
def server(evt, buf): def server(evt, buf, serv):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(15)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv.bind(("", 0))
global PORT
PORT = serv.getsockname()[1]
serv.listen(5) serv.listen(5)
evt.set() evt.set()
try: try:
@ -43,14 +34,16 @@ def server(evt, buf):
conn.close() conn.close()
finally: finally:
serv.close() serv.close()
PORT = None
evt.set() evt.set()
class GeneralTests(TestCase): class GeneralTests(TestCase):
def setUp(self): def setUp(self):
self.evt = threading.Event() self.evt = threading.Event()
servargs = (self.evt, "220 Hola mundo\n") self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(15)
self.port = test_support.bind_port(self.sock)
servargs = (self.evt, "220 Hola mundo\n", self.sock)
threading.Thread(target=server, args=servargs).start() threading.Thread(target=server, args=servargs).start()
self.evt.wait() self.evt.wait()
self.evt.clear() self.evt.clear()
@ -60,29 +53,29 @@ class GeneralTests(TestCase):
def testBasic1(self): def testBasic1(self):
# connects # connects
smtp = smtplib.SMTP(HOST, PORT) smtp = smtplib.SMTP(HOST, self.port)
smtp.sock.close() smtp.sock.close()
def testBasic2(self): def testBasic2(self):
# connects, include port in host name # connects, include port in host name
smtp = smtplib.SMTP("%s:%s" % (HOST, PORT)) smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
smtp.sock.close() smtp.sock.close()
def testLocalHostName(self): def testLocalHostName(self):
# check that supplied local_hostname is used # check that supplied local_hostname is used
smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost") smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost") self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close() smtp.sock.close()
def testTimeoutDefault(self): def testTimeoutDefault(self):
# default # default
smtp = smtplib.SMTP(HOST, PORT) smtp = smtplib.SMTP(HOST, self.port)
self.assertTrue(smtp.sock.gettimeout() is None) self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close() smtp.sock.close()
def testTimeoutValue(self): def testTimeoutValue(self):
# a value # a value
smtp = smtplib.SMTP(HOST, PORT, timeout=30) smtp = smtplib.SMTP(HOST, self.port, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30) self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close() smtp.sock.close()
@ -91,7 +84,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
smtp = smtplib.SMTP(HOST, PORT, timeout=None) smtp = smtplib.SMTP(HOST, self.port, timeout=None)
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30) self.assertEqual(smtp.sock.gettimeout(), 30)
@ -99,10 +92,7 @@ class GeneralTests(TestCase):
# Test server thread using the specified SMTP server class # Test server thread using the specified SMTP server class
def debugging_server(server_class, serv_evt, client_evt): def debugging_server(serv, serv_evt, client_evt):
serv = server_class(("", 0), ('nowhere', -1))
global PORT
PORT = serv.getsockname()[1]
serv_evt.set() serv_evt.set()
try: try:
@ -131,7 +121,6 @@ def debugging_server(server_class, serv_evt, client_evt):
time.sleep(0.5) time.sleep(0.5)
serv.close() serv.close()
asyncore.close_all() asyncore.close_all()
PORT = None
serv_evt.set() serv_evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n' MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
@ -153,7 +142,9 @@ class DebuggingServerTests(TestCase):
self.serv_evt = threading.Event() self.serv_evt = threading.Event()
self.client_evt = threading.Event() self.client_evt = threading.Event()
serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt) self.port = test_support.find_unused_port()
self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1))
serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start() threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number # wait until server thread has assigned a port number
@ -170,31 +161,31 @@ class DebuggingServerTests(TestCase):
def testBasic(self): def testBasic(self):
# connect # connect
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.quit() smtp.quit()
def testNOOP(self): def testNOOP(self):
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, 'Ok') expected = (250, 'Ok')
self.assertEqual(smtp.noop(), expected) self.assertEqual(smtp.noop(), expected)
smtp.quit() smtp.quit()
def testRSET(self): def testRSET(self):
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, 'Ok') expected = (250, 'Ok')
self.assertEqual(smtp.rset(), expected) self.assertEqual(smtp.rset(), expected)
smtp.quit() smtp.quit()
def testNotImplemented(self): def testNotImplemented(self):
# EHLO isn't implemented in DebuggingServer # EHLO isn't implemented in DebuggingServer
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, 'Error: command "EHLO" not implemented') expected = (502, 'Error: command "EHLO" not implemented')
self.assertEqual(smtp.ehlo(), expected) self.assertEqual(smtp.ehlo(), expected)
smtp.quit() smtp.quit()
def testVRFY(self): def testVRFY(self):
# VRFY isn't implemented in DebuggingServer # VRFY isn't implemented in DebuggingServer
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, 'Error: command "VRFY" not implemented') expected = (502, 'Error: command "VRFY" not implemented')
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected) self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
self.assertEqual(smtp.verify('nobody@nowhere.com'), expected) self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
@ -203,21 +194,21 @@ class DebuggingServerTests(TestCase):
def testSecondHELO(self): def testSecondHELO(self):
# check that a second HELO returns a message that it's a duplicate # check that a second HELO returns a message that it's a duplicate
# (this behavior is specific to smtpd.SMTPChannel) # (this behavior is specific to smtpd.SMTPChannel)
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.helo() smtp.helo()
expected = (503, 'Duplicate HELO/EHLO') expected = (503, 'Duplicate HELO/EHLO')
self.assertEqual(smtp.helo(), expected) self.assertEqual(smtp.helo(), expected)
smtp.quit() smtp.quit()
def testHELP(self): def testHELP(self):
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented') self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.quit() smtp.quit()
def testSend(self): def testSend(self):
# connect and send mail # connect and send mail
m = 'A test message' m = 'A test message'
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.sendmail('John', 'Sally', m) smtp.sendmail('John', 'Sally', m)
smtp.quit() smtp.quit()
@ -257,7 +248,10 @@ class BadHELOServerTests(TestCase):
sys.stdout = self.output sys.stdout = self.output
self.evt = threading.Event() self.evt = threading.Event()
servargs = (self.evt, "199 no hello for you!\n") self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(15)
self.port = test_support.bind_port(self.sock)
servargs = (self.evt, "199 no hello for you!\n", self.sock)
threading.Thread(target=server, args=servargs).start() threading.Thread(target=server, args=servargs).start()
self.evt.wait() self.evt.wait()
self.evt.clear() self.evt.clear()
@ -268,7 +262,7 @@ class BadHELOServerTests(TestCase):
def testFailingHELO(self): def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
HOST, PORT, 'localhost', 3) HOST, self.port, 'localhost', 3)
sim_users = {'Mr.A@somewhere.com':'John A', sim_users = {'Mr.A@somewhere.com':'John A',
@ -333,7 +327,9 @@ class SMTPSimTests(TestCase):
def setUp(self): def setUp(self):
self.serv_evt = threading.Event() self.serv_evt = threading.Event()
self.client_evt = threading.Event() self.client_evt = threading.Event()
serv_args = (SimSMTPServer, self.serv_evt, self.client_evt) self.port = test_support.find_unused_port()
self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start() threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number # wait until server thread has assigned a port number
@ -348,11 +344,11 @@ class SMTPSimTests(TestCase):
def testBasic(self): def testBasic(self):
# smoke test # smoke test
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.quit() smtp.quit()
def testEHLO(self): def testEHLO(self):
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
# no features should be present before the EHLO # no features should be present before the EHLO
self.assertEqual(smtp.esmtp_features, {}) self.assertEqual(smtp.esmtp_features, {})
@ -373,7 +369,7 @@ class SMTPSimTests(TestCase):
smtp.quit() smtp.quit()
def testVRFY(self): def testVRFY(self):
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for email, name in sim_users.items(): for email, name in sim_users.items():
expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email))) expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))
@ -385,7 +381,7 @@ class SMTPSimTests(TestCase):
smtp.quit() smtp.quit()
def testEXPN(self): def testEXPN(self):
smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15) smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for listname, members in sim_lists.items(): for listname, members in sim_lists.items():
users = [] users = []

View File

@ -3,6 +3,7 @@
import unittest import unittest
from test import test_support from test import test_support
import errno
import socket import socket
import select import select
import thread, threading import thread, threading
@ -15,17 +16,14 @@ import array
from weakref import proxy from weakref import proxy
import signal import signal
PORT = 50007 HOST = test_support.HOST
HOST = 'localhost'
MSG = 'Michael Gilfix was here\n' MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase): class SocketTCPTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.port = test_support.bind_port(self.serv)
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
self.serv.listen(1) self.serv.listen(1)
def tearDown(self): def tearDown(self):
@ -36,9 +34,7 @@ class SocketUDPTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.port = test_support.bind_port(self.serv)
global PORT
PORT = test_support.bind_port(self.serv, HOST, PORT)
def tearDown(self): def tearDown(self):
self.serv.close() self.serv.close()
@ -185,7 +181,7 @@ class SocketConnectedTest(ThreadedTCPSocketTest):
def clientSetUp(self): def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self) ThreadedTCPSocketTest.clientSetUp(self)
self.cli.connect((HOST, PORT)) self.cli.connect((HOST, self.port))
self.serv_conn = self.cli self.serv_conn = self.cli
def clientTearDown(self): def clientTearDown(self):
@ -461,16 +457,23 @@ class GeneralModuleTests(unittest.TestCase):
# XXX The following don't test module-level functionality... # XXX The following don't test module-level functionality...
def testSockName(self): def testSockName(self):
# Testing getsockname() # Testing getsockname(). Use a temporary socket to elicit an unused
# ephemeral port that we can use later in the test.
tempsock = socket.socket()
tempsock.bind(("0.0.0.0", 0))
(host, port) = tempsock.getsockname()
tempsock.close()
del tempsock
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("0.0.0.0", PORT+1)) sock.bind(("0.0.0.0", port))
name = sock.getsockname() name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0. # it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass. # At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname()) my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
self.assertEqual(name[1], PORT+1) self.assertEqual(name[1], port)
def testGetSockOpt(self): def testGetSockOpt(self):
# Testing getsockopt() # Testing getsockopt()
@ -597,7 +600,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG) self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self): def _testSendtoAndRecv(self):
self.cli.sendto(MSG, 0, (HOST, PORT)) self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFrom(self): def testRecvFrom(self):
# Testing recvfrom() over UDP # Testing recvfrom() over UDP
@ -605,14 +608,14 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG) self.assertEqual(msg, MSG)
def _testRecvFrom(self): def _testRecvFrom(self):
self.cli.sendto(MSG, 0, (HOST, PORT)) self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFromNegative(self): def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError. # Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1) self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self): def _testRecvFromNegative(self):
self.cli.sendto(MSG, 0, (HOST, PORT)) self.cli.sendto(MSG, 0, (HOST, self.port))
class TCPCloserTest(ThreadedTCPSocketTest): class TCPCloserTest(ThreadedTCPSocketTest):
@ -626,7 +629,7 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.assertEqual(sd.recv(1), '') self.assertEqual(sd.recv(1), '')
def _testClose(self): def _testClose(self):
self.cli.connect((HOST, PORT)) self.cli.connect((HOST, self.port))
time.sleep(1.0) time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest): class BasicSocketPairTest(SocketPairTest):
@ -684,7 +687,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testAccept(self): def _testAccept(self):
time.sleep(0.1) time.sleep(0.1)
self.cli.connect((HOST, PORT)) self.cli.connect((HOST, self.port))
def testConnect(self): def testConnect(self):
# Testing non-blocking connect # Testing non-blocking connect
@ -692,7 +695,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testConnect(self): def _testConnect(self):
self.cli.settimeout(10) self.cli.settimeout(10)
self.cli.connect((HOST, PORT)) self.cli.connect((HOST, self.port))
def testRecv(self): def testRecv(self):
# Testing non-blocking recv # Testing non-blocking recv
@ -712,7 +715,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.fail("Error during select call to non-blocking socket.") self.fail("Error during select call to non-blocking socket.")
def _testRecv(self): def _testRecv(self):
self.cli.connect((HOST, PORT)) self.cli.connect((HOST, self.port))
time.sleep(0.1) time.sleep(0.1)
self.cli.send(MSG) self.cli.send(MSG)
@ -830,7 +833,9 @@ class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
class NetworkConnectionTest(object): class NetworkConnectionTest(object):
"""Prove network connection.""" """Prove network connection."""
def clientSetUp(self): def clientSetUp(self):
self.cli = socket.create_connection((HOST, PORT)) # We're inherited below by BasicTCPTest2, which also inherits
# BasicTCPTest, which defines self.port referenced below.
self.cli = socket.create_connection((HOST, self.port))
self.serv_conn = self.cli self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
@ -839,7 +844,11 @@ class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
class NetworkConnectionNoServer(unittest.TestCase): class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self): def testWithoutServer(self):
self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT))) port = test_support.find_unused_port()
self.failUnlessRaises(
socket.error,
lambda: socket.create_connection((HOST, port))
)
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
@ -860,22 +869,22 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
testFamily = _justAccept testFamily = _justAccept
def _testFamily(self): def _testFamily(self):
self.cli = socket.create_connection((HOST, PORT), timeout=30) self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.family, 2) self.assertEqual(self.cli.family, 2)
testTimeoutDefault = _justAccept testTimeoutDefault = _justAccept
def _testTimeoutDefault(self): def _testTimeoutDefault(self):
self.cli = socket.create_connection((HOST, PORT)) self.cli = socket.create_connection((HOST, self.port))
self.assertTrue(self.cli.gettimeout() is None) self.assertTrue(self.cli.gettimeout() is None)
testTimeoutValueNamed = _justAccept testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self): def _testTimeoutValueNamed(self):
self.cli = socket.create_connection((HOST, PORT), timeout=30) self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30) self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self): def _testTimeoutValueNonamed(self):
self.cli = socket.create_connection((HOST, PORT), 30) self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30) self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutNone = _justAccept testTimeoutNone = _justAccept
@ -883,7 +892,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
self.cli = socket.create_connection((HOST, PORT), timeout=None) self.cli = socket.create_connection((HOST, self.port), timeout=None)
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
self.assertEqual(self.cli.gettimeout(), 30) self.assertEqual(self.cli.gettimeout(), 30)
@ -910,12 +919,12 @@ class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
testOutsideTimeout = testInsideTimeout testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self): def _testInsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, PORT)) self.cli = sock = socket.create_connection((HOST, self.port))
data = sock.recv(5) data = sock.recv(5)
self.assertEqual(data, "done!") self.assertEqual(data, "done!")
def _testOutsideTimeout(self): def _testOutsideTimeout(self):
self.cli = sock = socket.create_connection((HOST, PORT), timeout=1) self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.failUnlessRaises(socket.timeout, lambda: sock.recv(5)) self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))

View File

@ -20,6 +20,7 @@ warnings.filterwarnings(
# Optionally test SSL support, if we have it in the tested platform # Optionally test SSL support, if we have it in the tested platform
skip_expected = not hasattr(socket, "ssl") skip_expected = not hasattr(socket, "ssl")
HOST = test_support.HOST
class ConnectedTests(unittest.TestCase): class ConnectedTests(unittest.TestCase):
@ -86,19 +87,16 @@ class ConnectedTests(unittest.TestCase):
class BasicTests(unittest.TestCase): class BasicTests(unittest.TestCase):
def testRudeShutdown(self): def testRudeShutdown(self):
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event() listener_ready = threading.Event()
listener_gone = threading.Event() listener_gone = threading.Event()
sock = socket.socket()
port = test_support.bind_port(sock)
# `listener` runs in a thread. It opens a socket listening on # `listener` runs in a thread. It opens a socket and sits in accept()
# PORT, and sits in an accept() until the main thread connects. # until the main thread connects. Then it rudely closes the socket,
# Then it rudely closes the socket, and sets Event `listener_gone` # and sets Event `listener_gone` to let the main thread know the socket
# to let the main thread know the socket is gone. # is gone.
def listener(): def listener(s):
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5) s.listen(5)
listener_ready.set() listener_ready.set()
s.accept() s.accept()
@ -108,7 +106,7 @@ class BasicTests(unittest.TestCase):
def connector(): def connector():
listener_ready.wait() listener_ready.wait()
s = socket.socket() s = socket.socket()
s.connect(('localhost', PORT[0])) s.connect((HOST, port))
listener_gone.wait() listener_gone.wait()
try: try:
ssl_sock = socket.ssl(s) ssl_sock = socket.ssl(s)
@ -118,7 +116,7 @@ class BasicTests(unittest.TestCase):
raise test_support.TestFailed( raise test_support.TestFailed(
'connecting to closed SSL socket should have failed') 'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener) t = threading.Thread(target=listener, args=(sock,))
t.start() t.start()
connector() connector()
t.join() t.join()
@ -169,7 +167,7 @@ class OpenSSLTests(unittest.TestCase):
def testBasic(self): def testBasic(self):
s = socket.socket() s = socket.socket()
s.connect(("localhost", 4433)) s.connect((HOST, OpenSSLServer.PORT))
ss = socket.ssl(s) ss = socket.ssl(s)
ss.write("Foo\n") ss.write("Foo\n")
i = ss.read(4) i = ss.read(4)
@ -183,7 +181,7 @@ class OpenSSLTests(unittest.TestCase):
info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi" info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi"
s = socket.socket() s = socket.socket()
s.connect(("localhost", 4433)) s.connect((HOST, OpenSSLServer.PORT))
ss = socket.ssl(s) ss = socket.ssl(s)
cert = ss.server() cert = ss.server()
self.assertEqual(cert, info) self.assertEqual(cert, info)
@ -193,6 +191,7 @@ class OpenSSLTests(unittest.TestCase):
class OpenSSLServer(threading.Thread): class OpenSSLServer(threading.Thread):
PORT = None
def __init__(self): def __init__(self):
self.s = None self.s = None
self.keepServing = True self.keepServing = True
@ -211,7 +210,11 @@ class OpenSSLServer(threading.Thread):
raise ValueError("No key file found! (tried %r)" % key_file) raise ValueError("No key file found! (tried %r)" % key_file)
try: try:
cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file) # XXX TODO: on Windows, this should make more effort to use the
# openssl.exe that would have been built by the pcbuild.sln.
self.PORT = test_support.find_unused_port()
args = (self.PORT, cert_file, key_file)
cmd = "openssl s_server -accept %d -cert %s -key %s -quiet" % args
self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT) stderr=subprocess.STDOUT)
@ -222,7 +225,7 @@ class OpenSSLServer(threading.Thread):
# let's try if it is actually up # let's try if it is actually up
try: try:
s = socket.socket() s = socket.socket()
s.connect(("localhost", 4433)) s.connect((HOST, self.PORT))
s.close() s.close()
if self.s.stdout.readline() != "ERROR\n": if self.s.stdout.readline() != "ERROR\n":
raise ValueError raise ValueError

View File

@ -22,7 +22,7 @@ from test.test_support import TESTFN as TEST_FILE
test.test_support.requires("network") test.test_support.requires("network")
TEST_STR = "hello world\n" TEST_STR = "hello world\n"
HOST = "localhost" HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2" HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"

View File

@ -23,11 +23,10 @@ try:
except ImportError: except ImportError:
skip_expected = True skip_expected = True
HOST = test_support.HOST
CERTFILE = None CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None SVN_PYTHON_ORG_ROOT_CERT = None
TESTPORT = 10025
def handle_error(prefix): def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose: if test_support.verbose:
@ -269,7 +268,7 @@ else:
except: except:
handle_error('') handle_error('')
def __init__(self, port, certificate, ssl_version=None, def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False, certreqs=None, cacerts=None, expect_bad_connects=False,
chatty=True, connectionchatty=False, starttls_server=False): chatty=True, connectionchatty=False, starttls_server=False):
if ssl_version is None: if ssl_version is None:
@ -285,12 +284,8 @@ else:
self.connectionchatty = connectionchatty self.connectionchatty = connectionchatty
self.starttls_server = starttls_server self.starttls_server = starttls_server
self.sock = socket.socket() self.sock = socket.socket()
self.port = test_support.bind_port(self.sock)
self.flag = None self.flag = None
if hasattr(socket, 'SO_REUSEADDR'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock.bind(('127.0.0.1', port))
self.active = False self.active = False
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.setDaemon(False) self.setDaemon(False)
@ -434,12 +429,13 @@ else:
format%args)) format%args))
def __init__(self, port, certfile): def __init__(self, certfile):
self.flag = None self.flag = None
self.active = False self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
self.port = test_support.find_unused_port()
self.server = self.HTTPSServer( self.server = self.HTTPSServer(
('', port), self.RootedHTTPRequestHandler, certfile) (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.setDaemon(True) self.setDaemon(True)
@ -465,7 +461,7 @@ else:
def badCertTest (certfile): def badCertTest (certfile):
server = ThreadedEchoServer(TESTPORT, CERTFILE, server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_REQUIRED, certreqs=ssl.CERT_REQUIRED,
cacerts=CERTFILE, chatty=False) cacerts=CERTFILE, chatty=False)
flag = threading.Event() flag = threading.Event()
@ -478,7 +474,7 @@ else:
s = ssl.wrap_socket(socket.socket(), s = ssl.wrap_socket(socket.socket(),
certfile=certfile, certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1) ssl_version=ssl.PROTOCOL_TLSv1)
s.connect(('127.0.0.1', TESTPORT)) s.connect((HOST, server.port))
except ssl.SSLError, x: except ssl.SSLError, x:
if test_support.verbose: if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x[1]) sys.stdout.write("\nSSLError is %s\n" % x[1])
@ -493,7 +489,7 @@ else:
client_certfile, client_protocol=None, indata="FOO\n", client_certfile, client_protocol=None, indata="FOO\n",
chatty=True, connectionchatty=False): chatty=True, connectionchatty=False):
server = ThreadedEchoServer(TESTPORT, certfile, server = ThreadedEchoServer(certfile,
certreqs=certreqs, certreqs=certreqs,
ssl_version=protocol, ssl_version=protocol,
cacerts=cacertsfile, cacerts=cacertsfile,
@ -513,7 +509,7 @@ else:
ca_certs=cacertsfile, ca_certs=cacertsfile,
cert_reqs=certreqs, cert_reqs=certreqs,
ssl_version=client_protocol) ssl_version=client_protocol)
s.connect(('127.0.0.1', TESTPORT)) s.connect((HOST, server.port))
except ssl.SSLError, x: except ssl.SSLError, x:
raise test_support.TestFailed("Unexpected SSL error: " + str(x)) raise test_support.TestFailed("Unexpected SSL error: " + str(x))
except Exception, x: except Exception, x:
@ -582,6 +578,7 @@ else:
listener_ready = threading.Event() listener_ready = threading.Event()
listener_gone = threading.Event() listener_gone = threading.Event()
port = test_support.find_unused_port()
# `listener` runs in a thread. It opens a socket listening on # `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects. # PORT, and sits in an accept() until the main thread connects.
@ -589,11 +586,7 @@ else:
# to let the main thread know the socket is gone. # to let the main thread know the socket is gone.
def listener(): def listener():
s = socket.socket() s = socket.socket()
if hasattr(socket, 'SO_REUSEADDR'): s.bind((HOST, port))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
s.bind(('127.0.0.1', TESTPORT))
s.listen(5) s.listen(5)
listener_ready.set() listener_ready.set()
s.accept() s.accept()
@ -603,7 +596,7 @@ else:
def connector(): def connector():
listener_ready.wait() listener_ready.wait()
s = socket.socket() s = socket.socket()
s.connect(('127.0.0.1', TESTPORT)) s.connect((HOST, port))
listener_gone.wait() listener_gone.wait()
try: try:
ssl_sock = ssl.wrap_socket(s) ssl_sock = ssl.wrap_socket(s)
@ -631,7 +624,7 @@ else:
if test_support.verbose: if test_support.verbose:
sys.stdout.write("\n") sys.stdout.write("\n")
s2 = socket.socket() s2 = socket.socket()
server = ThreadedEchoServer(TESTPORT, CERTFILE, server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE, certreqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23, ssl_version=ssl.PROTOCOL_SSLv23,
cacerts=CERTFILE, cacerts=CERTFILE,
@ -648,7 +641,7 @@ else:
ca_certs=CERTFILE, ca_certs=CERTFILE,
cert_reqs=ssl.CERT_REQUIRED, cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_SSLv23) ssl_version=ssl.PROTOCOL_SSLv23)
s.connect(('127.0.0.1', TESTPORT)) s.connect((HOST, server.port))
except ssl.SSLError, x: except ssl.SSLError, x:
raise test_support.TestFailed( raise test_support.TestFailed(
"Unexpected SSL error: " + str(x)) "Unexpected SSL error: " + str(x))
@ -748,7 +741,7 @@ else:
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4") msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
server = ThreadedEchoServer(TESTPORT, CERTFILE, server = ThreadedEchoServer(CERTFILE,
ssl_version=ssl.PROTOCOL_TLSv1, ssl_version=ssl.PROTOCOL_TLSv1,
starttls_server=True, starttls_server=True,
chatty=True, chatty=True,
@ -763,7 +756,7 @@ else:
try: try:
s = socket.socket() s = socket.socket()
s.setblocking(1) s.setblocking(1)
s.connect(('127.0.0.1', TESTPORT)) s.connect((HOST, server.port))
except Exception, x: except Exception, x:
raise test_support.TestFailed("Unexpected exception: " + str(x)) raise test_support.TestFailed("Unexpected exception: " + str(x))
else: else:
@ -805,7 +798,7 @@ else:
def testAsyncore(self): def testAsyncore(self):
server = AsyncoreHTTPSServer(TESTPORT, CERTFILE) server = AsyncoreHTTPSServer(CERTFILE)
flag = threading.Event() flag = threading.Event()
server.start(flag) server.start(flag)
# wait for it to start # wait for it to start
@ -817,8 +810,8 @@ else:
d1 = open(CERTFILE, 'rb').read() d1 = open(CERTFILE, 'rb').read()
d2 = '' d2 = ''
# now fetch the same data from the HTTPS server # now fetch the same data from the HTTPS server
url = 'https://127.0.0.1:%d/%s' % ( url = 'https://%s:%d/%s' % (
TESTPORT, os.path.split(CERTFILE)[1]) HOST, server.port, os.path.split(CERTFILE)[1])
f = urllib.urlopen(url) f = urllib.urlopen(url)
dlen = f.info().getheader("content-length") dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0): if dlen and (int(dlen) > 0):
@ -842,29 +835,11 @@ else:
server.join() server.join()
def findtestsocket(start, end):
def testbind(i):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(("127.0.0.1", i))
except:
return 0
else:
return 1
finally:
s.close()
for i in range(start, end):
if testbind(i) and testbind(i+1):
return i
return 0
def test_main(verbose=False): def test_main(verbose=False):
if skip_expected: if skip_expected:
raise test_support.TestSkipped("No SSL support") raise test_support.TestSkipped("No SSL support")
global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem") "keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join( SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
@ -874,9 +849,6 @@ def test_main(verbose=False):
if (not os.path.exists(CERTFILE) or if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise test_support.TestFailed("Can't read certificate files!") raise test_support.TestFailed("Can't read certificate files!")
TESTPORT = findtestsocket(10025, 12000)
if not TESTPORT:
raise test_support.TestFailed("Can't find open port to test servers on!")
tests = [BasicTests] tests = [BasicTests]

View File

@ -103,31 +103,97 @@ def requires(resource, msg=None):
msg = "Use of the `%s' resource not enabled" % resource msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg) raise ResourceDenied(msg)
def bind_port(sock, host='', preferred_port=54321): HOST = 'localhost'
"""Try to bind the sock to a port. If we are running multiple
tests and we don't try multiple ports, the test can fail. This
makes the test more robust."""
# Find some random ports that hopefully no one is listening on. def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
# Ideally each test would clean up after itself and not continue listening """Returns an unused port that should be suitable for binding. This is
# on any ports. However, this isn't the case. The last port (0) is achieved by creating a temporary socket with the same family and type as
# a stop-gap that asks the O/S to assign a port. Whenever the warning the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
# message below is printed, the test that is listening on the port should the specified host address (defaults to 0.0.0.0) with the port set to 0,
# be fixed to close the socket at the end of the test. eliciting an unused ephemeral port from the OS. The temporary socket is
# Another reason why we can't use a port is another process (possibly then closed and deleted, and the ephemeral port is returned.
# another instance of the test suite) is using the same port.
for port in [preferred_port, 9907, 10243, 32999, 0]: Either this method or bind_port() should be used for any tests where a
try: server socket needs to be bound to a particular port for the duration of
sock.bind((host, port)) the test. Which one to use depends on whether the calling code is creating
if port == 0: a python socket, or if an unused port needs to be provided in a constructor
port = sock.getsockname()[1] or passed to an external program (i.e. the -accept argument to openssl's
return port s_server mode). Always prefer bind_port() over find_unused_port() where
except socket.error, (err, msg): possible. Hard coded ports should *NEVER* be used. As soon as a server
if err != errno.EADDRINUSE: socket is bound to a hard coded port, the ability to run multiple instances
raise of the test simultaneously on the same host is compromised, which makes the
print >>sys.__stderr__, \ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
' WARNING: failed to listen on port %d, trying another' % port may simply manifest as a failed test, which can be recovered from without
raise TestFailed('unable to find port to listen on') intervention in most cases, but on Windows, the entire python process can
completely and utterly wedge, requiring someone to log in to the buildbot
and manually kill the affected process.
(This is easy to reproduce on Windows, unfortunately, and can be traced to
the SO_REUSEADDR socket option having different semantics on Windows versus
Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
listen and then accept connections on identical host/ports. An EADDRINUSE
socket.error will be raised at some point (depending on the platform and
the order bind and listen were called on each socket).
However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
will ever be raised when attempting to bind two identical host/ports. When
accept() is called on each socket, the second caller's process will steal
the port from the first caller, leaving them both in an awkwardly wedged
state where they'll no longer respond to any signals or graceful kills, and
must be forcibly killed via OpenProcess()/TerminateProcess().
The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
instead of SO_REUSEADDR, which effectively affords the same semantics as
SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
Source world compared to Windows ones, this is a common mistake. A quick
look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
openssl.exe is called with the 's_server' option, for example. See
http://bugs.python.org/issue2550 for more info. The following site also
has a very thorough description about the implications of both REUSEADDR
and EXCLUSIVEADDRUSE on Windows:
http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
XXX: although this approach is a vast improvement on previous attempts to
elicit unused ports, it rests heavily on the assumption that the ephemeral
port returned to us by the OS won't immediately be dished back out to some
other process when we close and delete our temporary socket but before our
calling code has a chance to bind the returned port. We can deal with this
issue if/when we come across it."""
tempsock = socket.socket(family, socktype)
port = bind_port(tempsock)
tempsock.close()
del tempsock
return port
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise TestFailed("tests should never set the SO_REUSEADDR " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise TestFailed("tests should never set the SO_REUSEPORT " \
"socket option on TCP/IP sockets!")
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
FUZZ = 1e-6 FUZZ = 1e-6

View File

@ -6,14 +6,9 @@ import time
from unittest import TestCase from unittest import TestCase
from test import test_support from test import test_support
PORT = 9091 HOST = test_support.HOST
def server(evt): def server(evt, serv):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
global PORT
PORT = test_support.bind_port(serv, "", PORT)
serv.listen(5) serv.listen(5)
evt.set() evt.set()
try: try:
@ -28,7 +23,10 @@ class GeneralTests(TestCase):
def setUp(self): def setUp(self):
self.evt = threading.Event() self.evt = threading.Event()
threading.Thread(target=server, args=(self.evt,)).start() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = test_support.bind_port(self.sock)
threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.evt.wait() self.evt.wait()
self.evt.clear() self.evt.clear()
time.sleep(.1) time.sleep(.1)
@ -38,24 +36,24 @@ class GeneralTests(TestCase):
def testBasic(self): def testBasic(self):
# connects # connects
telnet = telnetlib.Telnet("localhost", PORT) telnet = telnetlib.Telnet(HOST, self.port)
telnet.sock.close() telnet.sock.close()
def testTimeoutDefault(self): def testTimeoutDefault(self):
# default # default
telnet = telnetlib.Telnet("localhost", PORT) telnet = telnetlib.Telnet(HOST, self.port)
self.assertTrue(telnet.sock.gettimeout() is None) self.assertTrue(telnet.sock.gettimeout() is None)
telnet.sock.close() telnet.sock.close()
def testTimeoutValue(self): def testTimeoutValue(self):
# a value # a value
telnet = telnetlib.Telnet("localhost", PORT, timeout=30) telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30) self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close() telnet.sock.close()
def testTimeoutDifferentOrder(self): def testTimeoutDifferentOrder(self):
telnet = telnetlib.Telnet(timeout=30) telnet = telnetlib.Telnet(timeout=30)
telnet.open("localhost", PORT) telnet.open(HOST, self.port)
self.assertEqual(telnet.sock.gettimeout(), 30) self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close() telnet.sock.close()
@ -64,7 +62,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout() previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) socket.setdefaulttimeout(30)
try: try:
telnet = telnetlib.Telnet("localhost", PORT, timeout=None) telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
finally: finally:
socket.setdefaulttimeout(previous) socket.setdefaulttimeout(previous)
self.assertEqual(telnet.sock.gettimeout(), 30) self.assertEqual(telnet.sock.gettimeout(), 30)

View File

@ -41,6 +41,43 @@ Library
Tests Tests
----- -----
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
Build Build
----- -----