2002-06-12 16:18:08 -03:00
|
|
|
#!/usr/bin/env python
|
1997-01-03 16:03:32 -04:00
|
|
|
|
2002-06-12 16:18:08 -03:00
|
|
|
import unittest
|
2002-07-23 16:04:11 -03:00
|
|
|
from test import test_support
|
1997-01-03 16:03:32 -04:00
|
|
|
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to follow the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
import errno
|
1997-01-03 16:03:32 -04:00
|
|
|
import socket
|
2002-06-12 16:18:08 -03:00
|
|
|
import select
|
2008-03-28 01:53:10 -03:00
|
|
|
import time
|
|
|
|
import traceback
|
2008-05-25 04:20:14 -03:00
|
|
|
import Queue
|
2002-09-06 18:57:50 -03:00
|
|
|
import sys
|
2008-01-04 11:48:06 -04:00
|
|
|
import os
|
2006-05-26 09:03:27 -03:00
|
|
|
import array
|
2010-09-07 18:40:25 -03:00
|
|
|
import contextlib
|
2004-05-31 00:09:25 -03:00
|
|
|
from weakref import proxy
|
2006-08-02 03:46:21 -03:00
|
|
|
import signal
|
2010-09-27 15:16:46 -03:00
|
|
|
import math
|
1997-01-03 16:03:32 -04:00
|
|
|
|
2010-08-14 14:04:46 -03:00
|
|
|
def try_address(host, port=0, family=socket.AF_INET):
    """Try to bind a stream socket on the given host:port.

    Args:
        host: Host name or address handed to bind().
        port: Port number handed to bind(); defaults to 0.
        family: Address family used to create the probe socket.

    Returns:
        True if the socket could be created and bound, False if either
        step raised socket.error or socket.gaierror.
    """
    try:
        sock = socket.socket(family, socket.SOCK_STREAM)
    except (socket.error, socket.gaierror):
        return False
    try:
        sock.bind((host, port))
    except (socket.error, socket.gaierror):
        return False
    else:
        return True
    finally:
        # Release the probe socket on both paths; previously it was only
        # closed after a successful bind.
        sock.close()
|
|
|
|
|
|
|
|
# Host used for the bind/connect calls made by the tests in this file.
HOST = test_support.HOST
# Byte-string message constant shared by the tests.
MSG = b'Michael Gilfix was here\n'
# True only when the socket module reports IPv6 support AND a stream
# socket can actually be bound to the IPv6 loopback address.
SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
try:
|
|
|
|
import thread
|
|
|
|
import threading
|
|
|
|
except ImportError:
|
|
|
|
thread = None
|
|
|
|
threading = None
|
|
|
|
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to follow the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
# NOTE(review): HOST and MSG are already assigned earlier in this file;
# these two statements rebind them (MSG without the b'' prefix, which is
# the same str object type on Python 2). Confirm whether the duplicate
# is intentional before removing either copy.
HOST = test_support.HOST
MSG = 'Michael Gilfix was here\n'
|
|
|
|
|
|
|
|
class SocketTCPTest(unittest.TestCase):
    """Fixture that provides a listening TCP server socket.

    setUp() stores the server socket in ``self.serv`` and the port
    returned by test_support.bind_port() in ``self.port``.
    """

    def setUp(self):
        """Create the server socket, bind it and start listening."""
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = test_support.bind_port(self.serv)
        self.serv.listen(1)

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        self.serv.close()
        self.serv = None
|
|
|
|
|
|
|
|
class SocketUDPTest(unittest.TestCase):
    """Fixture that provides a bound UDP server socket.

    setUp() stores the server socket in ``self.serv`` and the port
    returned by test_support.bind_port() in ``self.port``.
    """

    def setUp(self):
        """Create the datagram server socket and bind it."""
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.port = test_support.bind_port(self.serv)

    def tearDown(self):
        """Close the server socket and drop the reference to it."""
        self.serv.close()
        self.serv = None
|
|
|
|
|
|
|
|
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: remember the fixture methods the
        # concrete test class provides, then route setUp/tearDown through
        # the threaded wrappers defined below.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real setUp()."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        # Holds at most one exception raised by the client test.
        self.queue = Queue.Queue(1)

        # Do some munging to start the client test: the client half of
        # test "testFoo" is the method named "_testFoo".
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        # The real setUp() may already have released the client through
        # serverExplicitReady(); otherwise release it now.
        if not self.server_ready.is_set():
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown(), wait for the client, report its error."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread: run test_func between the client
        fixtures and hand any exception it raises to the main thread.

        Raises:
            TypeError: if test_func is not callable.
        """
        self.server_ready.wait()
        self.client_ready.set()
        self.clientSetUp()
        with test_support.check_py3k_warnings():
            if not callable(test_func):
                raise TypeError("test_func must be a callable function.")
        try:
            test_func()
        except Exception as strerror:
            self.queue.put(strerror)
        finally:
            # Always run the client teardown: it sets self.done, which
            # _tearDown() blocks on. Without the finally, a client test
            # that raised something other than Exception left the main
            # thread waiting forever.
            self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture setup; subclasses must override this."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal that the client is finished and end the client thread."""
        self.done.set()
        thread.exit()
