mirror of https://github.com/python/cpython
- Make timing assertions very generous (a la test_timeout.py)
- Break the gc cycle in negotiation tests - test the different guarantees of read_lazy and read_very_lazy
This commit is contained in:
parent
085b998a36
commit
7f9bb9142e
|
@ -23,17 +23,15 @@ def server(evt, serv, dataq=None):
|
|||
conn, addr = serv.accept()
|
||||
if dataq:
|
||||
data = ''
|
||||
done = False
|
||||
for new_data in dataq.get(True, 0.5):
|
||||
if not done:
|
||||
dataq.task_done()
|
||||
done = True
|
||||
if new_data == EOF_sigil:
|
||||
new_data = dataq.get(True, 0.5)
|
||||
dataq.task_done()
|
||||
for item in new_data:
|
||||
if item == EOF_sigil:
|
||||
break
|
||||
if type(new_data) in [int, float]:
|
||||
time.sleep(new_data)
|
||||
if type(item) in [int, float]:
|
||||
time.sleep(item)
|
||||
else:
|
||||
data += new_data
|
||||
data += item
|
||||
written = conn.send(data)
|
||||
data = data[written:]
|
||||
except socket.timeout:
|
||||
|
@ -42,10 +40,6 @@ def server(evt, serv, dataq=None):
|
|||
serv.close()
|
||||
evt.set()
|
||||
|
||||
def wibble_float(num):
|
||||
''' return a (low, high) tuple that are 1% more and 1% less of num '''
|
||||
return num * 0.99, num * 1.01
|
||||
|
||||
class GeneralTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -101,8 +95,6 @@ class GeneralTests(TestCase):
|
|||
telnet.sock.close()
|
||||
|
||||
def _read_setUp(self):
|
||||
# the blocking constant should be tuned!
|
||||
self.blocking_timeout = 0.3
|
||||
self.evt = threading.Event()
|
||||
self.dataq = Queue.Queue()
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
@ -122,6 +114,10 @@ class ReadTests(TestCase):
|
|||
setUp = _read_setUp
|
||||
tearDown = _read_tearDown
|
||||
|
||||
# use a similar approach to testing timeouts as test_timeout.py
|
||||
# these will never pass 100% but make the fuzz big enough that it is rare
|
||||
block_long = 0.6
|
||||
block_short = 0.3
|
||||
def test_read_until_A(self):
|
||||
"""
|
||||
read_until(expected, [timeout])
|
||||
|
@ -137,12 +133,13 @@ class ReadTests(TestCase):
|
|||
|
||||
def test_read_until_B(self):
|
||||
# test the timeout - it does NOT raise socket.timeout
|
||||
want = ['hello', self.blocking_timeout, EOF_sigil]
|
||||
want = ['hello', self.block_long, 'not seen', EOF_sigil]
|
||||
self.dataq.put(want)
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
data = telnet.read_until('not seen')
|
||||
data = telnet.read_until('not seen', self.block_short)
|
||||
self.assertEqual(data, want[0])
|
||||
self.assertEqual(telnet.read_all(), 'not seen')
|
||||
|
||||
def test_read_all_A(self):
|
||||
"""
|
||||
|
@ -158,12 +155,11 @@ class ReadTests(TestCase):
|
|||
return
|
||||
|
||||
def _test_blocking(self, func):
|
||||
self.dataq.put([self.blocking_timeout, EOF_sigil])
|
||||
self.dataq.put([self.block_long, EOF_sigil])
|
||||
self.dataq.join()
|
||||
start = time.time()
|
||||
data = func()
|
||||
low, high = wibble_float(self.blocking_timeout)
|
||||
self.assertTrue(low <= time.time() - start)
|
||||
self.assertTrue(self.block_short <= time.time() - start)
|
||||
|
||||
def test_read_all_B(self):
|
||||
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
|
||||
|
@ -204,9 +200,8 @@ class ReadTests(TestCase):
|
|||
Read all data available already queued or on the socket,
|
||||
without blocking.
|
||||
"""
|
||||
# this never blocks so it should return eat part in turn
|
||||
want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil]
|
||||
expects = want[0] + want[2]
|
||||
want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
|
||||
expects = want[1] + want[2]
|
||||
self.dataq.put(want)
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
|
@ -225,6 +220,7 @@ class ReadTests(TestCase):
|
|||
self.dataq.put([EOF_sigil])
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
time.sleep(self.block_short)
|
||||
func = getattr(telnet, func_name)
|
||||
self.assertRaises(EOFError, func)
|
||||
|
||||
|
@ -241,47 +237,60 @@ class ReadTests(TestCase):
|
|||
# NB -- we need to test the IAC block which is mentioned in the docstring
|
||||
# but not in the module docs
|
||||
|
||||
def _test_read_any_lazy_A(self, func_name):
|
||||
want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil]
|
||||
self.dataq.put(want)
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
func = getattr(telnet, func_name)
|
||||
self.assertEqual('', func())
|
||||
data = ''
|
||||
while True:
|
||||
time.sleep(0.0)
|
||||
try:
|
||||
telnet.fill_rawq()
|
||||
data += func()
|
||||
if not data:
|
||||
break
|
||||
except EOFError:
|
||||
break
|
||||
self.assertTrue(want[1].startswith(data))
|
||||
return data, want[1]
|
||||
|
||||
def _test_read_any_lazy_B(self, func_name):
|
||||
self.dataq.put([EOF_sigil])
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
func = getattr(telnet, func_name)
|
||||
time.sleep(0.0)
|
||||
telnet.fill_rawq()
|
||||
self.assertRaises(EOFError, func)
|
||||
|
||||
# read_lazy and read_very_lazy make the samish gaurantees
|
||||
def test_read_very_lazy_A(self):
|
||||
data, want = self._test_read_any_lazy_A('read_very_lazy')
|
||||
self.assertEqual(data, '')
|
||||
def test_read_lazy(self):
|
||||
data, want = self._test_read_any_lazy_A('read_lazy')
|
||||
self.assertEqual(data, want)
|
||||
def test_read_very_lazy_B(self):
|
||||
self._test_read_any_lazy_B('read_very_lazy')
|
||||
def test_read_lazy_A(self):
|
||||
want = ['x' * 100, EOF_sigil]
|
||||
self.dataq.put(want)
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
time.sleep(self.block_short)
|
||||
self.assertEqual('', telnet.read_lazy())
|
||||
data = ''
|
||||
while True:
|
||||
try:
|
||||
read_data = telnet.read_lazy()
|
||||
data += read_data
|
||||
if not read_data:
|
||||
telnet.fill_rawq()
|
||||
except EOFError:
|
||||
break
|
||||
self.assertTrue(want[0].startswith(data))
|
||||
self.assertEqual(data, want[0])
|
||||
|
||||
def test_read_lazy_B(self):
|
||||
self._test_read_any_lazy_B('read_lazy')
|
||||
|
||||
def test_read_very_lazy_A(self):
|
||||
want = ['x' * 100, EOF_sigil]
|
||||
self.dataq.put(want)
|
||||
telnet = telnetlib.Telnet(HOST, self.port)
|
||||
self.dataq.join()
|
||||
time.sleep(self.block_short)
|
||||
self.assertEqual('', telnet.read_very_lazy())
|
||||
data = ''
|
||||
while True:
|
||||
try:
|
||||
read_data = telnet.read_very_lazy()
|
||||
except EOFError:
|
||||
break
|
||||
data += read_data
|
||||
if not read_data:
|
||||
telnet.fill_rawq()
|
||||
self.assertEqual('', telnet.cookedq)
|
||||
telnet.process_rawq()
|
||||
self.assertTrue(want[0].startswith(data))
|
||||
self.assertEqual(data, want[0])
|
||||
|
||||
def test_read_very_lazy_B(self):
|
||||
self._test_read_any_lazy_B('read_very_lazy')
|
||||
|
||||
class nego_collector(object):
|
||||
def __init__(self, sb_getter=None):
|
||||
self.seen = ''
|
||||
|
@ -315,6 +324,7 @@ class OptionTests(TestCase):
|
|||
self.assertTrue(cmd[0] in self.cmds)
|
||||
self.assertEqual(cmd[1], tl.NOOPT)
|
||||
self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
|
||||
nego.sb_getter = None # break the nego => telnet cycle
|
||||
self.tearDown()
|
||||
|
||||
def test_IAC_commands(self):
|
||||
|
@ -351,6 +361,7 @@ class OptionTests(TestCase):
|
|||
want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
|
||||
self.assertEqual(nego.sb_seen, want_sb_data)
|
||||
self.assertEqual('', telnet.read_sb_data())
|
||||
nego.sb_getter = None # break the nego => telnet cycle
|
||||
|
||||
def test_main(verbose=None):
|
||||
test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
|
||||
|
|
Loading…
Reference in New Issue