2007-03-29 15:22:35 -03:00
|
|
|
import socket
|
|
|
|
import threading
|
|
|
|
import telnetlib
|
|
|
|
import time
|
2009-04-05 23:08:44 -03:00
|
|
|
import Queue
|
2007-03-29 15:22:35 -03:00
|
|
|
|
|
|
|
from unittest import TestCase
|
|
|
|
from test import test_support
|
|
|
|
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to follow the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
# Host used for the bind/connect calls of this module; taken from
# test_support so that no test hard-codes an address (Issue #2550).
HOST = test_support.HOST

# Unique marker terminating a list of items handed to server(); the server
# stops sending as soon as it reaches it.
EOF_sigil = object()
|
2007-03-29 15:22:35 -03:00
|
|
|
|
2009-04-05 23:08:44 -03:00
|
|
|
def server(evt, serv, dataq=None):
    """ Open a tcp server in three steps
        1) set evt to true to let the parent know we are ready
        2) [optional] if dataq is not false, write the list of data from
           dataq.get() to the socket.
        3) set evt to true to let the parent know we're done

    Arguments:
        evt   -- event object, set once after listen() and once on exit
        serv  -- already bound listening socket; always closed on exit
        dataq -- optional queue holding ONE list of items.  int/float
                 items are delays (in seconds) slept before continuing,
                 EOF_sigil stops the transfer, anything else is sent
                 to the client as data.

    A socket.timeout raised by the sockets is swallowed: the tests only
    need the thread to terminate and signal evt.
    """
    conn = None
    serv.listen(5)
    evt.set()
    try:
        conn, addr = serv.accept()
        if dataq:
            new_data = dataq.get(True, 0.5)
            # Mark the list as consumed right away: the tests call
            # dataq.join() to know the server has picked up its script.
            dataq.task_done()
            for item in new_data:
                if item is EOF_sigil:
                    break
                if isinstance(item, (int, float)):
                    time.sleep(item)
                else:
                    # sendall() retries partial writes, so no bytes are
                    # lost when the loop ends right after a short send().
                    conn.sendall(item)
    except socket.timeout:
        pass
    finally:
        # Close the accepted connection explicitly instead of relying on
        # garbage collection; the client sees EOF deterministically.
        if conn is not None:
            conn.close()
        serv.close()
        evt.set()
|
|
|
|
|
|
|
|
class GeneralTests(TestCase):
    # Connection set-up and timeout propagation of telnetlib.Telnet,
    # exercised against a throw-away server() thread per test.

    def setUp(self):
        self.evt = threading.Event()
        # Never hard-code a port: bind_port() picks a unique one so that
        # several runs of the test suite can coexist (Issue #2550).
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(3)
        self.port = test_support.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.start()
        # server() sets evt once it is listening; clear it again so that
        # tearDown() can wait for the second, "done" notification.
        self.evt.wait()
        self.evt.clear()
        time.sleep(.1)

    def tearDown(self):
        self.evt.wait()
        self.thread.join()

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            # always restore the global default for the other tests
            socket.setdefaulttimeout(None)
        try:
            self.assertEqual(telnet.sock.gettimeout(), 30)
        finally:
            telnet.sock.close()

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        try:
            self.assertIsNone(telnet.sock.gettimeout())
        finally:
            telnet.sock.close()

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        try:
            self.assertEqual(telnet.sock.gettimeout(), 30)
        finally:
            telnet.sock.close()

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        try:
            self.assertEqual(telnet.sock.gettimeout(), 30)
        finally:
            telnet.sock.close()
|
2007-03-29 15:22:35 -03:00
|
|
|
|
2009-04-05 23:08:44 -03:00
|
|
|
def _read_setUp(self):
    # Shared setUp for the test cases that feed scripted data to server():
    # creates the event, the data queue and a listening socket on a port
    # chosen by bind_port(), then starts server() in its own thread.
    self.evt = ready = threading.Event()
    self.dataq = Queue.Queue()
    self.sock = listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.settimeout(3)
    self.port = test_support.bind_port(listener)
    worker = threading.Thread(target=server,
                              args=(ready, listener, self.dataq))
    self.thread = worker
    worker.start()
    # First notification from server(): it is listening.  Re-arm the event
    # so the matching tearDown can wait for the final notification.
    ready.wait()
    ready.clear()
    time.sleep(.1)
