- issue #6748 intermittent test failures from sockets

- telnetlib tests now use mock sockets for most tests
This commit is contained in:
Jack Diederich 2009-11-06 17:20:42 +00:00
parent 0267b73207
commit 1766b9d10e
1 changed files with 140 additions and 282 deletions

View File

@ -1,48 +1,23 @@
import socket import socket
import select
import threading import threading
import telnetlib import telnetlib
import time import time
import queue import contextlib
import sys
import io
from unittest import TestCase from unittest import TestCase
from test import support from test import support
HOST = support.HOST HOST = support.HOST
EOF_sigil = object()
def server(evt, serv, dataq=None, test_done=None): def server(evt, serv):
""" Open a tcp server in four steps
1) set evt to true to let the parent know we are ready
2) [optional] if is not False, write the list of data from dataq.get()
to the socket.
3) [optional] if test_done is not None, it's an event; wait
for parent to set test_done before closing connection
4) set evt to true to let the parent know we're done
"""
serv.listen(5) serv.listen(5)
evt.set() evt.set()
try: try:
conn, addr = serv.accept() conn, addr = serv.accept()
if dataq:
data = b''
new_data = dataq.get(True, 0.5)
dataq.task_done()
for item in new_data:
if item == EOF_sigil:
break
if type(item) in [int, float]:
time.sleep(item)
else:
data += item
written = conn.send(data)
data = data[written:]
except socket.timeout: except socket.timeout:
pass pass
finally: finally:
if test_done is not None:
test_done.wait()
serv.close() serv.close()
evt.set() evt.set()
@ -100,163 +75,159 @@ class GeneralTests(TestCase):
self.assertEqual(telnet.sock.gettimeout(), 30) self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close() telnet.sock.close()
def _read_setUp(self): class SocketStub(object):
self.evt = threading.Event() ''' a socket proxy that re-defines sendall() '''
self.dataq = queue.Queue() def __init__(self, reads=[]):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.reads = reads
self.sock.settimeout(3) self.writes = []
self.port = support.bind_port(self.sock) self.block = False
self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) def sendall(self, data):
self.thread.start() self.writes.append(data)
self.evt.wait() def recv(self, size):
self.evt.clear() out = b''
time.sleep(.1) while self.reads and len(out) < size:
out += self.reads.pop(0)
if len(out) > size:
self.reads.insert(0, out[size:])
out = out[:size]
return out
def _read_tearDown(self): class TelnetAlike(telnetlib.Telnet):
self.evt.wait() def fileno(self):
self.thread.join() raise NotImplementedError()
def close(self): pass
def sock_avail(self):
return (not self.sock.block)
def msg(self, msg, *args):
with support.captured_stdout() as out:
telnetlib.Telnet.msg(self, msg, *args)
self._messages += out.getvalue()
return
def new_select(*s_args):
block = False
for l in s_args:
for fob in l:
if isinstance(fob, TelnetAlike):
block = fob.sock.block
if block:
return [[], [], []]
else:
return s_args
@contextlib.contextmanager
def test_socket(reads):
def new_conn(*ignored):
return SocketStub(reads)
try:
old_conn = socket.create_connection
socket.create_connection = new_conn
yield None
finally:
socket.create_connection = old_conn
return
def test_telnet(reads=[], cls=TelnetAlike):
''' return a telnetlib.Telnet object that uses a SocketStub with
reads queued up to be read '''
for x in reads:
assert type(x) is bytes, x
with test_socket(reads):
telnet = cls('dummy', 0)
telnet._messages = '' # debuglevel output
return telnet
class ReadTests(TestCase): class ReadTests(TestCase):
setUp = _read_setUp def setUp(self):
tearDown = _read_tearDown self.old_select = select.select
select.select = new_select
def tearDown(self):
select.select = self.old_select
# use a similar approach to testing timeouts as test_timeout.py def test_read_until(self):
# these will never pass 100% but make the fuzz big enough that it is rare
block_long = 0.6
block_short = 0.3
def test_read_until_A(self):
""" """
read_until(expected, [timeout]) read_until(expected, timeout=None)
Read until the expected string has been seen, or a timeout is test the blocking version of read_util
hit (default is no timeout); may block.
""" """
want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil] want = [b'xxxmatchyyy']
self.dataq.put(want) telnet = test_telnet(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_until(b'match') data = telnet.read_until(b'match')
self.assertEqual(data, b''.join(want[:-2])) self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
def test_read_until_B(self): reads = [b'x' * 50, b'match', b'y' * 50]
# test the timeout - it does NOT raise socket.timeout expect = b''.join(reads[:-1])
want = [b'hello', self.block_long, b'not seen', EOF_sigil] telnet = test_telnet(reads)
self.dataq.put(want) data = telnet.read_until(b'match')
telnet = telnetlib.Telnet(HOST, self.port) self.assertEqual(data, expect)
self.dataq.join()
data = telnet.read_until(b'not seen', self.block_short)
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), b'not seen')
def test_read_all_A(self):
def test_read_all(self):
""" """
read_all() read_all()
Read all data until EOF; may block. Read all data until EOF; may block.
""" """
want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil] reads = [b'x' * 500, b'y' * 500, b'z' * 500]
self.dataq.put(want) expect = b''.join(reads)
telnet = telnetlib.Telnet(HOST, self.port) telnet = test_telnet(reads)
self.dataq.join()
data = telnet.read_all() data = telnet.read_all()
self.assertEqual(data, b''.join(want[:-1])) self.assertEqual(data, expect)
return return
def _test_blocking(self, func): def test_read_some(self):
self.dataq.put([self.block_long, EOF_sigil])
self.dataq.join()
start = time.time()
data = func()
self.assertTrue(self.block_short <= time.time() - start)
def test_read_all_B(self):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
def test_read_all_C(self):
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
telnet.read_all()
telnet.read_all() # shouldn't raise
def test_read_some_A(self):
""" """
read_some() read_some()
Read at least one byte or EOF; may block. Read at least one byte or EOF; may block.
""" """
# test 'at least one byte' # test 'at least one byte'
want = [b'x' * 500, EOF_sigil] telnet = test_telnet([b'x' * 500])
self.dataq.put(want) data = telnet.read_some()
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_all()
self.assertTrue(len(data) >= 1) self.assertTrue(len(data) >= 1)
def test_read_some_B(self):
# test EOF # test EOF
self.dataq.put([EOF_sigil]) telnet = test_telnet()
telnet = telnetlib.Telnet(HOST, self.port) data = telnet.read_some()
self.dataq.join() self.assertEqual(b'', data)
self.assertEqual(b'', telnet.read_some())
def test_read_some_C(self): def _read_eager(self, func_name):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
def _test_read_any_eager_A(self, func_name):
""" """
read_very_eager() read_*_eager()
Read all data available already queued or on the socket, Read all data available already queued or on the socket,
without blocking. without blocking.
""" """
want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil] want = b'x' * 100
expects = want[1] + want[2] telnet = test_telnet([want])
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
telnet.sock.block = True
self.assertEqual(b'', func())
telnet.sock.block = False
data = b'' data = b''
while True: while True:
try: try:
data += func() data += func()
self.assertTrue(expects.startswith(data))
except EOFError: except EOFError:
break break
self.assertEqual(expects, data) self.assertEqual(data, want)
def _test_read_any_eager_B(self, func_name): def test_read_eager(self):
# test EOF # read_eager and read_very_eager make the same gaurantees
self.dataq.put([EOF_sigil]) # (they behave differently but we only test the gaurantees)
telnet = telnetlib.Telnet(HOST, self.port) self._read_eager('read_eager')
self.dataq.join() self._read_eager('read_very_eager')
time.sleep(self.block_short) # NB -- we need to test the IAC block which is mentioned in the
func = getattr(telnet, func_name) # docstring but not in the module docs
self.assertRaises(EOFError, func)
# read_eager and read_very_eager make the same gaurantees def read_very_lazy(self):
# (they behave differently but we only test the gaurantees) want = b'x' * 100
def test_read_very_eager_A(self): telnet = test_telnet([want])
self._test_read_any_eager_A('read_very_eager') self.assertEqual(b'', telnet.read_very_lazy())
def test_read_very_eager_B(self): while telnet.sock.reads:
self._test_read_any_eager_B('read_very_eager') telnet.fill_rawq()
def test_read_eager_A(self): data = telnet.read_very_lazy()
self._test_read_any_eager_A('read_eager') self.assertEqual(want, data)
def test_read_eager_B(self): self.assertRaises(EOFError, telnet.read_very_lazy)
self._test_read_any_eager_B('read_eager')
# NB -- we need to test the IAC block which is mentioned in the docstring
# but not in the module docs
def _test_read_any_lazy_B(self, func_name): def test_read_lazy(self):
self.dataq.put([EOF_sigil]) want = b'x' * 100
telnet = telnetlib.Telnet(HOST, self.port) telnet = test_telnet([want])
self.dataq.join()
func = getattr(telnet, func_name)
telnet.fill_rawq()
self.assertRaises(EOFError, func)
def test_read_lazy_A(self):
want = [b'x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual(b'', telnet.read_lazy()) self.assertEqual(b'', telnet.read_lazy())
data = b'' data = b''
while True: while True:
@ -267,35 +238,8 @@ class ReadTests(TestCase):
telnet.fill_rawq() telnet.fill_rawq()
except EOFError: except EOFError:
break break
self.assertTrue(want[0].startswith(data)) self.assertTrue(want.startswith(data))
self.assertEqual(data, want[0]) self.assertEqual(data, want)
def test_read_lazy_B(self):
self._test_read_any_lazy_B('read_lazy')
def test_read_very_lazy_A(self):
want = [b'x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual(b'', telnet.read_very_lazy())
data = b''
while True:
try:
read_data = telnet.read_very_lazy()
except EOFError:
break
data += read_data
if not read_data:
telnet.fill_rawq()
self.assertEqual(b'', telnet.cookedq)
telnet.process_rawq()
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])
def test_read_very_lazy_B(self):
self._test_read_any_lazy_B('read_very_lazy')
class nego_collector(object): class nego_collector(object):
def __init__(self, sb_getter=None): def __init__(self, sb_getter=None):
@ -309,91 +253,32 @@ class nego_collector(object):
sb_data = self.sb_getter() sb_data = self.sb_getter()
self.sb_seen += sb_data self.sb_seen += sb_data
class SocketProxy(object): tl = telnetlib
''' a socket proxy that re-defines sendall() '''
def __init__(self, real_sock):
self.socket = real_sock
self._raw_sent = b''
def __getattr__(self, k):
return getattr(self.socket, k)
def sendall(self, data):
self._raw_sent += data
self.socket.sendall(data)
class TelnetSockSendall(telnetlib.Telnet):
def open(self, *args, **opts):
telnetlib.Telnet.open(self, *args, **opts)
self.sock = SocketProxy(self.sock)
class WriteTests(TestCase): class WriteTests(TestCase):
'''The only thing that write does is replace each tl.IAC for '''The only thing that write does is replace each tl.IAC for
tl.IAC+tl.IAC''' tl.IAC+tl.IAC'''
def setUp(self):
self.evt = threading.Event()
self.test_done = threading.Event()
self.dataq = queue.Queue()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = support.bind_port(self.sock)
self.thread = threading.Thread(target=server, args=(
self.evt, self.sock, self.dataq, self.test_done))
self.thread.start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
def tearDown(self):
self.test_done.set()
self.evt.wait()
self.thread.join()
def _test_write(self, data):
self.telnet.sock._raw_sent = b''
self.telnet.write(data)
after_write = self.telnet.sock._raw_sent
self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC),
after_write)
def test_write(self): def test_write(self):
self.telnet = TelnetSockSendall()
data_sample = [b'data sample without IAC', data_sample = [b'data sample without IAC',
b'data sample with' + tl.IAC + b' one IAC', b'data sample with' + tl.IAC + b' one IAC',
b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
tl.IAC, tl.IAC,
b''] b'']
self.telnet.open(HOST, self.port) for data in data_sample:
for d in data_sample: telnet = test_telnet()
self.dataq.put([b'']) telnet.write(data)
self._test_write(d) written = b''.join(telnet.sock.writes)
self.telnet.close() self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
tl = telnetlib
class TelnetDebuglevel(tl.Telnet):
''' Telnet-alike that captures messages written to stdout when
debuglevel > 0
'''
_messages = ''
def msg(self, msg, *args):
orig_stdout = sys.stdout
sys.stdout = fake_stdout = io.StringIO()
tl.Telnet.msg(self, msg, *args)
self._messages += fake_stdout.getvalue()
sys.stdout = orig_stdout
return
class OptionTests(TestCase): class OptionTests(TestCase):
setUp = _read_setUp
tearDown = _read_tearDown
# RFC 854 commands # RFC 854 commands
cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
def _test_command(self, data): def _test_command(self, data):
""" helper for testing IAC + cmd """ """ helper for testing IAC + cmd """
self.setUp() telnet = test_telnet(data)
self.dataq.put(data) data_len = len(b''.join(data))
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector() nego = nego_collector()
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all() txt = telnet.read_all()
@ -401,24 +286,16 @@ class OptionTests(TestCase):
self.assertTrue(len(cmd) > 0) # we expect at least one command self.assertTrue(len(cmd) > 0) # we expect at least one command
self.assertTrue(cmd[:1] in self.cmds) self.assertTrue(cmd[:1] in self.cmds)
self.assertEqual(cmd[1:2], tl.NOOPT) self.assertEqual(cmd[1:2], tl.NOOPT)
self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd)) self.assertEqual(data_len, len(txt + cmd))
nego.sb_getter = None # break the nego => telnet cycle nego.sb_getter = None # break the nego => telnet cycle
self.tearDown()
def test_IAC_commands(self): def test_IAC_commands(self):
# reset our setup
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
self.tearDown()
for cmd in self.cmds: for cmd in self.cmds:
self._test_command([tl.IAC, cmd, EOF_sigil]) self._test_command([tl.IAC, cmd])
self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil]) self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil]) self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
# all at once # all at once
self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
self.assertEqual(b'', telnet.read_sb_data())
def test_SB_commands(self): def test_SB_commands(self):
# RFC 855, subnegotiations portion # RFC 855, subnegotiations portion
@ -427,11 +304,8 @@ class OptionTests(TestCase):
tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
EOF_sigil,
] ]
self.dataq.put(send) telnet = test_telnet(send)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector(telnet.read_sb_data) nego = nego_collector(telnet.read_sb_data)
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all() txt = telnet.read_all()
@ -441,19 +315,6 @@ class OptionTests(TestCase):
self.assertEqual(b'', telnet.read_sb_data()) self.assertEqual(b'', telnet.read_sb_data())
nego.sb_getter = None # break the nego => telnet cycle nego.sb_getter = None # break the nego => telnet cycle
def _test_debuglevel(self, data, expected_msg):
""" helper for testing debuglevel messages """
self.setUp()
self.dataq.put(data + [EOF_sigil])
telnet = TelnetDebuglevel(HOST, self.port)
telnet.set_debuglevel(1)
self.dataq.join()
txt = telnet.read_all()
self.assertTrue(expected_msg in telnet._messages,
msg=(telnet._messages, expected_msg))
telnet.close()
self.tearDown()
def test_debuglevel_reads(self): def test_debuglevel_reads(self):
# test all the various places that self.msg(...) is called # test all the various places that self.msg(...) is called
given_a_expect_b = [ given_a_expect_b = [
@ -467,21 +328,18 @@ class OptionTests(TestCase):
(tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
] ]
for a, b in given_a_expect_b: for a, b in given_a_expect_b:
self._test_debuglevel([a, EOF_sigil], b) telnet = test_telnet([a])
telnet.set_debuglevel(1)
txt = telnet.read_all()
self.assertTrue(b in telnet._messages)
return return
def test_debuglevel_write(self): def test_debuglevel_write(self):
self.setUp() telnet = test_telnet()
telnet = TelnetDebuglevel(HOST, self.port)
telnet.set_debuglevel(1) telnet.set_debuglevel(1)
self.dataq.put([b'', EOF_sigil])
self.dataq.join()
telnet.write(b'xxx') telnet.write(b'xxx')
expected = "send b'xxx'\n" expected = "send b'xxx'\n"
self.assertTrue(expected in telnet._messages, self.assertTrue(expected in telnet._messages)
msg=(telnet._messages, expected))
telnet.close()
self.tearDown()
def test_main(verbose=None): def test_main(verbose=None):
support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests) support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests)