eliminate more race conditions in telnetlib tests

This commit is contained in:
Jack Diederich 2009-04-07 20:22:59 +00:00
parent 14bf0a0a37
commit 3b2312ee5c
1 changed files with 58 additions and 51 deletions

View File

@ -13,8 +13,8 @@ EOF_sigil = object()
def server(evt, serv, dataq=None): def server(evt, serv, dataq=None):
""" Open a tcp server in three steps """ Open a tcp server in three steps
1) set evt to true to let the parent know we are ready 1) set evt to true to let the parent know we are ready
2) [optional] write all the data in dataq to the socket 2) [optional] if is not False, write the list of data from dataq.get()
terminate when dataq.get() returns EOF_sigil to the socket.
3) set evt to true to let the parent know we're done 3) set evt to true to let the parent know we're done
""" """
serv.listen(5) serv.listen(5)
@ -23,15 +23,19 @@ def server(evt, serv, dataq=None):
conn, addr = serv.accept() conn, addr = serv.accept()
if dataq: if dataq:
data = '' data = ''
new_data = dataq.get(True, 0.5) done = False
while new_data is not EOF_sigil: for new_data in dataq.get(True, 0.5):
if type(new_data) == str: if not done:
data += new_data dataq.task_done()
elif type(new_data) in [int, float]: done = True
if new_data == EOF_sigil:
break
if type(new_data) in [int, float]:
time.sleep(new_data) time.sleep(new_data)
else:
data += new_data
written = conn.send(data) written = conn.send(data)
data = data[written:] data = data[written:]
new_data = dataq.get(True, 0.5)
except socket.timeout: except socket.timeout:
pass pass
finally: finally:
@ -98,7 +102,7 @@ class GeneralTests(TestCase):
def _read_setUp(self): def _read_setUp(self):
# the blocking constant should be tuned! # the blocking constant should be tuned!
self.blocking_timeout = 0.0 self.blocking_timeout = 0.3
self.evt = threading.Event() self.evt = threading.Event()
self.dataq = Queue.Queue() self.dataq = Queue.Queue()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -114,19 +118,10 @@ def _read_tearDown(self):
self.evt.wait() self.evt.wait()
self.thread.join() self.thread.join()
class ReadTests(TestCase): class ReadTests(TestCase):
setUp = _read_setUp setUp = _read_setUp
tearDown = _read_tearDown tearDown = _read_tearDown
def _test_blocking(self, func):
start = time.time()
self.dataq.put(self.blocking_timeout)
self.dataq.put(EOF_sigil)
data = func()
low, high = wibble_float(self.blocking_timeout)
self.assertTrue(time.time() - start >= low)
def test_read_until_A(self): def test_read_until_A(self):
""" """
read_until(expected, [timeout]) read_until(expected, [timeout])
@ -134,23 +129,19 @@ class ReadTests(TestCase):
hit (default is no timeout); may block. hit (default is no timeout); may block.
""" """
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
for item in want: self.dataq.put(want)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_until('match') data = telnet.read_until('match')
self.assertEqual(data, ''.join(want[:-2])) self.assertEqual(data, ''.join(want[:-2]))
def test_read_until_B(self): def test_read_until_B(self):
# test the timeout - it does NOT raise socket.timeout # test the timeout - it does NOT raise socket.timeout
want = ['hello', self.blocking_timeout, EOF_sigil] want = ['hello', self.blocking_timeout, EOF_sigil]
for item in want: self.dataq.put(want)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
start = time.time() self.dataq.join()
timeout = self.blocking_timeout / 2 data = telnet.read_until('not seen')
data = telnet.read_until('not seen', timeout)
low, high = wibble_float(timeout)
self.assertTrue(low <= time.time() - high)
self.assertEqual(data, want[0]) self.assertEqual(data, want[0])
def test_read_all_A(self): def test_read_all_A(self):
@ -159,16 +150,31 @@ class ReadTests(TestCase):
Read all data until EOF; may block. Read all data until EOF; may block.
""" """
want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil] want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
for item in want: self.dataq.put(want)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_all() data = telnet.read_all()
self.assertEqual(data, ''.join(want[:-1])) self.assertEqual(data, ''.join(want[:-1]))
return return
def _test_blocking(self, func):
self.dataq.put([self.blocking_timeout, EOF_sigil])
self.dataq.join()
start = time.time()
data = func()
low, high = wibble_float(self.blocking_timeout)
self.assertTrue(low <= time.time() - start)
def test_read_all_B(self): def test_read_all_B(self):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
def test_read_all_C(self):
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
telnet.read_all()
telnet.read_all() # shouldn't raise
def test_read_some_A(self): def test_read_some_A(self):
""" """
read_some() read_some()
@ -176,19 +182,20 @@ class ReadTests(TestCase):
""" """
# test 'at least one byte' # test 'at least one byte'
want = ['x' * 500, EOF_sigil] want = ['x' * 500, EOF_sigil]
for item in want: self.dataq.put(want)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_all() data = telnet.read_all()
self.assertTrue(len(data) >= 1) self.assertTrue(len(data) >= 1)
def test_read_some_B(self): def test_read_some_B(self):
# test EOF # test EOF
self.dataq.put(EOF_sigil) self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
self.assertEqual('', telnet.read_some()) self.assertEqual('', telnet.read_some())
def test_read_all_C(self): def test_read_some_C(self):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
def _test_read_any_eager_A(self, func_name): def _test_read_any_eager_A(self, func_name):
@ -200,26 +207,24 @@ class ReadTests(TestCase):
# this never blocks so it should return eat part in turn # this never blocks so it should return eat part in turn
want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil] want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil]
expects = want[0] + want[2] expects = want[0] + want[2]
for item in want: self.dataq.put(want)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
time.sleep(self.blocking_timeout/10)
data = '' data = ''
while True: while True:
try: try:
data += func() data += func()
self.assertTrue(expects.startswith(data)) self.assertTrue(expects.startswith(data))
time.sleep(self.blocking_timeout)
except EOFError: except EOFError:
break break
self.assertEqual(expects, data) self.assertEqual(expects, data)
def _test_read_any_eager_B(self, func_name): def _test_read_any_eager_B(self, func_name):
# test EOF # test EOF
self.dataq.put(EOF_sigil) self.dataq.put([EOF_sigil])
time.sleep(self.blocking_timeout / 10)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
self.assertRaises(EOFError, func) self.assertRaises(EOFError, func)
@ -238,14 +243,14 @@ class ReadTests(TestCase):
def _test_read_any_lazy_A(self, func_name): def _test_read_any_lazy_A(self, func_name):
want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil] want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil]
for item in want: self.dataq.put(want)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
self.assertEqual('', func()) self.assertEqual('', func())
data = '' data = ''
while True: while True:
time.sleep(self.blocking_timeout) time.sleep(0.0)
try: try:
telnet.fill_rawq() telnet.fill_rawq()
data += func() data += func()
@ -257,10 +262,11 @@ class ReadTests(TestCase):
return data, want[1] return data, want[1]
def _test_read_any_lazy_B(self, func_name): def _test_read_any_lazy_B(self, func_name):
self.dataq.put(EOF_sigil) self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
time.sleep(self.blocking_timeout/10) time.sleep(0.0)
telnet.fill_rawq() telnet.fill_rawq()
self.assertRaises(EOFError, func) self.assertRaises(EOFError, func)
@ -298,12 +304,11 @@ class OptionTests(TestCase):
def _test_command(self, data): def _test_command(self, data):
""" helper for testing IAC + cmd """ """ helper for testing IAC + cmd """
self.setUp() self.setUp()
for item in data: self.dataq.put(data)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector() nego = nego_collector()
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
time.sleep(self.blocking_timeout/10)
txt = telnet.read_all() txt = telnet.read_all()
cmd = nego.seen cmd = nego.seen
self.assertTrue(len(cmd) > 0) # we expect at least one command self.assertTrue(len(cmd) > 0) # we expect at least one command
@ -314,8 +319,9 @@ class OptionTests(TestCase):
def test_IAC_commands(self): def test_IAC_commands(self):
# reset our setup # reset our setup
self.dataq.put(EOF_sigil) self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
self.tearDown() self.tearDown()
for cmd in self.cmds: for cmd in self.cmds:
@ -324,6 +330,7 @@ class OptionTests(TestCase):
self._test_command([tl.IAC + cmd, EOF_sigil]) self._test_command([tl.IAC + cmd, EOF_sigil])
# all at once # all at once
self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
self.assertEqual('', telnet.read_sb_data())
def test_SB_commands(self): def test_SB_commands(self):
# RFC 855, subnegotiations portion # RFC 855, subnegotiations portion
@ -334,16 +341,16 @@ class OptionTests(TestCase):
tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE, tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
EOF_sigil, EOF_sigil,
] ]
for item in send: self.dataq.put(send)
self.dataq.put(item)
telnet = telnetlib.Telnet(HOST, self.port) telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector(telnet.read_sb_data) nego = nego_collector(telnet.read_sb_data)
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
time.sleep(self.blocking_timeout/10)
txt = telnet.read_all() txt = telnet.read_all()
self.assertEqual(txt, '') self.assertEqual(txt, '')
want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd' want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
self.assertEqual(nego.sb_seen, want_sb_data) self.assertEqual(nego.sb_seen, want_sb_data)
self.assertEqual('', telnet.read_sb_data())
def test_main(verbose=None): def test_main(verbose=None):
test_support.run_unittest(GeneralTests, ReadTests, OptionTests) test_support.run_unittest(GeneralTests, ReadTests, OptionTests)