|
|
|
|
|
|
|
|
def _read_tearDown(self):
    # Shared tearDown: wait for server() to signal completion, then reap
    # its thread.
    finished = self.evt
    finished.wait()
    worker = self.thread
    worker.join()
|
|
|
|
|
|
|
|
class ReadTests(TestCase):
    # Exercises the Telnet.read_*() family against data scripted through
    # self.dataq and replayed by server().
    setUp = _read_setUp
    tearDown = _read_tearDown

    # use a similar approach to testing timeouts as test_timeout.py
    # these will never pass 100% but make the fuzz big enough that it is rare
    block_long = 0.6
    block_short = 0.3

    def test_read_until_A(self):
        """
        read_until(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('match')
        self.assertEqual(data, ''.join(want[:-2]))

    def test_read_until_B(self):
        # test the timeout - it does NOT raise socket.timeout
        want = ['hello', self.block_long, 'not seen', EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_until('not seen', self.block_short)
        self.assertEqual(data, want[0])
        self.assertEqual(telnet.read_all(), 'not seen')

    def test_read_all_A(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        data = telnet.read_all()
        self.assertEqual(data, ''.join(want[:-1]))

    def _test_blocking(self, func):
        # The server sleeps block_long before closing, so a blocking read
        # must take at least block_short to return.
        self.dataq.put([self.block_long, EOF_sigil])
        self.dataq.join()
        start = time.time()
        func()
        self.assertTrue(self.block_short <= time.time() - start)

    def test_read_all_B(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)

    def test_read_all_C(self):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        telnet.read_all()
        telnet.read_all() # shouldn't raise

    def test_read_some_A(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        want = ['x' * 500, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        # call the method under test (this used to call read_all())
        data = telnet.read_some()
        self.assertTrue(len(data) >= 1)

    def test_read_some_B(self):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.assertEqual('', telnet.read_some())

    def test_read_some_C(self):
        self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)

    def _test_read_any_eager_A(self, func_name):
        """
        read_very_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
        expects = want[1] + want[2]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        data = ''
        # poll until the connection reports EOF; whatever has been read so
        # far must always be a prefix of the expected payload
        while True:
            try:
                data += func()
                self.assertTrue(expects.startswith(data))
            except EOFError:
                break
        self.assertEqual(expects, data)

    def _test_read_any_eager_B(self, func_name):
        # test EOF
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        func = getattr(telnet, func_name)
        self.assertRaises(EOFError, func)

    # read_eager and read_very_eager make the same guarantees
    # (they behave differently but we only test the guarantees)
    def test_read_very_eager_A(self):
        self._test_read_any_eager_A('read_very_eager')

    def test_read_very_eager_B(self):
        self._test_read_any_eager_B('read_very_eager')

    def test_read_eager_A(self):
        self._test_read_any_eager_A('read_eager')

    def test_read_eager_B(self):
        self._test_read_any_eager_B('read_eager')

    # NB -- we need to test the IAC block which is mentioned in the docstring
    # but not in the module docs

    def _test_read_any_lazy_B(self, func_name):
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        func = getattr(telnet, func_name)
        telnet.fill_rawq()
        self.assertRaises(EOFError, func)

    def test_read_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    # nothing returned: pull more bytes off the socket
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_lazy_B(self):
        self._test_read_any_lazy_B('read_lazy')

    def test_read_very_lazy_A(self):
        want = ['x' * 100, EOF_sigil]
        self.dataq.put(want)
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        self.assertEqual('', telnet.read_very_lazy())
        data = ''
        while True:
            try:
                read_data = telnet.read_very_lazy()
            except EOFError:
                break
            data += read_data
            if not read_data:
                # fill the raw queue, check that nothing reached the cooked
                # queue yet, then cook it by hand
                telnet.fill_rawq()
                self.assertEqual('', telnet.cookedq)
                telnet.process_rawq()
            self.assertTrue(want[0].startswith(data))
        self.assertEqual(data, want[0])

    def test_read_very_lazy_B(self):
        self._test_read_any_lazy_B('read_very_lazy')