|
|
|
|
|
|
|
|
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """SocketTCPTest fixture plus a client thread owning ``self.cli``."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the (not yet connected) client TCP socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
|
1997-01-03 16:03:32 -04:00
|
|
|
|
2002-06-12 16:18:08 -03:00
|
|
|
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """SocketUDPTest fixture plus a client thread owning ``self.cli``."""

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client datagram socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        """Close the client socket, then finish the client thread.

        Mirrors ThreadedTCPSocketTest.clientTearDown(); previously the
        datagram client socket was never closed explicitly.
        """
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
|
1997-01-03 16:03:32 -04:00
|
|
|
|
2002-06-12 16:18:08 -03:00
|
|
|
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Threaded TCP fixture with an established connection.

    The server side of the connection is ``self.cli_conn`` (set in
    setUp); the client side is ``self.serv_conn`` (set in clientSetUp).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        conn, addr = self.serv.accept()
        self.cli_conn = conn

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        self.cli.connect((HOST, self.port))
        # The client socket is the client's connection to the server.
        self.serv_conn = self.cli

    def clientTearDown(self):
        self.serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
|
|
|
|
|
2004-08-09 01:51:41 -03:00
|
|
|
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    setUp() stores the two ends as ``self.serv`` and ``self.cli``; the
    main thread closes ``serv`` and the client thread closes ``cli``.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.serv, self.cli = socket.socketpair()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def clientSetUp(self):
        # Nothing to do: both sockets are created in setUp().
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
|
|
|
|
|
2004-08-09 15:54:11 -03:00
|
|
|
|
2002-06-12 16:18:08 -03:00
|
|
|
#######################################################################
|
|
|
|
## Begin Tests
|
|
|
|
|
|
|
|
class GeneralModuleTests(unittest.TestCase):
|
|
|
|
|
2004-05-31 00:09:25 -03:00
|
|
|
def test_weakref(self):
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
p = proxy(s)
|
|
|
|
self.assertEqual(p.fileno(), s.fileno())
|
|
|
|
s.close()
|
|
|
|
s = None
|
|
|
|
try:
|
|
|
|
p.fileno()
|
|
|
|
except ReferenceError:
|
|
|
|
pass
|
|
|
|
else:
|
|
|
|
self.fail('Socket proxy still exists')
|
|
|
|
|
2002-06-12 16:18:08 -03:00
|
|
|
def testSocketError(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Testing socket module exceptions
|
2002-06-12 16:18:08 -03:00
|
|
|
def raise_error(*args, **kwargs):
|
|
|
|
raise socket.error
|
|
|
|
def raise_herror(*args, **kwargs):
|
|
|
|
raise socket.herror
|
|
|
|
def raise_gaierror(*args, **kwargs):
|
|
|
|
raise socket.gaierror
|
2009-06-30 19:57:08 -03:00
|
|
|
self.assertRaises(socket.error, raise_error,
|
2002-06-12 16:18:08 -03:00
|
|
|
"Error raising socket exception.")
|
2009-06-30 19:57:08 -03:00
|
|
|
self.assertRaises(socket.error, raise_herror,
|
2002-06-12 16:18:08 -03:00
|
|
|
"Error raising socket exception.")
|
2009-06-30 19:57:08 -03:00
|
|
|
self.assertRaises(socket.error, raise_gaierror,
|
2002-06-12 16:18:08 -03:00
|
|
|
"Error raising socket exception.")
|
|
|
|
|
|
|
|
def testCrucialConstants(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Testing for mission critical constants
|
2002-06-12 16:18:08 -03:00
|
|
|
socket.AF_INET
|
|
|
|
socket.SOCK_STREAM
|
|
|
|
socket.SOCK_DGRAM
|
|
|
|
socket.SOCK_RAW
|
|
|
|
socket.SOCK_RDM
|
|
|
|
socket.SOCK_SEQPACKET
|
|
|
|
socket.SOL_SOCKET
|
|
|
|
socket.SO_REUSEADDR
|
|
|
|
|
2002-06-13 17:24:17 -03:00
|
|
|
def testHostnameRes(self):
    # Testing hostname resolution mechanisms
    hostname = socket.gethostname()
    try:
        ip = socket.gethostbyname(hostname)
    except socket.error:
        # Probably name lookup wasn't set up right; skip this test
        return
    self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
    try:
        hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
    except socket.error:
        # Probably a similar problem as above; skip this test
        return
    # The fully qualified name must be one of the names already known
    # for this host.
    all_host_names = [hostname, hname] + aliases
    fqhn = socket.getfqdn(ip)
    if fqhn not in all_host_names:
        self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
|
2002-06-12 16:18:08 -03:00
|
|
|
|
2002-06-12 18:19:40 -03:00
|
|
|
def testRefCountGetNameInfo(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Testing reference count for getnameinfo
|
2002-06-12 18:19:40 -03:00
|
|
|
if hasattr(sys, "getrefcount"):
|
2002-06-12 16:18:08 -03:00
|
|
|
try:
|
|
|
|
# On some versions, this loses a reference
|
|
|
|
orig = sys.getrefcount(__name__)
|
|
|
|
socket.getnameinfo(__name__,0)
|
2009-09-19 04:35:07 -03:00
|
|
|
except TypeError:
|
2010-03-20 22:14:24 -03:00
|
|
|
self.assertEqual(sys.getrefcount(__name__), orig,
|
|
|
|
"socket.getnameinfo loses a reference")
|
2002-06-12 16:18:08 -03:00
|
|
|
|
|
|
|
def testInterpreterCrash(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Making sure getnameinfo doesn't crash the interpreter
|
2002-06-12 16:18:08 -03:00
|
|
|
try:
|
|
|
|
# On some versions, this crashes the interpreter.
|
|
|
|
socket.getnameinfo(('x', 0, 0, 0), 0)
|
|
|
|
except socket.error:
|
|
|
|
pass
|
|
|
|
|
2002-09-15 22:30:03 -03:00
|
|
|
def testNtoH(self):
|
2002-09-13 21:58:46 -03:00
|
|
|
# This just checks that htons etc. are their own inverse,
|
|
|
|
# when looking at the lower 16 or 32 bits.
|
|
|
|
sizes = {socket.htonl: 32, socket.ntohl: 32,
|
|
|
|
socket.htons: 16, socket.ntohs: 16}
|
|
|
|
for func, size in sizes.items():
|
|
|
|
mask = (1L<<size) - 1
|
|
|
|
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
|
|
|
|
self.assertEqual(i & mask, func(func(i&mask)) & mask)
|
|
|
|
|
|
|
|
swapped = func(mask)
|
|
|
|
self.assertEqual(swapped & mask, mask)
|
|
|
|
self.assertRaises(OverflowError, func, 1L<<34)
|
2002-07-25 13:01:12 -03:00
|
|
|
|
2007-01-14 13:03:32 -04:00
|
|
|
def testNtoHErrors(self):
|
|
|
|
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
|
|
|
|
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
|
|
|
|
for k in good_values:
|
|
|
|
socket.ntohl(k)
|
|
|
|
socket.ntohs(k)
|
|
|
|
socket.htonl(k)
|
|
|
|
socket.htons(k)
|
|
|
|
for k in bad_values:
|
|
|
|
self.assertRaises(OverflowError, socket.ntohl, k)
|
|
|
|
self.assertRaises(OverflowError, socket.ntohs, k)
|
|
|
|
self.assertRaises(OverflowError, socket.htonl, k)
|
|
|
|
self.assertRaises(OverflowError, socket.htons, k)
|
|
|
|
|
2004-06-27 21:50:43 -03:00
|
|
|
def testGetServBy(self):
    eq = self.assertEqual
    # Find one service that exists, then check all the related interfaces.
    # I've ordered this by protocols that have both a tcp and udp
    # protocol, at least for modern Linuxes.
    # Match platforms by prefix so that version-suffixed names other
    # than the ones that used to be listed here (e.g. 'linux3',
    # 'freebsd9') are covered as well.
    if (sys.platform.startswith(('linux', 'freebsd')) or
            sys.platform == 'darwin'):
        # avoid the 'echo' service on this platform, as there is an
        # assumption breaking non-standard port/protocol entry
        services = ('daytime', 'qotd', 'domain')
    else:
        services = ('echo', 'daytime', 'domain')
    for service in services:
        try:
            port = socket.getservbyname(service, 'tcp')
            break
        except socket.error:
            pass
    else:
        # None of the candidate services is known on this system.
        raise socket.error
    # Try same call with optional protocol omitted
    port2 = socket.getservbyname(service)
    eq(port, port2)
    # Try udp, but don't barf if it doesn't exist
    try:
        udpport = socket.getservbyname(service, 'udp')
    except socket.error:
        udpport = None
    else:
        eq(udpport, port)
    # Now make sure the lookup by port returns the same service name
    eq(socket.getservbyport(port2), service)
    eq(socket.getservbyport(port, 'tcp'), service)
    if udpport is not None:
        eq(socket.getservbyport(udpport, 'udp'), service)
    # Make sure getservbyport does not accept out of range ports.
    self.assertRaises(OverflowError, socket.getservbyport, -1)
    self.assertRaises(OverflowError, socket.getservbyport, 65536)
|
2002-06-12 16:18:08 -03:00
|
|
|
|
2002-07-18 14:08:35 -03:00
|
|
|
def testDefaultTimeout(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Testing default timeout
|
2002-07-18 14:08:35 -03:00
|
|
|
# The default timeout should initially be None
|
|
|
|
self.assertEqual(socket.getdefaulttimeout(), None)
|
|
|
|
s = socket.socket()
|
|
|
|
self.assertEqual(s.gettimeout(), None)
|
|
|
|
s.close()
|
|
|
|
|
|
|
|
# Set the default timeout to 10, and see if it propagates
|
|
|
|
socket.setdefaulttimeout(10)
|
|
|
|
self.assertEqual(socket.getdefaulttimeout(), 10)
|
|
|
|
s = socket.socket()
|
|
|
|
self.assertEqual(s.gettimeout(), 10)
|
|
|
|
s.close()
|
|
|
|
|
|
|
|
# Reset the default timeout to None, and see if it propagates
|
|
|
|
socket.setdefaulttimeout(None)
|
|
|
|
self.assertEqual(socket.getdefaulttimeout(), None)
|
|
|
|
s = socket.socket()
|
|
|
|
self.assertEqual(s.gettimeout(), None)
|
|
|
|
s.close()
|
|
|
|
|
|
|
|
# Check that setting it to an invalid value raises ValueError
|
|
|
|
self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
|
|
|
|
|
|
|
|
# Check that setting it to an invalid type raises TypeError
|
|
|
|
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
|
|
|
|
|
2009-02-11 19:45:25 -04:00
|
|
|
def testIPv4_inet_aton_fourbytes(self):
|
|
|
|
if not hasattr(socket, 'inet_aton'):
|
|
|
|
return # No inet_aton, nothing to check
|
|
|
|
# Test that issue1008086 and issue767150 are fixed.
|
|
|
|
# It must return 4 bytes.
|
|
|
|
self.assertEquals('\x00'*4, socket.inet_aton('0.0.0.0'))
|
|
|
|
self.assertEquals('\xff'*4, socket.inet_aton('255.255.255.255'))
|
|
|
|
|
2003-04-25 02:48:32 -03:00
|
|
|
def testIPv4toString(self):
|
2003-04-25 12:11:23 -03:00
|
|
|
if not hasattr(socket, 'inet_pton'):
|
|
|
|
return # No inet_pton() on this platform
|
2003-04-25 02:48:32 -03:00
|
|
|
from socket import inet_aton as f, inet_pton, AF_INET
|
|
|
|
g = lambda a: inet_pton(AF_INET, a)
|
|
|
|
|
|
|
|
self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
|
|
|
|
self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
|
|
|
|
self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
|
|
|
|
self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
|
2005-08-26 05:34:00 -03:00
|
|
|
self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))
|
2003-04-25 02:48:32 -03:00
|
|
|
|
|
|
|
self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
|
|
|
|
self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
|
|
|
|
self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
|
2005-08-26 05:34:00 -03:00
|
|
|
self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))
|
2003-05-12 17:19:37 -03:00
|
|
|
|
2003-04-25 02:48:32 -03:00
|
|
|
def testIPv6toString(self):
|
2003-04-25 12:11:23 -03:00
|
|
|
if not hasattr(socket, 'inet_pton'):
|
|
|
|
return # No inet_pton() on this platform
|
2003-04-25 02:48:32 -03:00
|
|
|
try:
|
|
|
|
from socket import inet_pton, AF_INET6, has_ipv6
|
|
|
|
if not has_ipv6:
|
|
|
|
return
|
|
|
|
except ImportError:
|
|
|
|
return
|
|
|
|
f = lambda a: inet_pton(AF_INET6, a)
|
|
|
|
|
|
|
|
self.assertEquals('\x00' * 16, f('::'))
|
|
|
|
self.assertEquals('\x00' * 16, f('0::0'))
|
|
|
|
self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
|
|
|
|
self.assertEquals(
|
|
|
|
'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
|
|
|
|
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
|
|
|
|
)
|
2003-05-12 17:19:37 -03:00
|
|
|
|
2003-04-25 02:48:32 -03:00
|
|
|
def testStringToIPv4(self):
|
2003-04-25 12:11:23 -03:00
|
|
|
if not hasattr(socket, 'inet_ntop'):
|
|
|
|
return # No inet_ntop() on this platform
|
2003-04-25 02:48:32 -03:00
|
|
|
from socket import inet_ntoa as f, inet_ntop, AF_INET
|
|
|
|
g = lambda a: inet_ntop(AF_INET, a)
|
|
|
|
|
|
|
|
self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
|
|
|
|
self.assertEquals('170.85.170.85', f('\xaa\x55\xaa\x55'))
|
|
|
|
self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
|
|
|
|
self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))
|
2003-05-12 17:19:37 -03:00
|
|
|
|
2003-04-25 02:48:32 -03:00
|
|
|
self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
|
|
|
|
self.assertEquals('170.85.170.85', g('\xaa\x55\xaa\x55'))
|
|
|
|
self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))
|
|
|
|
|
|
|
|
def testStringToIPv6(self):
|
2003-04-25 12:11:23 -03:00
|
|
|
if not hasattr(socket, 'inet_ntop'):
|
|
|
|
return # No inet_ntop() on this platform
|
2003-04-25 02:48:32 -03:00
|
|
|
try:
|
|
|
|
from socket import inet_ntop, AF_INET6, has_ipv6
|
|
|
|
if not has_ipv6:
|
|
|
|
return
|
|
|
|
except ImportError:
|
|
|
|
return
|
|
|
|
f = lambda a: inet_ntop(AF_INET6, a)
|
|
|
|
|
|
|
|
self.assertEquals('::', f('\x00' * 16))
|
|
|
|
self.assertEquals('::1', f('\x00' * 15 + '\x01'))
|
|
|
|
self.assertEquals(
|
|
|
|
'aef:b01:506:1001:ffff:9997:55:170',
|
|
|
|
f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
|
|
|
|
)
|
|
|
|
|
2002-07-19 09:46:46 -03:00
|
|
|
# XXX The following don't test module-level functionality...
|
2002-07-18 14:08:35 -03:00
|
|
|
|
2009-01-31 18:57:30 -04:00
|
|
|
def _get_unused_port(self, bind_address='0.0.0.0'):
|
|
|
|
"""Use a temporary socket to elicit an unused ephemeral port.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
bind_address: Hostname or IP address to search for a port on.
|
|
|
|
|
|
|
|
Returns: A most likely to be unused port.
|
|
|
|
"""
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
tempsock = socket.socket()
|
2009-01-31 18:57:30 -04:00
|
|
|
tempsock.bind((bind_address, 0))
|
|
|
|
host, port = tempsock.getsockname()
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
tempsock.close()
|
2009-01-31 18:57:30 -04:00
|
|
|
return port
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
|
2009-01-31 18:57:30 -04:00
|
|
|
def testSockName(self):
    # Testing getsockname()
    port = self._get_unused_port()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Make sure the socket is closed whatever the outcome of the test.
    self.addCleanup(sock.close)
    sock.bind(("0.0.0.0", port))
    name = sock.getsockname()
    # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
    # it reasonable to get the host's addr in addition to 0.0.0.0.
    # At least for eCos.  This is required for the S/390 to pass.
    try:
        my_ip_addr = socket.gethostbyname(socket.gethostname())
    except socket.error:
        # The host's own name could not be resolved, which says nothing
        # about getsockname(); skip instead of reporting an error.
        self.skipTest('name lookup failure')
    self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
    self.assertEqual(name[1], port)
|
2002-06-12 16:18:08 -03:00
|
|
|
|
|
|
|
def testGetSockOpt(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Testing getsockopt()
|
2002-06-12 16:18:08 -03:00
|
|
|
# We know a socket should start without reuse==0
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
|
2009-06-30 19:57:08 -03:00
|
|
|
self.assertFalse(reuse != 0, "initial mode is reuse")
|
2002-06-12 16:18:08 -03:00
|
|
|
|
|
|
|
def testSetSockOpt(self):
|
2002-08-07 22:00:28 -03:00
|
|
|
# Testing setsockopt()
|
2002-06-12 16:18:08 -03:00
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
|
|
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
|
2009-06-30 19:57:08 -03:00
|
|
|
self.assertFalse(reuse == 0, "failed to set reuse mode")
|
2002-06-12 16:18:08 -03:00
|
|
|
|
2002-07-19 09:46:46 -03:00
|
|
|
def testSendAfterClose(self):
    # testing send() after close() with timeout
    closed_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    closed_sock.settimeout(1)
    closed_sock.close()
    # Sending on the already closed socket has to raise socket.error.
    with self.assertRaises(socket.error):
        closed_sock.send("spam")
|
|
|
|
|
2006-03-21 14:17:25 -04:00
|
|
|
def testNewAttributes(self):
|
|
|
|
# testing .family, .type and .protocol
|
2006-03-17 15:17:34 -04:00
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
2006-03-21 14:17:25 -04:00
|
|
|
self.assertEqual(sock.family, socket.AF_INET)
|
|
|
|
self.assertEqual(sock.type, socket.SOCK_STREAM)
|
|
|
|
self.assertEqual(sock.proto, 0)
|
2006-03-17 15:17:34 -04:00
|
|
|
sock.close()
|
|
|
|
|
2009-01-31 18:57:30 -04:00
|
|
|
def test_getsockaddrarg(self):
    # bind() must raise OverflowError for a port number shifted by
    # +/-65536, and must accept the unshifted port afterwards.
    host = '0.0.0.0'
    port = self._get_unused_port(bind_address=host)
    out_of_range_ports = (port + 65536, port - 65536)
    sock = socket.socket()
    try:
        for bad_port in out_of_range_ports:
            with self.assertRaises(OverflowError):
                sock.bind((host, bad_port))
        sock.bind((host, port))
    finally:
        sock.close()
|
|
|
|
|
2008-01-04 11:48:06 -04:00
|
|
|
def test_sock_ioctl(self):
|
|
|
|
if os.name != "nt":
|
|
|
|
return
|
2009-06-30 19:57:08 -03:00
|
|
|
self.assertTrue(hasattr(socket.socket, 'ioctl'))
|
|
|
|
self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
|
|
|
|
self.assertTrue(hasattr(socket, 'RCVALL_ON'))
|
|
|
|
self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
|
2009-09-25 12:19:51 -03:00
|
|
|
self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
|
|
|
|
s = socket.socket()
|
|
|
|
self.assertRaises(ValueError, s.ioctl, -1, None)
|
|
|
|
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
|
2008-01-04 11:48:06 -04:00
|
|
|
|
2010-08-14 14:04:46 -03:00
|
|
|
def testGetaddrinfo(self):
    # Probe call: an EAI_SERVICE failure here is treated as a libc
    # problem (see http://bugs.python.org/issue1282647) and skips the
    # test; any other resolver error is propagated.
    try:
        socket.getaddrinfo('localhost', 80)
    except socket.gaierror as err:
        if err.errno == socket.EAI_SERVICE:
            self.skipTest("buggy libc version")
        raise

    # len of every sequence is supposed to be == 5
    for entry in socket.getaddrinfo(HOST, None):
        self.assertEqual(len(entry), 5)

    # host can be a domain name, a string representation of an
    # IPv4/v6 address or None
    host_forms = ['localhost', '127.0.0.1', None]
    if SUPPORTS_IPV6:
        host_forms.append('::1')
    for host_form in host_forms:
        socket.getaddrinfo(host_form, 80)

    # port can be a string service name such as "http", a numeric
    # port number or None
    for port_form in ("http", 80, None):
        socket.getaddrinfo(HOST, port_form)

    # test family and socktype filters
    by_family = socket.getaddrinfo(HOST, None, socket.AF_INET)
    for fam, _type, _proto, _canon, _addr in by_family:
        self.assertEqual(fam, socket.AF_INET)
    by_socktype = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
    for _fam, kind, _proto, _canon, _addr in by_socktype:
        self.assertEqual(kind, socket.SOCK_STREAM)

    # test proto and flags arguments
    socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
    socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)

    # a server willing to support both IPv4 and IPv6 will
    # usually do this
    socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
                       socket.AI_PASSIVE)
|
|
|
|
|
2008-01-04 11:48:06 -04:00
|
|
|
|
2010-09-27 15:16:46 -03:00
|
|
|
def check_sendall_interrupted(self, with_timeout):
    # Shared body of the test_sendall_interrupted* tests: arm SIGALRM
    # while sendall() is pushing data into a socket pair and check how
    # the call reacts to the signal handler.
    #
    # with_timeout: when true, the sending socket gets a 1.5 second
    # timeout and a second phase checks that a non-raising handler lets
    # sendall() end with socket.timeout.
    #
    # socketpair() is not strictly required, but it makes things easier.
    if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
        self.skipTest("signal.alarm and socket.socketpair required for this test")

    # Our signal handlers clobber the C errno by calling a math function
    # with an invalid domain value.
    def ok_handler(*args):
        self.assertRaises(ValueError, math.acosh, 0)

    def raising_handler(*args):
        self.assertRaises(ValueError, math.acosh, 0)
        1 // 0

    # Nothing ever reads from `s`; presumably the 1 MiB payload is large
    # enough for sendall() to still be in progress when the alarm fires.
    c, s = socket.socketpair()
    old_alarm = signal.signal(signal.SIGALRM, raising_handler)
    try:
        if with_timeout:
            # Just above the one second minimum for signal.alarm
            c.settimeout(1.5)
        # The ZeroDivisionError raised by the handler has to propagate
        # out of sendall().
        with self.assertRaises(ZeroDivisionError):
            signal.alarm(1)
            c.sendall(b"x" * (1024**2))
        if with_timeout:
            signal.signal(signal.SIGALRM, ok_handler)
            signal.alarm(1)
            self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))
    finally:
        # Cancel any alarm that is still pending before the previous
        # handler is put back, so that a late SIGALRM cannot be delivered
        # to it after this test has finished.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_alarm)
        c.close()
        s.close()
|
|
|
|
|
|
|
|
def test_sendall_interrupted(self):
    # Interrupted sendall() on a socket without a timeout.
    self.check_sendall_interrupted(with_timeout=False)
|
|
|
|
|
|
|
|
def test_sendall_interrupted_with_timeout(self):
    # Interrupted sendall() on a socket that has a timeout set.
    self.check_sendall_interrupted(with_timeout=True)
|
|
|
|
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
    # Basic send/receive checks over a connected TCP socket pair.
    #
    # Every testXxx method reads from self.cli_conn and has a _testXxx
    # counterpart that writes to self.serv_conn.  Presumably the base
    # class (defined outside this part of the file) runs the two halves
    # concurrently -- confirm against SocketConnectedTest.

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        seg1, addr = self.cli_conn.recvfrom(len(MSG) - 3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        # Collect the pieces and join them once instead of growing a
        # string with +=.
        chunks = []
        while True:
            read = self.cli_conn.recv(1024)
            if not read:
                # Empty read: no more data will arrive.
                break
            chunks.append(read)
        self.assertEqual(''.join(chunks), 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    # On Windows, socket.fromfd() doesn't exist; report a skip there
    # instead of silently passing.
    @unittest.skipUnless(hasattr(socket, 'fromfd'),
                         'socket.fromfd not available')
    def testFromFd(self):
        # Testing fromfd()
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        # The socket object built by fromfd() has to be closed as well.
        self.addCleanup(sock.close)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        # SHUT_RDWR (numeric value 2) shuts down both directions.
        self.serv_conn.shutdown(socket.SHUT_RDWR)
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
    # Basic datagram checks: each testXxx method receives on self.serv
    # and its _testXxx counterpart sends MSG from self.cli to
    # (HOST, self.port).

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        with self.assertRaises(ValueError):
            self.serv.recvfrom(-1)

    def _testRecvFromNegative(self):
        destination = (HOST, self.port)
        self.cli.sendto(MSG, 0, destination)
|
2007-03-28 00:45:20 -03:00
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
    """Check that closing an accepted connection is visible to the peer."""

    def testClose(self):
        # Accept the pending connection and drop it straight away.
        accepted, _peer_addr = self.serv.accept()
        accepted.close()

        # The client socket should be reported readable within one second,
        # and reading from it should yield the empty string.
        client = self.cli
        readable = select.select([client], [], [], 1.0)[0]
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), '')

    def _testClose(self):
        # Presumably the client-thread counterpart of testClose (confirm
        # against ThreadableTest): connect, then linger for a second.
        self.cli.connect((HOST, self.port))
        time.sleep(1.0)
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
    """Exchange MSG in both directions across self.serv / self.cli."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # self.serv must receive exactly what _testRecv sent.
        self.assertEqual(self.serv.recv(1024), MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        # self.cli must receive exactly what testSend sent.
        self.assertEqual(self.cli.recv(1024), MSG)
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Exercise non-blocking accept/connect/recv on TCP sockets.

    Each ``testX`` method has a ``_testX`` counterpart; presumably the
    ThreadableTest machinery runs the latter in the client thread
    (confirm against the base class).
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        # Testing whether set blocking works
        self.serv.setblocking(0)
        start = time.time()
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            # Not expected (the client never connects), but do not leak
            # the connection if it happens.
            conn.close()
        end = time.time()
        self.assertLess(end - start, 1.0, "Error setting non-blocking mode.")

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            conn.close()
            self.fail("Error trying to do non-blocking accept.")
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
            # Only the accept itself is under test; release the socket.
            conn.close()
        else:
            self.fail("Error trying to do accept after select.")

    def _testAccept(self):
        # Delay the connect so the first accept() above finds nothing.
        time.sleep(0.1)
        self.cli.connect((HOST, self.port))

    def testConnect(self):
        # Testing non-blocking connect
        conn, addr = self.serv.accept()
        conn.close()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, addr = self.serv.accept()
        # Close the accepted socket even when an assertion below fails.
        self.addCleanup(conn.close)
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            self.fail("Error trying to do non-blocking recv.")
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            self.assertEqual(msg, MSG)
        else:
            self.fail("Error during select call to non-blocking socket.")

    def _testRecv(self):
        self.cli.connect((HOST, self.port))
        # Delay the send so the first recv() above finds no data.
        time.sleep(0.1)
        self.cli.send(MSG)
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
    """Read/write tests for file objects created with socket.makefile().

    ``self.serv_file`` is a read-mode file made from ``self.cli_conn``
    with ``self.bufsize``; ``self.cli_file`` is a write-mode file made
    from ``self.serv_conn``. Subclasses override ``bufsize`` to repeat
    the tests with other buffering modes.
    """

    bufsize = -1 # Use default buffer size

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)

    def tearDown(self):
        try:
            self.serv_file.close()
            self.assertTrue(self.serv_file.closed)
        finally:
            # Always run the base teardown, even if the checks above fail.
            self.serv_file = None
            SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.cli_file = self.serv_conn.makefile('wb')

    def clientTearDown(self):
        try:
            self.cli_file.close()
            self.assertTrue(self.cli_file.closed)
        finally:
            # Always run the base teardown, even if the checks above fail.
            self.cli_file = None
            SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        # Performing small file read test
        first_seg = self.serv_file.read(len(MSG)-3)
        second_seg = self.serv_file.read(3)
        msg = first_seg + second_seg
        self.assertEqual(msg, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        # read until EOF
        msg = self.serv_file.read()
        self.assertEqual(msg, MSG)

    def _testFullRead(self):
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        # Read one character at a time until read() returns ''.
        buf = ''.join(iter(lambda: self.serv_file.read(1), ''))
        self.assertEqual(buf, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        # Performing file readline test
        line = self.serv_file.readline()
        self.assertEqual(line, MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterRead(self):
        # Mix sized read() calls with readline() on the same stream.
        a_baloo_is = self.serv_file.read(len("A baloo is"))
        self.assertEqual("A baloo is", a_baloo_is)
        _a_bear = self.serv_file.read(len(" a bear"))
        self.assertEqual(" a bear", _a_bear)
        line = self.serv_file.readline()
        self.assertEqual("\n", line)
        line = self.serv_file.readline()
        self.assertEqual("A BALOO IS A BEAR.\n", line)
        line = self.serv_file.readline()
        self.assertEqual(MSG, line)

    def _testReadlineAfterRead(self):
        self.cli_file.write("A baloo is a bear\n")
        self.cli_file.write("A BALOO IS A BEAR.\n")
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadlineAfterReadNoNewline(self):
        # readline() after read() when the data has no trailing newline.
        end_of_ = self.serv_file.read(len("End Of "))
        self.assertEqual("End Of ", end_of_)
        line = self.serv_file.readline()
        self.assertEqual("Line", line)

    def _testReadlineAfterReadNoNewline(self):
        self.cli_file.write("End Of Line")

    def testClosedAttr(self):
        self.assertFalse(self.serv_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.cli_file.closed)
|
2004-03-27 22:20:45 -04:00
|
|
|
|
2009-08-13 15:54:50 -03:00
|
|
|
|
|
|
|
class FileObjectInterruptedTestCase(unittest.TestCase):
|
|
|
|
"""Test that the file object correctly handles EINTR internally."""
|
|
|
|
|
|
|
|
class MockSocket(object):
|
|
|
|
def __init__(self, recv_funcs=()):
|
|
|
|
# A generator that returns callables that we'll call for each
|
|
|
|
# call to recv().
|
|
|
|
self._recv_step = iter(recv_funcs)
|
|
|
|
|
|
|
|
def recv(self, size):
|
|
|
|
return self._recv_step.next()()
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _raise_eintr():
|
|
|
|
raise socket.error(errno.EINTR)
|
|
|
|
|
|
|
|
def _test_readline(self, size=-1, **kwargs):
|
|
|
|
mock_sock = self.MockSocket(recv_funcs=[
|
|
|
|
lambda : "This is the first line\nAnd the sec",
|
|
|
|
self._raise_eintr,
|
|
|
|
lambda : "ond line is here\n",
|
|
|
|
lambda : "",
|
|
|
|
])
|
|
|
|
fo = socket._fileobject(mock_sock, **kwargs)
|
|
|
|
self.assertEquals(fo.readline(size), "This is the first line\n")
|
|
|
|
self.assertEquals(fo.readline(size), "And the second line is here\n")
|
|
|
|
|
|
|
|
def _test_read(self, size=-1, **kwargs):
|
|
|
|
mock_sock = self.MockSocket(recv_funcs=[
|
|
|
|
lambda : "This is the first line\nAnd the sec",
|
|
|
|
self._raise_eintr,
|
|
|
|
lambda : "ond line is here\n",
|
|
|
|
lambda : "",
|
|
|
|
])
|
|
|
|
fo = socket._fileobject(mock_sock, **kwargs)
|
|
|
|
self.assertEquals(fo.read(size), "This is the first line\n"
|
|
|
|
"And the second line is here\n")
|
|
|
|
|
|
|
|
def test_default(self):
|
|
|
|
self._test_readline()
|
|
|
|
self._test_readline(size=100)
|
|
|
|
self._test_read()
|
|
|
|
self._test_read(size=100)
|
|
|
|
|
|
|
|
def test_with_1k_buffer(self):
|
|
|
|
self._test_readline(bufsize=1024)
|
|
|
|
self._test_readline(size=100, bufsize=1024)
|
|
|
|
self._test_read(bufsize=1024)
|
|
|
|
self._test_read(size=100, bufsize=1024)
|
|
|
|
|
|
|
|
def _test_readline_no_buffer(self, size=-1):
|
|
|
|
mock_sock = self.MockSocket(recv_funcs=[
|
|
|
|
lambda : "aa",
|
|
|
|
lambda : "\n",
|
|
|
|
lambda : "BB",
|
|
|
|
self._raise_eintr,
|
|
|
|
lambda : "bb",
|
|
|
|
lambda : "",
|
|
|
|
])
|
|
|
|
fo = socket._fileobject(mock_sock, bufsize=0)
|
|
|
|
self.assertEquals(fo.readline(size), "aa\n")
|
|
|
|
self.assertEquals(fo.readline(size), "BBbb")
|
|
|
|
|
|
|
|
def test_no_buffer(self):
|
|
|
|
self._test_readline_no_buffer()
|
|
|
|
self._test_readline_no_buffer(size=4)
|
|
|
|
self._test_read(bufsize=0)
|
|
|
|
self._test_read(size=100, bufsize=0)
|
|
|
|
|
|
|
|
|
2002-08-07 12:46:19 -03:00
|
|
|
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Run the FileObjectClassTestCase tests again with bufsize==0.

    Only in unbuffered mode should it be possible to make a file
    object, read one line from it, make a second file object and read
    the next line from that one, without data being lost in the first
    file object's buffer.  httplib depends on this when it reads
    several requests from one socket."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first = self.serv_file.readline()
        self.assertEqual(first, "A. " + MSG)
        # Swap in a fresh unbuffered file object over the same connection.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second = self.serv_file.readline()
        self.assertEqual(second, "B. " + MSG)

    def _testUnbufferedReadline(self):
        # Send both tagged lines, then flush once.
        for tag in ("A. ", "B. "):
            self.cli_file.write(tag + MSG)
        self.cli_file.flush()
|
|
|
|
|
2002-08-07 22:00:28 -03:00
|
|
|
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Run the FileObjectClassTestCase tests again with bufsize==1."""

    # Default-buffered for reading; line-buffered for writing
    bufsize = 1
|
|
|
|
|
|
|
|
|
|
|
|
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Run the FileObjectClassTestCase tests again with bufsize==2."""

    # Exercise the buffering code
    bufsize = 2
|
2002-08-07 12:46:19 -03:00
|
|
|
|
2007-01-21 06:35:10 -04:00
|
|
|
|
2007-03-23 15:54:07 -03:00
|
|
|
class NetworkConnectionTest(object):
    """Prove network connection."""
    def clientSetUp(self):
        # This mixin is combined with BasicTCPTest in BasicTCPTest2 below;
        # BasicTCPTest is what provides the self.port used here.
        connection = socket.create_connection((HOST, self.port))
        # Both attributes refer to the same connected socket.
        self.cli = self.serv_conn = connection
|
2007-04-25 03:30:05 -03:00
|
|
|
|
2007-03-23 15:54:07 -03:00
|
|
|
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Check that NetworkConnection leaves existing TCP behaviour intact.

    Combines NetworkConnectionTest with the BasicTCPTest tests.
    """
|
|
|
|
|
2007-03-24 22:53:21 -03:00
|
|
|
class NetworkConnectionNoServer(unittest.TestCase):
|
2010-09-07 18:40:25 -03:00
|
|
|
class MockSocket(socket.socket):
|
|
|
|
def connect(self, *args):
|
|
|
|
raise socket.timeout('timed out')
|
|
|
|
|
|
|
|
@contextlib.contextmanager
|
|
|
|
def mocked_socket_module(self):
|
|
|
|
"""Return a socket which times out on connect"""
|
|
|
|
old_socket = socket.socket
|
|
|
|
socket.socket = self.MockSocket
|
|
|
|
try:
|
|
|
|
yield
|
|
|
|
finally:
|
|
|
|
socket.socket = old_socket
|
|
|
|
|
|
|
|
def test_connect(self):
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to following the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
port = test_support.find_unused_port()
|
2010-09-07 18:40:25 -03:00
|
|
|
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
with self.assertRaises(socket.error) as cm:
|
|
|
|
cli.connect((HOST, port))
|
|
|
|
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
|
|
|
|
|
|
|
|
def test_create_connection(self):
|
|
|
|
# Issue #9792: errors raised by create_connection() should have
|
|
|
|
# a proper errno attribute.
|
|
|
|
port = test_support.find_unused_port()
|
|
|
|
with self.assertRaises(socket.error) as cm:
|
|
|
|
socket.create_connection((HOST, port))
|
|
|
|
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
|
|
|
|
|
|
|
|
def test_create_connection_timeout(self):
|
|
|
|
# Issue #9792: create_connection() should not recast timeout errors
|
|
|
|
# as generic socket errors.
|
|
|
|
with self.mocked_socket_module():
|
|
|
|
with self.assertRaises(socket.timeout):
|
|
|
|
socket.create_connection((HOST, 1234))
|
|
|
|
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
2007-03-24 22:53:21 -03:00
|
|
|
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2007-03-24 22:53:21 -03:00
|
|
|
def __init__(self, methodName='runTest'):
|
|
|
|
SocketTCPTest.__init__(self, methodName=methodName)
|
|
|
|
ThreadableTest.__init__(self)
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2007-03-24 22:53:21 -03:00
|
|
|
def clientSetUp(self):
|
2010-01-02 21:29:44 -04:00
|
|
|
self.source_port = test_support.find_unused_port()
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2007-03-24 22:53:21 -03:00
|
|
|
def clientTearDown(self):
|
|
|
|
self.cli.close()
|
|
|
|
self.cli = None
|
|
|
|
ThreadableTest.clientTearDown(self)
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2007-03-24 22:53:21 -03:00
|
|
|
def _justAccept(self):
|
|
|
|
conn, addr = self.serv.accept()
|
|
|
|
|
|
|
|
testFamily = _justAccept
|
|
|
|
def _testFamily(self):
|
- Issue #2550: The approach used by client/server code for obtaining ports
to listen on in network-oriented tests has been refined in an effort to
facilitate running multiple instances of the entire regression test suite
in parallel without issue. test_support.bind_port() has been fixed such
that it will always return a unique port -- which wasn't always the case
with the previous implementation, especially if socket options had been
set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
new implementation of bind_port() will actually raise an exception if it
is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
This currently only applies to Windows. This option prevents any other
sockets from binding to the host/port we've bound to, thus removing the
possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
that occurs when a second SOCK_STREAM socket binds and accepts to a
host/port that's already been bound by another socket. The optional
preferred port parameter to bind_port() has been removed. Under no
circumstances should tests be hard coding ports!
test_support.find_unused_port() has also been introduced, which will pass
a temporary socket object to bind_port() in order to obtain an unused port.
The temporary socket object is then closed and deleted, and the port is
returned. This method should only be used for obtaining an unused port
in order to pass to an external program (i.e. the -accept [port] argument
to openssl's s_server mode) or as a parameter to a server-oriented class
that doesn't give you direct access to the underlying socket used.
Finally, test_support.HOST has been introduced, which should be used for
the host argument of any relevant socket calls (i.e. bind and connect).
The following tests were updated to follow the new conventions:
test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
test_poplib, test_ftplib, test_telnetlib, test_socketserver,
test_asynchat and test_socket_ssl.
It is now possible for multiple instances of the regression test suite to
run in parallel without issue.
2008-04-08 20:47:30 -03:00
|
|
|
self.cli = socket.create_connection((HOST, self.port), timeout=30)
|
2007-03-24 22:53:21 -03:00
|
|
|
self.assertEqual(self.cli.family, 2)
|
2007-03-23 15:54:07 -03:00
|
|
|
|
2010-01-03 11:05:52 -04:00
|
|
|
testSourceAddress = _justAccept
|
|
|
|
def _testSourceAddress(self):
|
2010-01-02 21:29:44 -04:00
|
|
|
self.cli = socket.create_connection((HOST, self.port), timeout=30,
|
|
|
|
source_address=('', self.source_port))
|
|
|
|
self.assertEqual(self.cli.getsockname()[1], self.source_port)
|
2010-01-03 11:05:52 -04:00
|
|
|
# The port number being used is sufficient to show that the bind()
|
|
|
|
# call happened.
|
2010-01-02 21:29:44 -04:00
|
|
|
|
2007-04-25 03:30:05 -03:00
|
|
|
testTimeoutDefault = _justAccept

def _testTimeoutDefault(self):
    # passing no explicit timeout uses socket's global default
    self.assertTrue(socket.getdefaulttimeout() is None)
    socket.setdefaulttimeout(42)
    try:
        self.cli = socket.create_connection((HOST, self.port))
    finally:
        # Restore the module-wide default even if the connection attempt
        # fails, so the temporary value of 42 never outlives this test.
        socket.setdefaulttimeout(None)
    # assertEqual (not the deprecated assertEquals alias), matching the
    # spelling used by the sibling tests.
    self.assertEqual(self.cli.gettimeout(), 42)
|
|
|
|
|
|
|
|
testTimeoutNone = _justAccept

def _testTimeoutNone(self):
    # None timeout means the same as sock.settimeout(None)
    self.assertTrue(socket.getdefaulttimeout() is None)
    server_address = (HOST, self.port)
    # Install a non-None global default so that an explicit timeout=None
    # can be told apart from "use the default".
    socket.setdefaulttimeout(30)
    try:
        self.cli = socket.create_connection(server_address, timeout=None)
    finally:
        socket.setdefaulttimeout(None)
    effective_timeout = self.cli.gettimeout()
    self.assertEqual(effective_timeout, None)
|
2007-04-25 03:30:05 -03:00
|
|
|
|
|
|
|
testTimeoutValueNamed = _justAccept

def _testTimeoutValueNamed(self):
    # The timeout is passed by keyword and must be reported back by the
    # resulting socket.
    server_address = (HOST, self.port)
    self.cli = socket.create_connection(server_address, timeout=30)
    effective_timeout = self.cli.gettimeout()
    self.assertEqual(effective_timeout, 30)
|
|
|
|
|
2007-04-25 03:30:05 -03:00
|
|
|
testTimeoutValueNonamed = _justAccept

def _testTimeoutValueNonamed(self):
    # Same check as the keyword variant, but the timeout is passed as the
    # second positional argument of create_connection().
    server_address = (HOST, self.port)
    self.cli = socket.create_connection(server_address, 30)
    effective_timeout = self.cli.gettimeout()
    self.assertEqual(effective_timeout, 30)
|
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    """Behaviour of socket.create_connection() against a slow server.

    The server-side method accepts a connection on self.serv, sleeps for
    three seconds and then sends "done!".  One client variant connects
    without a timeout and expects the data; the other connects with a one
    second timeout and expects socket.timeout from recv().
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        # Nothing to prepare: each client method creates self.cli itself.
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        conn, addr = self.serv.accept()
        try:
            time.sleep(3)
            conn.send("done!")
        finally:
            # Close the accepted connection explicitly instead of leaving
            # it to garbage collection.
            conn.close()
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        # No timeout given: recv() waits until the delayed reply arrives.
        self.cli = sock = socket.create_connection((HOST, self.port))
        data = sock.recv(5)
        self.assertEqual(data, "done!")

    def _testOutsideTimeout(self):
        # One second timeout is shorter than the server's three second
        # delay, so recv() has to raise socket.timeout.
        self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
        self.assertRaises(socket.timeout, lambda: sock.recv(5))
|
2007-03-23 15:54:07 -03:00
|
|
|
|
|
|
|
|
2007-01-21 06:35:10 -04:00
|
|
|
class Urllib2FileobjectTest(unittest.TestCase):

    # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
    # it close the socket if the close c'tor argument is true

    def testClose(self):
        class MockSocket:
            # Minimal stand-in that only records whether close() was called.
            closed = False

            def flush(self):
                pass

            def close(self):
                self.closed = True

        # must not close unless we request it: the original use of _fileobject
        # by module socket requires that the underlying socket not be closed until
        # the _socketobject that created the _fileobject is closed
        kept_open = MockSocket()
        wrapper = socket._fileobject(kept_open)
        wrapper.close()
        self.assertTrue(not kept_open.closed)

        # With close=True the wrapper must close the underlying socket too.
        closed_too = MockSocket()
        wrapper = socket._fileobject(closed_too, close=True)
        wrapper.close()
        self.assertTrue(closed_too.closed)
|
2007-01-21 06:35:10 -04:00
|
|
|
|
2003-06-29 01:40:22 -03:00
|
|
|
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the listening socket self.serv."""

    def testTCPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()
        # NOTE: the trailing string is forwarded by assertRaises() to
        # raise_timeout(), which ignores it; it is not a failure message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (TCP)")

    def testTimeoutZero(self):
        # With a zero timeout, accept() on a socket with no pending client
        # must fail with a plain socket.error rather than socket.timeout.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except socket.timeout:
            # Must be tested before socket.error, which also matches it.
            self.fail("caught timeout instead of error (TCP)")
        except socket.error:
            ok = True
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not turned into test failures.
            self.fail("caught unexpected exception (TCP)")
        if not ok:
            self.fail("accept() returned success when we did not expect it")

    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        if not hasattr(signal, "alarm"):
            # Report a skip instead of silently passing.
            self.skipTest("signal.alarm() is required for this test")
        self.serv.settimeout(5.0)   # must be longer than alarm

        class Alarm(Exception):
            pass

        def alarm_handler(signum, frame):
            raise Alarm

        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
            try:
                self.serv.accept()
            except socket.timeout:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
|
|
|
|
|
2003-06-29 01:40:22 -03:00
|
|
|
# NOTE(review): the base class was SocketTCPTest, which made these "UDP"
# tests call recv() on a listening TCP socket.  SocketUDPTest is assumed to
# be the UDP fixture defined earlier in this module (it is not visible from
# this part of the file) -- confirm before merging.
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on the server socket self.serv."""

    def testUDPTimeout(self):
        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)
        # NOTE: the trailing string is forwarded by assertRaises() to
        # raise_timeout(), which ignores it; it is not a failure message.
        self.assertRaises(socket.timeout, raise_timeout,
                          "Error generating a timeout exception (UDP)")

    def testTimeoutZero(self):
        # With a zero timeout, recv() with nothing to read must fail with a
        # plain socket.error rather than socket.timeout.
        ok = False
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except socket.timeout:
            # Must be tested before socket.error, which also matches it.
            self.fail("caught timeout instead of error (UDP)")
        except socket.error:
            ok = True
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not turned into test failures.
            self.fail("caught unexpected exception (UDP)")
        if not ok:
            self.fail("recv() returned success when we did not expect it")
|
|
|
|
|
|
|
|
class TestExceptions(unittest.TestCase):
    """Checks the inheritance relationships of the socket exceptions."""

    def testExceptionTree(self):
        # socket.error is the root of the family and is itself an Exception.
        self.assertTrue(issubclass(socket.error, Exception))
        # Every more specific socket exception derives from socket.error.
        for derived in (socket.herror, socket.gaierror, socket.timeout):
            self.assertTrue(issubclass(derived, socket.error))
|
2003-06-29 01:40:22 -03:00
|
|
|
|
2006-04-19 08:50:27 -03:00
|
|
|
class TestLinuxAbstractNamespace(unittest.TestCase):
|
|
|
|
|
|
|
|
UNIX_PATH_MAX = 108
|
|
|
|
|
|
|
|
def testLinuxAbstractNamespace(self):
|
|
|
|
address = "\x00python-test-hello\x00\xff"
|
|
|
|
s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
s1.bind(address)
|
|
|
|
s1.listen(1)
|
|
|
|
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
s2.connect(s1.getsockname())
|
|
|
|
s1.accept()
|
|
|
|
self.assertEqual(s1.getsockname(), address)
|
|
|
|
self.assertEqual(s2.getpeername(), address)
|
|
|
|
|
|
|
|
def testMaxName(self):
|
|
|
|
address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
|
|
|
|
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
s.bind(address)
|
|
|
|
self.assertEqual(s.getsockname(), address)
|
|
|
|
|
|
|
|
def testNameOverflow(self):
|
|
|
|
address = "\x00" + "h" * self.UNIX_PATH_MAX
|
|
|
|
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
|
self.assertRaises(socket.error, s.bind, address)
|
|
|
|
|
2003-06-29 01:40:22 -03:00
|
|
|
|
2010-04-27 20:55:59 -03:00
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
    """
    Exercise recv_into() and recvfrom_into() with array, bytearray and
    memoryview targets, and send() with a buffer object.
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvIntoArray(self):
        # Receive into a 1024-character array and compare the filled prefix.
        target = array.array('c', ' ' * 1024)
        count = self.cli_conn.recv_into(target)
        expected_len = len(MSG)
        self.assertEqual(count, expected_len)
        received = target.tostring()[:expected_len]
        self.assertEqual(received, MSG)

    def _testRecvIntoArray(self):
        # The buffer object is built inside check_py3k_warnings().
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        count = self.cli_conn.recv_into(target)
        expected_len = len(MSG)
        self.assertEqual(count, expected_len)
        received = target[:expected_len]
        self.assertEqual(received, MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        # recv_into() gets a memoryview; the data is read back through the
        # bytearray that backs it.
        target = bytearray(1024)
        view = memoryview(target)
        count = self.cli_conn.recv_into(view)
        expected_len = len(MSG)
        self.assertEqual(count, expected_len)
        received = target[:expected_len]
        self.assertEqual(received, MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    def testRecvFromIntoArray(self):
        target = array.array('c', ' ' * 1024)
        count, addr = self.cli_conn.recvfrom_into(target)
        expected_len = len(MSG)
        self.assertEqual(count, expected_len)
        received = target.tostring()[:expected_len]
        self.assertEqual(received, MSG)

    def _testRecvFromIntoArray(self):
        # The buffer object is built inside check_py3k_warnings().
        with test_support.check_py3k_warnings():
            payload = buffer(MSG)
        self.serv_conn.send(payload)

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        count, addr = self.cli_conn.recvfrom_into(target)
        expected_len = len(MSG)
        self.assertEqual(count, expected_len)
        received = target[:expected_len]
        self.assertEqual(received, MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        # recvfrom_into() gets a memoryview; the data is read back through
        # the bytearray that backs it.
        target = bytearray(1024)
        view = memoryview(target)
        count, addr = self.cli_conn.recvfrom_into(view)
        expected_len = len(MSG)
        self.assertEqual(count, expected_len)
        received = target[:expected_len]
        self.assertEqual(received, MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray
|
|
|
|
|
2008-01-07 12:12:44 -04:00
|
|
|
|
|
|
|
# Second element of every TIPC address tuple built by the TIPC tests
# (presumably the TIPC service type -- confirm against the TIPC docs).
TIPC_STYPE = 2000
|
|
|
|
# Lower end of the range the TIPC test servers bind to with
# TIPC_ADDR_NAMESEQ.
TIPC_LOWER = 200
|
|
|
|
# Upper end of that range; clients address the midpoint of LOWER..UPPER.
TIPC_UPPER = 210
|
|
|
|
|
|
|
|
def isTipcAvailable():
    """Report whether the TIPC tests can run on this machine.

    Returns True only when the socket module defines AF_TIPC and
    /proc/modules contains a line starting with "tipc ".  The TIPC module
    is not loaded automatically on Ubuntu and probably other Linux
    distros, so a hint is printed in verbose mode when it is missing.
    """
    modules_path = "/proc/modules"
    if not (hasattr(socket, "AF_TIPC") and os.path.isfile(modules_path)):
        return False
    with open(modules_path) as modules:
        # any() stops at the first matching line, like an early return.
        loaded = any(entry.startswith("tipc ") for entry in modules)
    if loaded:
        return True
    if test_support.verbose:
        # Parenthesised single argument: same output as the print statement.
        print("TIPC module is not loaded, please 'sudo modprobe tipc'")
    return False
|
|
|
|
|
|
|
|
class TIPCTest (unittest.TestCase):
    """Single-threaded round trip over two AF_TIPC SOCK_RDM sockets."""

    def testRDM(self):
        # Register each socket for cleanup as soon as it exists so neither
        # is leaked if a later step fails.
        srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(srv.close)
        cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        self.addCleanup(cli.close)

        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                   TIPC_LOWER, TIPC_UPPER)
        srv.bind(srvaddr)

        # Address the midpoint of the bound range; floor division keeps it
        # an integer.
        sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                    TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        cli.sendto(MSG, sendaddr)

        msg, recvaddr = srv.recvfrom(1024)

        self.assertEqual(cli.getsockname(), recvaddr)
        self.assertEqual(msg, MSG)
|
|
|
|
|
|
|
|
|
|
|
|
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
    """Stream round trip over AF_TIPC using the ThreadableTest machinery.

    setUp() creates the listening socket and accepts one connection;
    clientSetUp() connects to it.  Both server-side sockets are registered
    with addCleanup() so they are closed after every test.
    """

    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                   TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen(5)
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        # Address the midpoint of the bound range; floor division keeps it
        # an integer.
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) // 2, 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        # Receives on the accepted connection and checks the peer address.
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        # Sends the message from the client socket, then closes it.
        self.cli.send(MSG)
        self.cli.close()
|
|
|
|
|
|
|
|
|
2002-07-31 13:08:40 -03:00
|
|
|
def test_main():
    """Assemble the socket test cases and run them via test_support.

    Platform-dependent cases are appended only when their prerequisite is
    present: socket.socketpair, sys.platform == 'linux2', or a loaded
    TIPC module.
    """
    tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
             TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
             UDPTimeoutTest ]

    tests.extend([
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        FileObjectInterruptedTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
        Urllib2FileobjectTest,
        NetworkConnectionNoServer,
        NetworkConnectionAttributesTest,
        NetworkConnectionBehaviourTest,
    ])
    if hasattr(socket, "socketpair"):
        tests.append(BasicSocketPairTest)
    if sys.platform == 'linux2':
        tests.append(TestLinuxAbstractNamespace)
    if isTipcAvailable():
        tests.append(TIPCTest)
        tests.append(TIPCThreadableTest)

    thread_info = test_support.threading_setup()
    try:
        test_support.run_unittest(*tests)
    finally:
        # Run the thread cleanup even when run_unittest() raises, so the
        # state captured by threading_setup() is always handed back.
        test_support.threading_cleanup(*thread_info)
|
2002-06-12 16:18:08 -03:00
|
|
|
|
|
|
|
# Run the whole suite through test_main() when this file is executed
# directly as a script.
if __name__ == "__main__":
|
2002-07-31 13:08:40 -03:00
|
|
|
test_main()
|