From 1fa0c3f5bf2349ffa231608f25de2912479d89bd Mon Sep 17 00:00:00 2001 From: Jack Diederich Date: Fri, 6 Nov 2009 21:14:59 +0000 Subject: [PATCH] Merged revisions 74638,76133 via svnmerge from svn+ssh://pythondev@svn.python.org/python/branches/py3k ........ r74638 | jack.diederich | 2009-09-03 16:37:58 -0400 (Thu, 03 Sep 2009) | 4 lines - apply issue 6582 to test all the write methods of telnetlib - add patch author to ACKS - possibly fix test timeouts on non-linux platforms ........ r76133 | jack.diederich | 2009-11-06 12:20:42 -0500 (Fri, 06 Nov 2009) | 3 lines - issue #6748 intermittent test failures from sockets - telnetlib tests now use mock sockets for most tests ........ --- Lib/test/test_telnetlib.py | 380 +++++++++++++++---------------------- Misc/ACKS | 1 + 2 files changed, 157 insertions(+), 224 deletions(-) diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py index 662c7151cf0..d57fda13374 100644 --- a/Lib/test/test_telnetlib.py +++ b/Lib/test/test_telnetlib.py @@ -1,41 +1,20 @@ import socket +import select import threading import telnetlib import time -import queue -import sys -import io +import contextlib from unittest import TestCase from test import support HOST = support.HOST -EOF_sigil = object() -def server(evt, serv, dataq=None): - """ Open a tcp server in three steps - 1) set evt to true to let the parent know we are ready - 2) [optional] if is not False, write the list of data from dataq.get() - to the socket. - 3) set evt to true to let the parent know we're done - """ +def server(evt, serv): serv.listen(5) evt.set() try: conn, addr = serv.accept() - if dataq: - data = b'' - new_data = dataq.get(True, 0.5) - dataq.task_done() - for item in new_data: - if item == EOF_sigil: - break - if type(item) in [int, float]: - time.sleep(item) - else: - data += item - written = conn.send(data) - data = data[written:] except socket.timeout: pass finally: @@ -96,163 +75,159 @@ class GeneralTests(TestCase): self.assertEqual(telnet.sock.gettimeout(), 30) telnet.sock.close() -def _read_setUp(self): - self.evt = threading.Event() - self.dataq = queue.Queue() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(3) - self.port = support.bind_port(self.sock) - self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) - self.thread.start() - self.evt.wait() - self.evt.clear() - time.sleep(.1) +class SocketStub(object): + ''' a socket proxy that re-defines sendall() ''' + def __init__(self, reads=[]): + self.reads = reads + self.writes = [] + self.block = False + def sendall(self, data): + self.writes.append(data) + def recv(self, size): + out = b'' + while self.reads and len(out) < size: + out += self.reads.pop(0) + if len(out) > size: + self.reads.insert(0, out[size:]) + out = out[:size] + return out -def _read_tearDown(self): - self.evt.wait() - self.thread.join() +class TelnetAlike(telnetlib.Telnet): + def fileno(self): + raise NotImplementedError() + def close(self): pass + def sock_avail(self): + return (not self.sock.block) + def msg(self, msg, *args): + with support.captured_stdout() as out: + telnetlib.Telnet.msg(self, msg, *args) + self._messages += out.getvalue() + return + +def new_select(*s_args): + block = False + for l in s_args: + for fob in l: + if isinstance(fob, TelnetAlike): + block = fob.sock.block + if block: + return [[], [], []] + else: + return s_args + +@contextlib.contextmanager +def test_socket(reads): + def new_conn(*ignored): + return SocketStub(reads) + try: + old_conn = socket.create_connection + socket.create_connection = new_conn + yield None + finally: + socket.create_connection = old_conn + return + +def test_telnet(reads=[], cls=TelnetAlike): + ''' return a telnetlib.Telnet object that uses a SocketStub with + reads queued up to be read ''' + for x in reads: + assert type(x) is bytes, x + with test_socket(reads): + telnet = cls('dummy', 0) + telnet._messages = '' # debuglevel output + return telnet class ReadTests(TestCase): - setUp = _read_setUp - tearDown = _read_tearDown + def setUp(self): + self.old_select = select.select + select.select = new_select + def tearDown(self): + select.select = self.old_select - # use a similar approach to testing timeouts as test_timeout.py - # these will never pass 100% but make the fuzz big enough that it is rare - block_long = 0.6 - block_short = 0.3 - def test_read_until_A(self): + def test_read_until(self): """ - read_until(expected, [timeout]) - Read until the expected string has been seen, or a timeout is - hit (default is no timeout); may block. + read_until(expected, timeout=None) + test the blocking version of read_util """ - want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + want = [b'xxxmatchyyy'] + telnet = test_telnet(want) data = telnet.read_until(b'match') - self.assertEqual(data, b''.join(want[:-2])) + self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads)) - def test_read_until_B(self): - # test the timeout - it does NOT raise socket.timeout - want = [b'hello', self.block_long, b'not seen', EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_until(b'not seen', self.block_short) - self.assertEqual(data, want[0]) - self.assertEqual(telnet.read_all(), b'not seen') + reads = [b'x' * 50, b'match', b'y' * 50] + expect = b''.join(reads[:-1]) + telnet = test_telnet(reads) + data = telnet.read_until(b'match') + self.assertEqual(data, expect) - def test_read_all_A(self): + + def test_read_all(self): """ read_all() Read all data until EOF; may block. """ - want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + reads = [b'x' * 500, b'y' * 500, b'z' * 500] + expect = b''.join(reads) + telnet = test_telnet(reads) data = telnet.read_all() - self.assertEqual(data, b''.join(want[:-1])) + self.assertEqual(data, expect) return - def _test_blocking(self, func): - self.dataq.put([self.block_long, EOF_sigil]) - self.dataq.join() - start = time.time() - data = func() - self.assertTrue(self.block_short <= time.time() - start) - - def test_read_all_B(self): - self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) - - def test_read_all_C(self): - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - telnet.read_all() - telnet.read_all() # shouldn't raise - - def test_read_some_A(self): + def test_read_some(self): """ read_some() Read at least one byte or EOF; may block. """ # test 'at least one byte' - want = [b'x' * 500, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_all() + telnet = test_telnet([b'x' * 500]) + data = telnet.read_some() self.assertTrue(len(data) >= 1) - - def test_read_some_B(self): # test EOF - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - self.assertEqual(b'', telnet.read_some()) + telnet = test_telnet() + data = telnet.read_some() + self.assertEqual(b'', data) - def test_read_some_C(self): - self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) - - def _test_read_any_eager_A(self, func_name): + def _read_eager(self, func_name): """ - read_very_eager() + read_*_eager() Read all data available already queued or on the socket, without blocking. """ - want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil] - expects = want[1] + want[2] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + want = b'x' * 100 + telnet = test_telnet([want]) func = getattr(telnet, func_name) + telnet.sock.block = True + self.assertEqual(b'', func()) + telnet.sock.block = False data = b'' while True: try: data += func() - self.assertTrue(expects.startswith(data)) except EOFError: break - self.assertEqual(expects, data) + self.assertEqual(data, want) - def _test_read_any_eager_B(self, func_name): - # test EOF - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - func = getattr(telnet, func_name) - self.assertRaises(EOFError, func) + def test_read_eager(self): + # read_eager and read_very_eager make the same gaurantees + # (they behave differently but we only test the gaurantees) + self._read_eager('read_eager') + self._read_eager('read_very_eager') + # NB -- we need to test the IAC block which is mentioned in the + # docstring but not in the module docs - # read_eager and read_very_eager make the same gaurantees - # (they behave differently but we only test the gaurantees) - def test_read_very_eager_A(self): - self._test_read_any_eager_A('read_very_eager') - def test_read_very_eager_B(self): - self._test_read_any_eager_B('read_very_eager') - def test_read_eager_A(self): - self._test_read_any_eager_A('read_eager') - def test_read_eager_B(self): - self._test_read_any_eager_B('read_eager') - # NB -- we need to test the IAC block which is mentioned in the docstring - # but not in the module docs + def read_very_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_very_lazy()) + while telnet.sock.reads: + telnet.fill_rawq() + data = telnet.read_very_lazy() + self.assertEqual(want, data) + self.assertRaises(EOFError, telnet.read_very_lazy) - def _test_read_any_lazy_B(self, func_name): - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - func = getattr(telnet, func_name) - telnet.fill_rawq() - self.assertRaises(EOFError, func) - - def test_read_lazy_A(self): - want = [b'x' * 100, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) + def test_read_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) self.assertEqual(b'', telnet.read_lazy()) data = b'' while True: @@ -263,35 +238,8 @@ class ReadTests(TestCase): telnet.fill_rawq() except EOFError: break - self.assertTrue(want[0].startswith(data)) - self.assertEqual(data, want[0]) - - def test_read_lazy_B(self): - self._test_read_any_lazy_B('read_lazy') - - def test_read_very_lazy_A(self): - want = [b'x' * 100, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - self.assertEqual(b'', telnet.read_very_lazy()) - data = b'' - while True: - try: - read_data = telnet.read_very_lazy() - except EOFError: - break - data += read_data - if not read_data: - telnet.fill_rawq() - self.assertEqual(b'', telnet.cookedq) - telnet.process_rawq() - self.assertTrue(want[0].startswith(data)) - self.assertEqual(data, want[0]) - - def test_read_very_lazy_B(self): - self._test_read_any_lazy_B('read_very_lazy') + self.assertTrue(want.startswith(data)) + self.assertEqual(data, want) class nego_collector(object): def __init__(self, sb_getter=None): @@ -307,31 +255,30 @@ class nego_collector(object): tl = telnetlib -class TelnetDebuglevel(tl.Telnet): - ''' Telnet-alike that captures messages written to stdout when - debuglevel > 0 - ''' - _messages = '' - def msg(self, msg, *args): - orig_stdout = sys.stdout - sys.stdout = fake_stdout = io.StringIO() - tl.Telnet.msg(self, msg, *args) - self._messages += fake_stdout.getvalue() - sys.stdout = orig_stdout - return +class WriteTests(TestCase): + '''The only thing that write does is replace each tl.IAC for + tl.IAC+tl.IAC''' + + def test_write(self): + data_sample = [b'data sample without IAC', + b'data sample with' + tl.IAC + b' one IAC', + b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, + tl.IAC, + b''] + for data in data_sample: + telnet = test_telnet() + telnet.write(data) + written = b''.join(telnet.sock.writes) + self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written) class OptionTests(TestCase): - setUp = _read_setUp - tearDown = _read_tearDown # RFC 854 commands cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] def _test_command(self, data): """ helper for testing IAC + cmd """ - self.setUp() - self.dataq.put(data) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + telnet = test_telnet(data) + data_len = len(b''.join(data)) nego = nego_collector() telnet.set_option_negotiation_callback(nego.do_nego) txt = telnet.read_all() @@ -339,24 +286,16 @@ class OptionTests(TestCase): self.assertTrue(len(cmd) > 0) # we expect at least one command self.assertTrue(cmd[:1] in self.cmds) self.assertEqual(cmd[1:2], tl.NOOPT) - self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd)) + self.assertEqual(data_len, len(txt + cmd)) nego.sb_getter = None # break the nego => telnet cycle - self.tearDown() def test_IAC_commands(self): - # reset our setup - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - self.tearDown() - for cmd in self.cmds: - self._test_command([tl.IAC, cmd, EOF_sigil]) - self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil]) - self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil]) + self._test_command([tl.IAC, cmd]) + self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100]) + self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10]) # all at once - self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) - self.assertEqual(b'', telnet.read_sb_data()) + self._test_command([tl.IAC + cmd for (cmd) in self.cmds]) def test_SB_commands(self): # RFC 855, subnegotiations portion @@ -365,11 +304,8 @@ class OptionTests(TestCase): tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, - EOF_sigil, ] - self.dataq.put(send) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + telnet = test_telnet(send) nego = nego_collector(telnet.read_sb_data) telnet.set_option_negotiation_callback(nego.do_nego) txt = telnet.read_all() @@ -379,19 +315,7 @@ class OptionTests(TestCase): self.assertEqual(b'', telnet.read_sb_data()) nego.sb_getter = None # break the nego => telnet cycle - def _test_debuglevel(self, data, expected_msg): - """ helper for testing debuglevel messages """ - self.setUp() - self.dataq.put(data) - telnet = TelnetDebuglevel(HOST, self.port) - telnet.set_debuglevel(1) - self.dataq.join() - txt = telnet.read_all() - self.assertTrue(expected_msg in telnet._messages, - msg=(telnet._messages, expected_msg)) - self.tearDown() - - def test_debuglevel(self): + def test_debuglevel_reads(self): # test all the various places that self.msg(...) is called given_a_expect_b = [ # Telnet.fill_rawq @@ -402,15 +326,23 @@ class OptionTests(TestCase): (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), - # Telnet.write - # XXX, untested ] for a, b in given_a_expect_b: - self._test_debuglevel([a, EOF_sigil], b) + telnet = test_telnet([a]) + telnet.set_debuglevel(1) + txt = telnet.read_all() + self.assertTrue(b in telnet._messages) return + def test_debuglevel_write(self): + telnet = test_telnet() + telnet.set_debuglevel(1) + telnet.write(b'xxx') + expected = "send b'xxx'\n" + self.assertTrue(expected in telnet._messages) + def test_main(verbose=None): - support.run_unittest(GeneralTests, ReadTests, OptionTests) + support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests) if __name__ == '__main__': test_main() diff --git a/Misc/ACKS b/Misc/ACKS index bd2dd6944be..7715730157f 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -774,6 +774,7 @@ Wojtek Walczak Charles Waldman Richard Walker Larry Wall +Rodrigo Steinmuller Wanderley Greg Ward Barry Warsaw Steve Waterbury