|
|
|
|
|
|
|
|
class nego_collector(object):
    # Accumulates what a Telnet option negotiation callback receives:
    # `seen` collects every cmd + opt pair, `sb_seen` collects whatever
    # sb_getter() returns each time the command is SE.

    def __init__(self, sb_getter=None):
        self.seen = self.sb_seen = ''
        self.sb_getter = sb_getter

    def do_nego(self, sock, cmd, opt):
        self.seen = self.seen + (cmd + opt)
        if cmd != tl.SE or not self.sb_getter:
            return
        self.sb_seen += self.sb_getter()
|
|
|
|
|
|
|
|
# Short alias for the telnetlib module, used for its protocol constants
# (tl.IAC, tl.SE, ...) by the option negotiation tests.
tl = telnetlib
|
|
|
|
class OptionTests(TestCase):
    # IAC command and subnegotiation handling, observed through a
    # nego_collector installed as the option negotiation callback.
    setUp = _read_setUp
    tearDown = _read_tearDown
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ run one server/client round trip and check the IAC + cmd seen """
        # each call needs its own server thread, hence the explicit
        # setUp()/tearDown() pair
        self.setUp()
        self.dataq.put(data)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector()
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        seen = collector.seen
        self.assertTrue(len(seen) > 0) # we expect at least one command
        self.assertIn(seen[0], self.cmds)
        self.assertEqual(seen[1], tl.NOOPT)
        sent_length = len(''.join(data[:-1]))
        self.assertEqual(sent_length, len(text + seen))
        collector.sb_getter = None # break the nego => telnet cycle
        self.tearDown()

    def test_IAC_commands(self):
        # reset our setup
        self.dataq.put([EOF_sigil])
        telnet = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        self.tearDown()

        for code in self.cmds:
            iac_cmd = tl.IAC + code
            # command embedded in surrounding text, long then short
            for pad in (100, 10):
                self._test_command(['x' * pad, iac_cmd, 'y' * pad, EOF_sigil])
            # command on its own
            self._test_command([iac_cmd, EOF_sigil])
        # all at once
        batch = [tl.IAC + code for code in self.cmds]
        batch.append(EOF_sigil)
        self._test_command(batch)
        self.assertEqual('', telnet.read_sb_data())

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        payloads = [
            '',
            tl.IAC + tl.IAC,
            tl.IAC + tl.IAC + 'aa',
            'bb' + tl.IAC + tl.IAC,
            'cc' + tl.IAC + tl.IAC + 'dd',
        ]
        # wrap every payload in IAC SB ... IAC SE
        send = [tl.IAC + tl.SB + body + tl.IAC + tl.SE for body in payloads]
        send.append(EOF_sigil)
        self.dataq.put(send)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector(client.read_sb_data)
        client.set_option_negotiation_callback(collector.do_nego)
        text = client.read_all()
        self.assertEqual(text, '')
        want_sb_data = ''.join(
            [tl.IAC, tl.IAC, 'aabb', tl.IAC, 'cc', tl.IAC, 'dd'])
        self.assertEqual(collector.sb_seen, want_sb_data)
        self.assertEqual('', client.read_sb_data())
        collector.sb_getter = None # break the nego => telnet cycle
|
2007-03-29 15:22:35 -03:00
|
|
|
|
|
|
|
def test_main(verbose=None):
    # Entry point: hands every test case class of this module to
    # test_support.run_unittest().  `verbose` is accepted but not used here.
    cases = (GeneralTests, ReadTests, OptionTests)
    test_support.run_unittest(*cases)
|
2007-03-29 15:22:35 -03:00
|
|
|
|
|
|
|
# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
|