mirror of https://github.com/python/cpython
1628 lines
55 KiB
Python
1628 lines
55 KiB
Python
#!/usr/bin/env python
|
|
|
|
import unittest
|
|
from test import test_support
|
|
|
|
import errno
|
|
import socket
|
|
import select
|
|
import time
|
|
import traceback
|
|
import Queue
|
|
import sys
|
|
import os
|
|
import array
|
|
import contextlib
|
|
from weakref import proxy
|
|
import signal
|
|
import math
|
|
|
|
def try_address(host, port=0, family=socket.AF_INET):
|
|
"""Try to bind a socket on the given host:port and return True
|
|
if that has been possible."""
|
|
try:
|
|
sock = socket.socket(family, socket.SOCK_STREAM)
|
|
sock.bind((host, port))
|
|
except (socket.error, socket.gaierror):
|
|
return False
|
|
else:
|
|
sock.close()
|
|
return True
|
|
|
|
HOST = test_support.HOST
|
|
MSG = b'Michael Gilfix was here\n'
|
|
SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
|
|
|
|
try:
|
|
import thread
|
|
import threading
|
|
except ImportError:
|
|
thread = None
|
|
threading = None
|
|
|
|
HOST = test_support.HOST
|
|
MSG = 'Michael Gilfix was here\n'
|
|
|
|
class SocketTCPTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.port = test_support.bind_port(self.serv)
|
|
self.serv.listen(1)
|
|
|
|
def tearDown(self):
|
|
self.serv.close()
|
|
self.serv = None
|
|
|
|
class SocketUDPTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.port = test_support.bind_port(self.serv)
|
|
|
|
def tearDown(self):
|
|
self.serv.close()
|
|
self.serv = None
|
|
|
|
class ThreadableTest:
|
|
"""Threadable Test class
|
|
|
|
The ThreadableTest class makes it easy to create a threaded
|
|
client/server pair from an existing unit test. To create a
|
|
new threaded class from an existing unit test, use multiple
|
|
inheritance:
|
|
|
|
class NewClass (OldClass, ThreadableTest):
|
|
pass
|
|
|
|
This class defines two new fixture functions with obvious
|
|
purposes for overriding:
|
|
|
|
clientSetUp ()
|
|
clientTearDown ()
|
|
|
|
Any new test functions within the class must then define
|
|
tests in pairs, where the test name is preceeded with a
|
|
'_' to indicate the client portion of the test. Ex:
|
|
|
|
def testFoo(self):
|
|
# Server portion
|
|
|
|
def _testFoo(self):
|
|
# Client portion
|
|
|
|
Any exceptions raised by the clients during their tests
|
|
are caught and transferred to the main thread to alert
|
|
the testing framework.
|
|
|
|
Note, the server setup function cannot call any blocking
|
|
functions that rely on the client thread during setup,
|
|
unless serverExplicitReady() is called just before
|
|
the blocking call (such as in setting up a client/server
|
|
connection and performing the accept() in setUp().
|
|
"""
|
|
|
|
def __init__(self):
|
|
# Swap the true setup function
|
|
self.__setUp = self.setUp
|
|
self.__tearDown = self.tearDown
|
|
self.setUp = self._setUp
|
|
self.tearDown = self._tearDown
|
|
|
|
def serverExplicitReady(self):
|
|
"""This method allows the server to explicitly indicate that
|
|
it wants the client thread to proceed. This is useful if the
|
|
server is about to execute a blocking routine that is
|
|
dependent upon the client thread during its setup routine."""
|
|
self.server_ready.set()
|
|
|
|
def _setUp(self):
|
|
self.server_ready = threading.Event()
|
|
self.client_ready = threading.Event()
|
|
self.done = threading.Event()
|
|
self.queue = Queue.Queue(1)
|
|
|
|
# Do some munging to start the client test.
|
|
methodname = self.id()
|
|
i = methodname.rfind('.')
|
|
methodname = methodname[i+1:]
|
|
test_method = getattr(self, '_' + methodname)
|
|
self.client_thread = thread.start_new_thread(
|
|
self.clientRun, (test_method,))
|
|
|
|
self.__setUp()
|
|
if not self.server_ready.is_set():
|
|
self.server_ready.set()
|
|
self.client_ready.wait()
|
|
|
|
def _tearDown(self):
|
|
self.__tearDown()
|
|
self.done.wait()
|
|
|
|
if not self.queue.empty():
|
|
msg = self.queue.get()
|
|
self.fail(msg)
|
|
|
|
def clientRun(self, test_func):
|
|
self.server_ready.wait()
|
|
self.clientSetUp()
|
|
self.client_ready.set()
|
|
if not callable(test_func):
|
|
raise TypeError("test_func must be a callable function.")
|
|
try:
|
|
test_func()
|
|
except Exception, strerror:
|
|
self.queue.put(strerror)
|
|
self.clientTearDown()
|
|
|
|
def clientSetUp(self):
|
|
raise NotImplementedError("clientSetUp must be implemented.")
|
|
|
|
def clientTearDown(self):
|
|
self.done.set()
|
|
thread.exit()
|
|
|
|
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketTCPTest.__init__(self, methodName=methodName)
|
|
ThreadableTest.__init__(self)
|
|
|
|
def clientSetUp(self):
|
|
self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
def clientTearDown(self):
|
|
self.cli.close()
|
|
self.cli = None
|
|
ThreadableTest.clientTearDown(self)
|
|
|
|
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketUDPTest.__init__(self, methodName=methodName)
|
|
ThreadableTest.__init__(self)
|
|
|
|
def clientSetUp(self):
|
|
self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
def clientTearDown(self):
|
|
self.cli.close()
|
|
self.cli = None
|
|
ThreadableTest.clientTearDown(self)
|
|
|
|
class SocketConnectedTest(ThreadedTCPSocketTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
|
|
|
|
def setUp(self):
|
|
ThreadedTCPSocketTest.setUp(self)
|
|
# Indicate explicitly we're ready for the client thread to
|
|
# proceed and then perform the blocking call to accept
|
|
self.serverExplicitReady()
|
|
conn, addr = self.serv.accept()
|
|
self.cli_conn = conn
|
|
|
|
def tearDown(self):
|
|
self.cli_conn.close()
|
|
self.cli_conn = None
|
|
ThreadedTCPSocketTest.tearDown(self)
|
|
|
|
def clientSetUp(self):
|
|
ThreadedTCPSocketTest.clientSetUp(self)
|
|
self.cli.connect((HOST, self.port))
|
|
self.serv_conn = self.cli
|
|
|
|
def clientTearDown(self):
|
|
self.serv_conn.close()
|
|
self.serv_conn = None
|
|
ThreadedTCPSocketTest.clientTearDown(self)
|
|
|
|
class SocketPairTest(unittest.TestCase, ThreadableTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
unittest.TestCase.__init__(self, methodName=methodName)
|
|
ThreadableTest.__init__(self)
|
|
|
|
def setUp(self):
|
|
self.serv, self.cli = socket.socketpair()
|
|
|
|
def tearDown(self):
|
|
self.serv.close()
|
|
self.serv = None
|
|
|
|
def clientSetUp(self):
|
|
pass
|
|
|
|
def clientTearDown(self):
|
|
self.cli.close()
|
|
self.cli = None
|
|
ThreadableTest.clientTearDown(self)
|
|
|
|
|
|
#######################################################################
|
|
## Begin Tests
|
|
|
|
class GeneralModuleTests(unittest.TestCase):
|
|
|
|
def test_weakref(self):
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
p = proxy(s)
|
|
self.assertEqual(p.fileno(), s.fileno())
|
|
s.close()
|
|
s = None
|
|
try:
|
|
p.fileno()
|
|
except ReferenceError:
|
|
pass
|
|
else:
|
|
self.fail('Socket proxy still exists')
|
|
|
|
def testSocketError(self):
|
|
# Testing socket module exceptions
|
|
def raise_error(*args, **kwargs):
|
|
raise socket.error
|
|
def raise_herror(*args, **kwargs):
|
|
raise socket.herror
|
|
def raise_gaierror(*args, **kwargs):
|
|
raise socket.gaierror
|
|
self.assertRaises(socket.error, raise_error,
|
|
"Error raising socket exception.")
|
|
self.assertRaises(socket.error, raise_herror,
|
|
"Error raising socket exception.")
|
|
self.assertRaises(socket.error, raise_gaierror,
|
|
"Error raising socket exception.")
|
|
|
|
def testSendtoErrors(self):
|
|
# Testing that sendto doens't masks failures. See #10169.
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.addCleanup(s.close)
|
|
s.bind(('', 0))
|
|
sockname = s.getsockname()
|
|
# 2 args
|
|
with self.assertRaises(UnicodeEncodeError):
|
|
s.sendto(u'\u2620', sockname)
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto(5j, sockname)
|
|
self.assertIn('not complex', str(cm.exception))
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto('foo', None)
|
|
self.assertIn('not NoneType', str(cm.exception))
|
|
# 3 args
|
|
with self.assertRaises(UnicodeEncodeError):
|
|
s.sendto(u'\u2620', 0, sockname)
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto(5j, 0, sockname)
|
|
self.assertIn('not complex', str(cm.exception))
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto('foo', 0, None)
|
|
self.assertIn('not NoneType', str(cm.exception))
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto('foo', 'bar', sockname)
|
|
self.assertIn('an integer is required', str(cm.exception))
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto('foo', None, None)
|
|
self.assertIn('an integer is required', str(cm.exception))
|
|
# wrong number of args
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto('foo')
|
|
self.assertIn('(1 given)', str(cm.exception))
|
|
with self.assertRaises(TypeError) as cm:
|
|
s.sendto('foo', 0, sockname, 4)
|
|
self.assertIn('(4 given)', str(cm.exception))
|
|
|
|
|
|
def testCrucialConstants(self):
|
|
# Testing for mission critical constants
|
|
socket.AF_INET
|
|
socket.SOCK_STREAM
|
|
socket.SOCK_DGRAM
|
|
socket.SOCK_RAW
|
|
socket.SOCK_RDM
|
|
socket.SOCK_SEQPACKET
|
|
socket.SOL_SOCKET
|
|
socket.SO_REUSEADDR
|
|
|
|
def testHostnameRes(self):
|
|
# Testing hostname resolution mechanisms
|
|
hostname = socket.gethostname()
|
|
try:
|
|
ip = socket.gethostbyname(hostname)
|
|
except socket.error:
|
|
# Probably name lookup wasn't set up right; skip this test
|
|
return
|
|
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
|
|
try:
|
|
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
|
|
except socket.error:
|
|
# Probably a similar problem as above; skip this test
|
|
return
|
|
all_host_names = [hostname, hname] + aliases
|
|
fqhn = socket.getfqdn(ip)
|
|
if not fqhn in all_host_names:
|
|
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
|
|
|
|
def testRefCountGetNameInfo(self):
|
|
# Testing reference count for getnameinfo
|
|
if hasattr(sys, "getrefcount"):
|
|
try:
|
|
# On some versions, this loses a reference
|
|
orig = sys.getrefcount(__name__)
|
|
socket.getnameinfo(__name__,0)
|
|
except TypeError:
|
|
self.assertEqual(sys.getrefcount(__name__), orig,
|
|
"socket.getnameinfo loses a reference")
|
|
|
|
def testInterpreterCrash(self):
|
|
# Making sure getnameinfo doesn't crash the interpreter
|
|
try:
|
|
# On some versions, this crashes the interpreter.
|
|
socket.getnameinfo(('x', 0, 0, 0), 0)
|
|
except socket.error:
|
|
pass
|
|
|
|
def testNtoH(self):
|
|
# This just checks that htons etc. are their own inverse,
|
|
# when looking at the lower 16 or 32 bits.
|
|
sizes = {socket.htonl: 32, socket.ntohl: 32,
|
|
socket.htons: 16, socket.ntohs: 16}
|
|
for func, size in sizes.items():
|
|
mask = (1L<<size) - 1
|
|
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
|
|
self.assertEqual(i & mask, func(func(i&mask)) & mask)
|
|
|
|
swapped = func(mask)
|
|
self.assertEqual(swapped & mask, mask)
|
|
self.assertRaises(OverflowError, func, 1L<<34)
|
|
|
|
def testNtoHErrors(self):
|
|
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
|
|
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
|
|
for k in good_values:
|
|
socket.ntohl(k)
|
|
socket.ntohs(k)
|
|
socket.htonl(k)
|
|
socket.htons(k)
|
|
for k in bad_values:
|
|
self.assertRaises(OverflowError, socket.ntohl, k)
|
|
self.assertRaises(OverflowError, socket.ntohs, k)
|
|
self.assertRaises(OverflowError, socket.htonl, k)
|
|
self.assertRaises(OverflowError, socket.htons, k)
|
|
|
|
def testGetServBy(self):
|
|
eq = self.assertEqual
|
|
# Find one service that exists, then check all the related interfaces.
|
|
# I've ordered this by protocols that have both a tcp and udp
|
|
# protocol, at least for modern Linuxes.
|
|
if (sys.platform.startswith('linux') or
|
|
sys.platform.startswith('freebsd') or
|
|
sys.platform.startswith('netbsd') or
|
|
sys.platform == 'darwin'):
|
|
# avoid the 'echo' service on this platform, as there is an
|
|
# assumption breaking non-standard port/protocol entry
|
|
services = ('daytime', 'qotd', 'domain')
|
|
else:
|
|
services = ('echo', 'daytime', 'domain')
|
|
for service in services:
|
|
try:
|
|
port = socket.getservbyname(service, 'tcp')
|
|
break
|
|
except socket.error:
|
|
pass
|
|
else:
|
|
raise socket.error
|
|
# Try same call with optional protocol omitted
|
|
port2 = socket.getservbyname(service)
|
|
eq(port, port2)
|
|
# Try udp, but don't barf it it doesn't exist
|
|
try:
|
|
udpport = socket.getservbyname(service, 'udp')
|
|
except socket.error:
|
|
udpport = None
|
|
else:
|
|
eq(udpport, port)
|
|
# Now make sure the lookup by port returns the same service name
|
|
eq(socket.getservbyport(port2), service)
|
|
eq(socket.getservbyport(port, 'tcp'), service)
|
|
if udpport is not None:
|
|
eq(socket.getservbyport(udpport, 'udp'), service)
|
|
# Make sure getservbyport does not accept out of range ports.
|
|
self.assertRaises(OverflowError, socket.getservbyport, -1)
|
|
self.assertRaises(OverflowError, socket.getservbyport, 65536)
|
|
|
|
def testDefaultTimeout(self):
|
|
# Testing default timeout
|
|
# The default timeout should initially be None
|
|
self.assertEqual(socket.getdefaulttimeout(), None)
|
|
s = socket.socket()
|
|
self.assertEqual(s.gettimeout(), None)
|
|
s.close()
|
|
|
|
# Set the default timeout to 10, and see if it propagates
|
|
socket.setdefaulttimeout(10)
|
|
self.assertEqual(socket.getdefaulttimeout(), 10)
|
|
s = socket.socket()
|
|
self.assertEqual(s.gettimeout(), 10)
|
|
s.close()
|
|
|
|
# Reset the default timeout to None, and see if it propagates
|
|
socket.setdefaulttimeout(None)
|
|
self.assertEqual(socket.getdefaulttimeout(), None)
|
|
s = socket.socket()
|
|
self.assertEqual(s.gettimeout(), None)
|
|
s.close()
|
|
|
|
# Check that setting it to an invalid value raises ValueError
|
|
self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
|
|
|
|
# Check that setting it to an invalid type raises TypeError
|
|
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
|
|
|
|
def testIPv4_inet_aton_fourbytes(self):
|
|
if not hasattr(socket, 'inet_aton'):
|
|
return # No inet_aton, nothing to check
|
|
# Test that issue1008086 and issue767150 are fixed.
|
|
# It must return 4 bytes.
|
|
self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
|
|
self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
|
|
|
|
def testIPv4toString(self):
|
|
if not hasattr(socket, 'inet_pton'):
|
|
return # No inet_pton() on this platform
|
|
from socket import inet_aton as f, inet_pton, AF_INET
|
|
g = lambda a: inet_pton(AF_INET, a)
|
|
|
|
self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))
|
|
self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))
|
|
self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
|
|
self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))
|
|
self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))
|
|
|
|
self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))
|
|
self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))
|
|
self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
|
|
self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
|
|
|
|
def testIPv6toString(self):
|
|
if not hasattr(socket, 'inet_pton'):
|
|
return # No inet_pton() on this platform
|
|
try:
|
|
from socket import inet_pton, AF_INET6, has_ipv6
|
|
if not has_ipv6:
|
|
return
|
|
except ImportError:
|
|
return
|
|
f = lambda a: inet_pton(AF_INET6, a)
|
|
|
|
self.assertEqual('\x00' * 16, f('::'))
|
|
self.assertEqual('\x00' * 16, f('0::0'))
|
|
self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
|
|
self.assertEqual(
|
|
'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
|
|
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
|
|
)
|
|
|
|
def testStringToIPv4(self):
|
|
if not hasattr(socket, 'inet_ntop'):
|
|
return # No inet_ntop() on this platform
|
|
from socket import inet_ntoa as f, inet_ntop, AF_INET
|
|
g = lambda a: inet_ntop(AF_INET, a)
|
|
|
|
self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))
|
|
self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))
|
|
self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))
|
|
self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))
|
|
|
|
self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))
|
|
self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
|
|
self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
|
|
|
|
def testStringToIPv6(self):
|
|
if not hasattr(socket, 'inet_ntop'):
|
|
return # No inet_ntop() on this platform
|
|
try:
|
|
from socket import inet_ntop, AF_INET6, has_ipv6
|
|
if not has_ipv6:
|
|
return
|
|
except ImportError:
|
|
return
|
|
f = lambda a: inet_ntop(AF_INET6, a)
|
|
|
|
self.assertEqual('::', f('\x00' * 16))
|
|
self.assertEqual('::1', f('\x00' * 15 + '\x01'))
|
|
self.assertEqual(
|
|
'aef:b01:506:1001:ffff:9997:55:170',
|
|
f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
|
|
)
|
|
|
|
# XXX The following don't test module-level functionality...
|
|
|
|
def _get_unused_port(self, bind_address='0.0.0.0'):
|
|
"""Use a temporary socket to elicit an unused ephemeral port.
|
|
|
|
Args:
|
|
bind_address: Hostname or IP address to search for a port on.
|
|
|
|
Returns: A most likely to be unused port.
|
|
"""
|
|
tempsock = socket.socket()
|
|
tempsock.bind((bind_address, 0))
|
|
host, port = tempsock.getsockname()
|
|
tempsock.close()
|
|
return port
|
|
|
|
def testSockName(self):
|
|
# Testing getsockname()
|
|
port = self._get_unused_port()
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.addCleanup(sock.close)
|
|
sock.bind(("0.0.0.0", port))
|
|
name = sock.getsockname()
|
|
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
|
|
# it reasonable to get the host's addr in addition to 0.0.0.0.
|
|
# At least for eCos. This is required for the S/390 to pass.
|
|
try:
|
|
my_ip_addr = socket.gethostbyname(socket.gethostname())
|
|
except socket.error:
|
|
# Probably name lookup wasn't set up right; skip this test
|
|
return
|
|
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
|
|
self.assertEqual(name[1], port)
|
|
|
|
def testGetSockOpt(self):
|
|
# Testing getsockopt()
|
|
# We know a socket should start without reuse==0
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.addCleanup(sock.close)
|
|
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
|
|
self.assertFalse(reuse != 0, "initial mode is reuse")
|
|
|
|
def testSetSockOpt(self):
|
|
# Testing setsockopt()
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.addCleanup(sock.close)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
|
|
self.assertFalse(reuse == 0, "failed to set reuse mode")
|
|
|
|
def testSendAfterClose(self):
|
|
# testing send() after close() with timeout
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
sock.settimeout(1)
|
|
sock.close()
|
|
self.assertRaises(socket.error, sock.send, "spam")
|
|
|
|
def testNewAttributes(self):
|
|
# testing .family, .type and .protocol
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.assertEqual(sock.family, socket.AF_INET)
|
|
self.assertEqual(sock.type, socket.SOCK_STREAM)
|
|
self.assertEqual(sock.proto, 0)
|
|
sock.close()
|
|
|
|
def test_getsockaddrarg(self):
|
|
host = '0.0.0.0'
|
|
port = self._get_unused_port(bind_address=host)
|
|
big_port = port + 65536
|
|
neg_port = port - 65536
|
|
sock = socket.socket()
|
|
try:
|
|
self.assertRaises(OverflowError, sock.bind, (host, big_port))
|
|
self.assertRaises(OverflowError, sock.bind, (host, neg_port))
|
|
sock.bind((host, port))
|
|
finally:
|
|
sock.close()
|
|
|
|
@unittest.skipUnless(os.name == "nt", "Windows specific")
|
|
def test_sock_ioctl(self):
|
|
self.assertTrue(hasattr(socket.socket, 'ioctl'))
|
|
self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
|
|
self.assertTrue(hasattr(socket, 'RCVALL_ON'))
|
|
self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
|
|
self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
|
|
s = socket.socket()
|
|
self.addCleanup(s.close)
|
|
self.assertRaises(ValueError, s.ioctl, -1, None)
|
|
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
|
|
|
|
def testGetaddrinfo(self):
|
|
try:
|
|
socket.getaddrinfo('localhost', 80)
|
|
except socket.gaierror as err:
|
|
if err.errno == socket.EAI_SERVICE:
|
|
# see http://bugs.python.org/issue1282647
|
|
self.skipTest("buggy libc version")
|
|
raise
|
|
# len of every sequence is supposed to be == 5
|
|
for info in socket.getaddrinfo(HOST, None):
|
|
self.assertEqual(len(info), 5)
|
|
# host can be a domain name, a string representation of an
|
|
# IPv4/v6 address or None
|
|
socket.getaddrinfo('localhost', 80)
|
|
socket.getaddrinfo('127.0.0.1', 80)
|
|
socket.getaddrinfo(None, 80)
|
|
if SUPPORTS_IPV6:
|
|
socket.getaddrinfo('::1', 80)
|
|
# port can be a string service name such as "http", a numeric
|
|
# port number or None
|
|
socket.getaddrinfo(HOST, "http")
|
|
socket.getaddrinfo(HOST, 80)
|
|
socket.getaddrinfo(HOST, None)
|
|
# test family and socktype filters
|
|
infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
|
|
for family, _, _, _, _ in infos:
|
|
self.assertEqual(family, socket.AF_INET)
|
|
infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
|
|
for _, socktype, _, _, _ in infos:
|
|
self.assertEqual(socktype, socket.SOCK_STREAM)
|
|
# test proto and flags arguments
|
|
socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
|
|
socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
|
|
# a server willing to support both IPv4 and IPv6 will
|
|
# usually do this
|
|
socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
|
|
socket.AI_PASSIVE)
|
|
|
|
|
|
def check_sendall_interrupted(self, with_timeout):
|
|
# socketpair() is not stricly required, but it makes things easier.
|
|
if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
|
|
self.skipTest("signal.alarm and socket.socketpair required for this test")
|
|
# Our signal handlers clobber the C errno by calling a math function
|
|
# with an invalid domain value.
|
|
def ok_handler(*args):
|
|
self.assertRaises(ValueError, math.acosh, 0)
|
|
def raising_handler(*args):
|
|
self.assertRaises(ValueError, math.acosh, 0)
|
|
1 // 0
|
|
c, s = socket.socketpair()
|
|
old_alarm = signal.signal(signal.SIGALRM, raising_handler)
|
|
try:
|
|
if with_timeout:
|
|
# Just above the one second minimum for signal.alarm
|
|
c.settimeout(1.5)
|
|
with self.assertRaises(ZeroDivisionError):
|
|
signal.alarm(1)
|
|
c.sendall(b"x" * (1024**2))
|
|
if with_timeout:
|
|
signal.signal(signal.SIGALRM, ok_handler)
|
|
signal.alarm(1)
|
|
self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))
|
|
finally:
|
|
signal.signal(signal.SIGALRM, old_alarm)
|
|
c.close()
|
|
s.close()
|
|
|
|
def test_sendall_interrupted(self):
|
|
self.check_sendall_interrupted(False)
|
|
|
|
def test_sendall_interrupted_with_timeout(self):
|
|
self.check_sendall_interrupted(True)
|
|
|
|
def testListenBacklog0(self):
|
|
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
srv.bind((HOST, 0))
|
|
# backlog = 0
|
|
srv.listen(0)
|
|
srv.close()
|
|
|
|
@unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
|
|
def test_flowinfo(self):
|
|
self.assertRaises(OverflowError, socket.getnameinfo,
|
|
('::1',0, 0xffffffff), 0)
|
|
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
|
try:
|
|
self.assertRaises(OverflowError, s.bind, ('::1', 0, -10))
|
|
finally:
|
|
s.close()
|
|
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class BasicTCPTest(SocketConnectedTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketConnectedTest.__init__(self, methodName=methodName)
|
|
|
|
def testRecv(self):
|
|
# Testing large receive over TCP
|
|
msg = self.cli_conn.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testRecv(self):
|
|
self.serv_conn.send(MSG)
|
|
|
|
def testOverFlowRecv(self):
|
|
# Testing receive in chunks over TCP
|
|
seg1 = self.cli_conn.recv(len(MSG) - 3)
|
|
seg2 = self.cli_conn.recv(1024)
|
|
msg = seg1 + seg2
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testOverFlowRecv(self):
|
|
self.serv_conn.send(MSG)
|
|
|
|
def testRecvFrom(self):
|
|
# Testing large recvfrom() over TCP
|
|
msg, addr = self.cli_conn.recvfrom(1024)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testRecvFrom(self):
|
|
self.serv_conn.send(MSG)
|
|
|
|
def testOverFlowRecvFrom(self):
|
|
# Testing recvfrom() in chunks over TCP
|
|
seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
|
|
seg2, addr = self.cli_conn.recvfrom(1024)
|
|
msg = seg1 + seg2
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testOverFlowRecvFrom(self):
|
|
self.serv_conn.send(MSG)
|
|
|
|
def testSendAll(self):
|
|
# Testing sendall() with a 2048 byte string over TCP
|
|
msg = ''
|
|
while 1:
|
|
read = self.cli_conn.recv(1024)
|
|
if not read:
|
|
break
|
|
msg += read
|
|
self.assertEqual(msg, 'f' * 2048)
|
|
|
|
def _testSendAll(self):
|
|
big_chunk = 'f' * 2048
|
|
self.serv_conn.sendall(big_chunk)
|
|
|
|
def testFromFd(self):
|
|
# Testing fromfd()
|
|
if not hasattr(socket, "fromfd"):
|
|
return # On Windows, this doesn't exist
|
|
fd = self.cli_conn.fileno()
|
|
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
|
|
self.addCleanup(sock.close)
|
|
msg = sock.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testFromFd(self):
|
|
self.serv_conn.send(MSG)
|
|
|
|
def testDup(self):
|
|
# Testing dup()
|
|
sock = self.cli_conn.dup()
|
|
self.addCleanup(sock.close)
|
|
msg = sock.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testDup(self):
|
|
self.serv_conn.send(MSG)
|
|
|
|
def testShutdown(self):
|
|
# Testing shutdown()
|
|
msg = self.cli_conn.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
# wait for _testShutdown to finish: on OS X, when the server
|
|
# closes the connection the client also becomes disconnected,
|
|
# and the client's shutdown call will fail. (Issue #4397.)
|
|
self.done.wait()
|
|
|
|
def _testShutdown(self):
|
|
self.serv_conn.send(MSG)
|
|
self.serv_conn.shutdown(2)
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class BasicUDPTest(ThreadedUDPSocketTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
ThreadedUDPSocketTest.__init__(self, methodName=methodName)
|
|
|
|
def testSendtoAndRecv(self):
|
|
# Testing sendto() and Recv() over UDP
|
|
msg = self.serv.recv(len(MSG))
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testSendtoAndRecv(self):
|
|
self.cli.sendto(MSG, 0, (HOST, self.port))
|
|
|
|
def testRecvFrom(self):
|
|
# Testing recvfrom() over UDP
|
|
msg, addr = self.serv.recvfrom(len(MSG))
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testRecvFrom(self):
|
|
self.cli.sendto(MSG, 0, (HOST, self.port))
|
|
|
|
def testRecvFromNegative(self):
|
|
# Negative lengths passed to recvfrom should give ValueError.
|
|
self.assertRaises(ValueError, self.serv.recvfrom, -1)
|
|
|
|
def _testRecvFromNegative(self):
|
|
self.cli.sendto(MSG, 0, (HOST, self.port))
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class TCPCloserTest(ThreadedTCPSocketTest):
|
|
|
|
def testClose(self):
|
|
conn, addr = self.serv.accept()
|
|
conn.close()
|
|
|
|
sd = self.cli
|
|
read, write, err = select.select([sd], [], [], 1.0)
|
|
self.assertEqual(read, [sd])
|
|
self.assertEqual(sd.recv(1), '')
|
|
|
|
def _testClose(self):
|
|
self.cli.connect((HOST, self.port))
|
|
time.sleep(1.0)
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class BasicSocketPairTest(SocketPairTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketPairTest.__init__(self, methodName=methodName)
|
|
|
|
def testRecv(self):
|
|
msg = self.serv.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testRecv(self):
|
|
self.cli.send(MSG)
|
|
|
|
def testSend(self):
|
|
self.serv.send(MSG)
|
|
|
|
def _testSend(self):
|
|
msg = self.cli.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class NonBlockingTCPTests(ThreadedTCPSocketTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
ThreadedTCPSocketTest.__init__(self, methodName=methodName)
|
|
|
|
def testSetBlocking(self):
|
|
# Testing whether set blocking works
|
|
self.serv.setblocking(0)
|
|
start = time.time()
|
|
try:
|
|
self.serv.accept()
|
|
except socket.error:
|
|
pass
|
|
end = time.time()
|
|
self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
|
|
|
|
def _testSetBlocking(self):
|
|
pass
|
|
|
|
def testAccept(self):
|
|
# Testing non-blocking accept
|
|
self.serv.setblocking(0)
|
|
try:
|
|
conn, addr = self.serv.accept()
|
|
except socket.error:
|
|
pass
|
|
else:
|
|
self.fail("Error trying to do non-blocking accept.")
|
|
read, write, err = select.select([self.serv], [], [])
|
|
if self.serv in read:
|
|
conn, addr = self.serv.accept()
|
|
conn.close()
|
|
else:
|
|
self.fail("Error trying to do accept after select.")
|
|
|
|
def _testAccept(self):
|
|
time.sleep(0.1)
|
|
self.cli.connect((HOST, self.port))
|
|
|
|
def testConnect(self):
|
|
# Testing non-blocking connect
|
|
conn, addr = self.serv.accept()
|
|
conn.close()
|
|
|
|
def _testConnect(self):
|
|
self.cli.settimeout(10)
|
|
self.cli.connect((HOST, self.port))
|
|
|
|
def testRecv(self):
|
|
# Testing non-blocking recv
|
|
conn, addr = self.serv.accept()
|
|
conn.setblocking(0)
|
|
try:
|
|
msg = conn.recv(len(MSG))
|
|
except socket.error:
|
|
pass
|
|
else:
|
|
self.fail("Error trying to do non-blocking recv.")
|
|
read, write, err = select.select([conn], [], [])
|
|
if conn in read:
|
|
msg = conn.recv(len(MSG))
|
|
conn.close()
|
|
self.assertEqual(msg, MSG)
|
|
else:
|
|
self.fail("Error during select call to non-blocking socket.")
|
|
|
|
def _testRecv(self):
|
|
self.cli.connect((HOST, self.port))
|
|
time.sleep(0.1)
|
|
self.cli.send(MSG)
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class FileObjectClassTestCase(SocketConnectedTest):
|
|
|
|
bufsize = -1 # Use default buffer size
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketConnectedTest.__init__(self, methodName=methodName)
|
|
|
|
def setUp(self):
|
|
SocketConnectedTest.setUp(self)
|
|
self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
|
|
|
|
def tearDown(self):
|
|
self.serv_file.close()
|
|
self.assertTrue(self.serv_file.closed)
|
|
self.serv_file = None
|
|
SocketConnectedTest.tearDown(self)
|
|
|
|
def clientSetUp(self):
|
|
SocketConnectedTest.clientSetUp(self)
|
|
self.cli_file = self.serv_conn.makefile('wb')
|
|
|
|
def clientTearDown(self):
|
|
self.cli_file.close()
|
|
self.assertTrue(self.cli_file.closed)
|
|
self.cli_file = None
|
|
SocketConnectedTest.clientTearDown(self)
|
|
|
|
def testSmallRead(self):
|
|
# Performing small file read test
|
|
first_seg = self.serv_file.read(len(MSG)-3)
|
|
second_seg = self.serv_file.read(3)
|
|
msg = first_seg + second_seg
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testSmallRead(self):
|
|
self.cli_file.write(MSG)
|
|
self.cli_file.flush()
|
|
|
|
def testFullRead(self):
|
|
# read until EOF
|
|
msg = self.serv_file.read()
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testFullRead(self):
|
|
self.cli_file.write(MSG)
|
|
self.cli_file.close()
|
|
|
|
def testUnbufferedRead(self):
|
|
# Performing unbuffered file read test
|
|
buf = ''
|
|
while 1:
|
|
char = self.serv_file.read(1)
|
|
if not char:
|
|
break
|
|
buf += char
|
|
self.assertEqual(buf, MSG)
|
|
|
|
def _testUnbufferedRead(self):
|
|
self.cli_file.write(MSG)
|
|
self.cli_file.flush()
|
|
|
|
def testReadline(self):
|
|
# Performing file readline test
|
|
line = self.serv_file.readline()
|
|
self.assertEqual(line, MSG)
|
|
|
|
def _testReadline(self):
|
|
self.cli_file.write(MSG)
|
|
self.cli_file.flush()
|
|
|
|
def testReadlineAfterRead(self):
|
|
a_baloo_is = self.serv_file.read(len("A baloo is"))
|
|
self.assertEqual("A baloo is", a_baloo_is)
|
|
_a_bear = self.serv_file.read(len(" a bear"))
|
|
self.assertEqual(" a bear", _a_bear)
|
|
line = self.serv_file.readline()
|
|
self.assertEqual("\n", line)
|
|
line = self.serv_file.readline()
|
|
self.assertEqual("A BALOO IS A BEAR.\n", line)
|
|
line = self.serv_file.readline()
|
|
self.assertEqual(MSG, line)
|
|
|
|
def _testReadlineAfterRead(self):
|
|
self.cli_file.write("A baloo is a bear\n")
|
|
self.cli_file.write("A BALOO IS A BEAR.\n")
|
|
self.cli_file.write(MSG)
|
|
self.cli_file.flush()
|
|
|
|
def testReadlineAfterReadNoNewline(self):
|
|
end_of_ = self.serv_file.read(len("End Of "))
|
|
self.assertEqual("End Of ", end_of_)
|
|
line = self.serv_file.readline()
|
|
self.assertEqual("Line", line)
|
|
|
|
def _testReadlineAfterReadNoNewline(self):
|
|
self.cli_file.write("End Of Line")
|
|
|
|
def testClosedAttr(self):
|
|
self.assertTrue(not self.serv_file.closed)
|
|
|
|
def _testClosedAttr(self):
|
|
self.assertTrue(not self.cli_file.closed)
|
|
|
|
|
|
class FileObjectInterruptedTestCase(unittest.TestCase):
|
|
"""Test that the file object correctly handles EINTR internally."""
|
|
|
|
class MockSocket(object):
|
|
def __init__(self, recv_funcs=()):
|
|
# A generator that returns callables that we'll call for each
|
|
# call to recv().
|
|
self._recv_step = iter(recv_funcs)
|
|
|
|
def recv(self, size):
|
|
return self._recv_step.next()()
|
|
|
|
@staticmethod
|
|
def _raise_eintr():
|
|
raise socket.error(errno.EINTR)
|
|
|
|
def _test_readline(self, size=-1, **kwargs):
|
|
mock_sock = self.MockSocket(recv_funcs=[
|
|
lambda : "This is the first line\nAnd the sec",
|
|
self._raise_eintr,
|
|
lambda : "ond line is here\n",
|
|
lambda : "",
|
|
])
|
|
fo = socket._fileobject(mock_sock, **kwargs)
|
|
self.assertEqual(fo.readline(size), "This is the first line\n")
|
|
self.assertEqual(fo.readline(size), "And the second line is here\n")
|
|
|
|
def _test_read(self, size=-1, **kwargs):
|
|
mock_sock = self.MockSocket(recv_funcs=[
|
|
lambda : "This is the first line\nAnd the sec",
|
|
self._raise_eintr,
|
|
lambda : "ond line is here\n",
|
|
lambda : "",
|
|
])
|
|
fo = socket._fileobject(mock_sock, **kwargs)
|
|
self.assertEqual(fo.read(size), "This is the first line\n"
|
|
"And the second line is here\n")
|
|
|
|
def test_default(self):
|
|
self._test_readline()
|
|
self._test_readline(size=100)
|
|
self._test_read()
|
|
self._test_read(size=100)
|
|
|
|
def test_with_1k_buffer(self):
|
|
self._test_readline(bufsize=1024)
|
|
self._test_readline(size=100, bufsize=1024)
|
|
self._test_read(bufsize=1024)
|
|
self._test_read(size=100, bufsize=1024)
|
|
|
|
def _test_readline_no_buffer(self, size=-1):
|
|
mock_sock = self.MockSocket(recv_funcs=[
|
|
lambda : "aa",
|
|
lambda : "\n",
|
|
lambda : "BB",
|
|
self._raise_eintr,
|
|
lambda : "bb",
|
|
lambda : "",
|
|
])
|
|
fo = socket._fileobject(mock_sock, bufsize=0)
|
|
self.assertEqual(fo.readline(size), "aa\n")
|
|
self.assertEqual(fo.readline(size), "BBbb")
|
|
|
|
def test_no_buffer(self):
|
|
self._test_readline_no_buffer()
|
|
self._test_readline_no_buffer(size=4)
|
|
self._test_read(bufsize=0)
|
|
self._test_read(size=100, bufsize=0)
|
|
|
|
|
|
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
|
|
|
"""Repeat the tests from FileObjectClassTestCase with bufsize==0.
|
|
|
|
In this case (and in this case only), it should be possible to
|
|
create a file object, read a line from it, create another file
|
|
object, read another line from it, without loss of data in the
|
|
first file object's buffer. Note that httplib relies on this
|
|
when reading multiple requests from the same socket."""
|
|
|
|
bufsize = 0 # Use unbuffered mode
|
|
|
|
def testUnbufferedReadline(self):
|
|
# Read a line, create a new file object, read another line with it
|
|
line = self.serv_file.readline() # first line
|
|
self.assertEqual(line, "A. " + MSG) # first line
|
|
self.serv_file = self.cli_conn.makefile('rb', 0)
|
|
line = self.serv_file.readline() # second line
|
|
self.assertEqual(line, "B. " + MSG) # second line
|
|
|
|
def _testUnbufferedReadline(self):
|
|
self.cli_file.write("A. " + MSG)
|
|
self.cli_file.write("B. " + MSG)
|
|
self.cli_file.flush()
|
|
|
|
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
|
|
|
bufsize = 1 # Default-buffered for reading; line-buffered for writing
|
|
|
|
|
|
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
|
|
|
|
bufsize = 2 # Exercise the buffering code
|
|
|
|
|
|
class NetworkConnectionTest(object):
|
|
"""Prove network connection."""
|
|
def clientSetUp(self):
|
|
# We're inherited below by BasicTCPTest2, which also inherits
|
|
# BasicTCPTest, which defines self.port referenced below.
|
|
self.cli = socket.create_connection((HOST, self.port))
|
|
self.serv_conn = self.cli
|
|
|
|
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
|
|
"""Tests that NetworkConnection does not break existing TCP functionality.
|
|
"""
|
|
|
|
class NetworkConnectionNoServer(unittest.TestCase):
|
|
class MockSocket(socket.socket):
|
|
def connect(self, *args):
|
|
raise socket.timeout('timed out')
|
|
|
|
@contextlib.contextmanager
|
|
def mocked_socket_module(self):
|
|
"""Return a socket which times out on connect"""
|
|
old_socket = socket.socket
|
|
socket.socket = self.MockSocket
|
|
try:
|
|
yield
|
|
finally:
|
|
socket.socket = old_socket
|
|
|
|
def test_connect(self):
|
|
port = test_support.find_unused_port()
|
|
cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self.addCleanup(cli.close)
|
|
with self.assertRaises(socket.error) as cm:
|
|
cli.connect((HOST, port))
|
|
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
|
|
|
|
def test_create_connection(self):
|
|
# Issue #9792: errors raised by create_connection() should have
|
|
# a proper errno attribute.
|
|
port = test_support.find_unused_port()
|
|
with self.assertRaises(socket.error) as cm:
|
|
socket.create_connection((HOST, port))
|
|
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
|
|
|
|
def test_create_connection_timeout(self):
|
|
# Issue #9792: create_connection() should not recast timeout errors
|
|
# as generic socket errors.
|
|
with self.mocked_socket_module():
|
|
with self.assertRaises(socket.timeout):
|
|
socket.create_connection((HOST, 1234))
|
|
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketTCPTest.__init__(self, methodName=methodName)
|
|
ThreadableTest.__init__(self)
|
|
|
|
def clientSetUp(self):
|
|
self.source_port = test_support.find_unused_port()
|
|
|
|
def clientTearDown(self):
|
|
self.cli.close()
|
|
self.cli = None
|
|
ThreadableTest.clientTearDown(self)
|
|
|
|
def _justAccept(self):
|
|
conn, addr = self.serv.accept()
|
|
conn.close()
|
|
|
|
testFamily = _justAccept
|
|
def _testFamily(self):
|
|
self.cli = socket.create_connection((HOST, self.port), timeout=30)
|
|
self.addCleanup(self.cli.close)
|
|
self.assertEqual(self.cli.family, 2)
|
|
|
|
testSourceAddress = _justAccept
|
|
def _testSourceAddress(self):
|
|
self.cli = socket.create_connection((HOST, self.port), timeout=30,
|
|
source_address=('', self.source_port))
|
|
self.addCleanup(self.cli.close)
|
|
self.assertEqual(self.cli.getsockname()[1], self.source_port)
|
|
# The port number being used is sufficient to show that the bind()
|
|
# call happened.
|
|
|
|
testTimeoutDefault = _justAccept
|
|
def _testTimeoutDefault(self):
|
|
# passing no explicit timeout uses socket's global default
|
|
self.assertTrue(socket.getdefaulttimeout() is None)
|
|
socket.setdefaulttimeout(42)
|
|
try:
|
|
self.cli = socket.create_connection((HOST, self.port))
|
|
self.addCleanup(self.cli.close)
|
|
finally:
|
|
socket.setdefaulttimeout(None)
|
|
self.assertEqual(self.cli.gettimeout(), 42)
|
|
|
|
testTimeoutNone = _justAccept
|
|
def _testTimeoutNone(self):
|
|
# None timeout means the same as sock.settimeout(None)
|
|
self.assertTrue(socket.getdefaulttimeout() is None)
|
|
socket.setdefaulttimeout(30)
|
|
try:
|
|
self.cli = socket.create_connection((HOST, self.port), timeout=None)
|
|
self.addCleanup(self.cli.close)
|
|
finally:
|
|
socket.setdefaulttimeout(None)
|
|
self.assertEqual(self.cli.gettimeout(), None)
|
|
|
|
testTimeoutValueNamed = _justAccept
|
|
def _testTimeoutValueNamed(self):
|
|
self.cli = socket.create_connection((HOST, self.port), timeout=30)
|
|
self.assertEqual(self.cli.gettimeout(), 30)
|
|
|
|
testTimeoutValueNonamed = _justAccept
|
|
def _testTimeoutValueNonamed(self):
|
|
self.cli = socket.create_connection((HOST, self.port), 30)
|
|
self.addCleanup(self.cli.close)
|
|
self.assertEqual(self.cli.gettimeout(), 30)
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
|
|
|
|
def __init__(self, methodName='runTest'):
|
|
SocketTCPTest.__init__(self, methodName=methodName)
|
|
ThreadableTest.__init__(self)
|
|
|
|
def clientSetUp(self):
|
|
pass
|
|
|
|
def clientTearDown(self):
|
|
self.cli.close()
|
|
self.cli = None
|
|
ThreadableTest.clientTearDown(self)
|
|
|
|
def testInsideTimeout(self):
|
|
conn, addr = self.serv.accept()
|
|
self.addCleanup(conn.close)
|
|
time.sleep(3)
|
|
conn.send("done!")
|
|
testOutsideTimeout = testInsideTimeout
|
|
|
|
def _testInsideTimeout(self):
|
|
self.cli = sock = socket.create_connection((HOST, self.port))
|
|
data = sock.recv(5)
|
|
self.assertEqual(data, "done!")
|
|
|
|
def _testOutsideTimeout(self):
|
|
self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
|
|
self.assertRaises(socket.timeout, lambda: sock.recv(5))
|
|
|
|
|
|
class Urllib2FileobjectTest(unittest.TestCase):
|
|
|
|
# urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
|
|
# it close the socket if the close c'tor argument is true
|
|
|
|
def testClose(self):
|
|
class MockSocket:
|
|
closed = False
|
|
def flush(self): pass
|
|
def close(self): self.closed = True
|
|
|
|
# must not close unless we request it: the original use of _fileobject
|
|
# by module socket requires that the underlying socket not be closed until
|
|
# the _socketobject that created the _fileobject is closed
|
|
s = MockSocket()
|
|
f = socket._fileobject(s)
|
|
f.close()
|
|
self.assertTrue(not s.closed)
|
|
|
|
s = MockSocket()
|
|
f = socket._fileobject(s, close=True)
|
|
f.close()
|
|
self.assertTrue(s.closed)
|
|
|
|
class TCPTimeoutTest(SocketTCPTest):
|
|
|
|
def testTCPTimeout(self):
|
|
def raise_timeout(*args, **kwargs):
|
|
self.serv.settimeout(1.0)
|
|
self.serv.accept()
|
|
self.assertRaises(socket.timeout, raise_timeout,
|
|
"Error generating a timeout exception (TCP)")
|
|
|
|
def testTimeoutZero(self):
|
|
ok = False
|
|
try:
|
|
self.serv.settimeout(0.0)
|
|
foo = self.serv.accept()
|
|
except socket.timeout:
|
|
self.fail("caught timeout instead of error (TCP)")
|
|
except socket.error:
|
|
ok = True
|
|
except:
|
|
self.fail("caught unexpected exception (TCP)")
|
|
if not ok:
|
|
self.fail("accept() returned success when we did not expect it")
|
|
|
|
def testInterruptedTimeout(self):
|
|
# XXX I don't know how to do this test on MSWindows or any other
|
|
# plaform that doesn't support signal.alarm() or os.kill(), though
|
|
# the bug should have existed on all platforms.
|
|
if not hasattr(signal, "alarm"):
|
|
return # can only test on *nix
|
|
self.serv.settimeout(5.0) # must be longer than alarm
|
|
class Alarm(Exception):
|
|
pass
|
|
def alarm_handler(signal, frame):
|
|
raise Alarm
|
|
old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
|
|
try:
|
|
signal.alarm(2) # POSIX allows alarm to be up to 1 second early
|
|
try:
|
|
foo = self.serv.accept()
|
|
except socket.timeout:
|
|
self.fail("caught timeout instead of Alarm")
|
|
except Alarm:
|
|
pass
|
|
except:
|
|
self.fail("caught other exception instead of Alarm:"
|
|
" %s(%s):\n%s" %
|
|
(sys.exc_info()[:2] + (traceback.format_exc(),)))
|
|
else:
|
|
self.fail("nothing caught")
|
|
finally:
|
|
signal.alarm(0) # shut off alarm
|
|
except Alarm:
|
|
self.fail("got Alarm in wrong place")
|
|
finally:
|
|
# no alarm can be pending. Safe to restore old handler.
|
|
signal.signal(signal.SIGALRM, old_alarm)
|
|
|
|
class UDPTimeoutTest(SocketUDPTest):
|
|
|
|
def testUDPTimeout(self):
|
|
def raise_timeout(*args, **kwargs):
|
|
self.serv.settimeout(1.0)
|
|
self.serv.recv(1024)
|
|
self.assertRaises(socket.timeout, raise_timeout,
|
|
"Error generating a timeout exception (UDP)")
|
|
|
|
def testTimeoutZero(self):
|
|
ok = False
|
|
try:
|
|
self.serv.settimeout(0.0)
|
|
foo = self.serv.recv(1024)
|
|
except socket.timeout:
|
|
self.fail("caught timeout instead of error (UDP)")
|
|
except socket.error:
|
|
ok = True
|
|
except:
|
|
self.fail("caught unexpected exception (UDP)")
|
|
if not ok:
|
|
self.fail("recv() returned success when we did not expect it")
|
|
|
|
class TestExceptions(unittest.TestCase):
|
|
|
|
def testExceptionTree(self):
|
|
self.assertTrue(issubclass(socket.error, Exception))
|
|
self.assertTrue(issubclass(socket.herror, socket.error))
|
|
self.assertTrue(issubclass(socket.gaierror, socket.error))
|
|
self.assertTrue(issubclass(socket.timeout, socket.error))
|
|
|
|
class TestLinuxAbstractNamespace(unittest.TestCase):
|
|
|
|
UNIX_PATH_MAX = 108
|
|
|
|
def testLinuxAbstractNamespace(self):
|
|
address = "\x00python-test-hello\x00\xff"
|
|
s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
s1.bind(address)
|
|
s1.listen(1)
|
|
s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
s2.connect(s1.getsockname())
|
|
s1.accept()
|
|
self.assertEqual(s1.getsockname(), address)
|
|
self.assertEqual(s2.getpeername(), address)
|
|
|
|
def testMaxName(self):
|
|
address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
|
|
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
s.bind(address)
|
|
self.assertEqual(s.getsockname(), address)
|
|
|
|
def testNameOverflow(self):
|
|
address = "\x00" + "h" * self.UNIX_PATH_MAX
|
|
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self.assertRaises(socket.error, s.bind, address)
|
|
|
|
|
|
@unittest.skipUnless(thread, 'Threading required for this test.')
|
|
class BufferIOTest(SocketConnectedTest):
|
|
"""
|
|
Test the buffer versions of socket.recv() and socket.send().
|
|
"""
|
|
def __init__(self, methodName='runTest'):
|
|
SocketConnectedTest.__init__(self, methodName=methodName)
|
|
|
|
def testRecvIntoArray(self):
|
|
buf = array.array('c', ' '*1024)
|
|
nbytes = self.cli_conn.recv_into(buf)
|
|
self.assertEqual(nbytes, len(MSG))
|
|
msg = buf.tostring()[:len(MSG)]
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testRecvIntoArray(self):
|
|
with test_support.check_py3k_warnings():
|
|
buf = buffer(MSG)
|
|
self.serv_conn.send(buf)
|
|
|
|
def testRecvIntoBytearray(self):
|
|
buf = bytearray(1024)
|
|
nbytes = self.cli_conn.recv_into(buf)
|
|
self.assertEqual(nbytes, len(MSG))
|
|
msg = buf[:len(MSG)]
|
|
self.assertEqual(msg, MSG)
|
|
|
|
_testRecvIntoBytearray = _testRecvIntoArray
|
|
|
|
def testRecvIntoMemoryview(self):
|
|
buf = bytearray(1024)
|
|
nbytes = self.cli_conn.recv_into(memoryview(buf))
|
|
self.assertEqual(nbytes, len(MSG))
|
|
msg = buf[:len(MSG)]
|
|
self.assertEqual(msg, MSG)
|
|
|
|
_testRecvIntoMemoryview = _testRecvIntoArray
|
|
|
|
def testRecvFromIntoArray(self):
|
|
buf = array.array('c', ' '*1024)
|
|
nbytes, addr = self.cli_conn.recvfrom_into(buf)
|
|
self.assertEqual(nbytes, len(MSG))
|
|
msg = buf.tostring()[:len(MSG)]
|
|
self.assertEqual(msg, MSG)
|
|
|
|
def _testRecvFromIntoArray(self):
|
|
with test_support.check_py3k_warnings():
|
|
buf = buffer(MSG)
|
|
self.serv_conn.send(buf)
|
|
|
|
def testRecvFromIntoBytearray(self):
|
|
buf = bytearray(1024)
|
|
nbytes, addr = self.cli_conn.recvfrom_into(buf)
|
|
self.assertEqual(nbytes, len(MSG))
|
|
msg = buf[:len(MSG)]
|
|
self.assertEqual(msg, MSG)
|
|
|
|
_testRecvFromIntoBytearray = _testRecvFromIntoArray
|
|
|
|
def testRecvFromIntoMemoryview(self):
|
|
buf = bytearray(1024)
|
|
nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
|
|
self.assertEqual(nbytes, len(MSG))
|
|
msg = buf[:len(MSG)]
|
|
self.assertEqual(msg, MSG)
|
|
|
|
_testRecvFromIntoMemoryview = _testRecvFromIntoArray
|
|
|
|
|
|
TIPC_STYPE = 2000
|
|
TIPC_LOWER = 200
|
|
TIPC_UPPER = 210
|
|
|
|
def isTipcAvailable():
|
|
"""Check if the TIPC module is loaded
|
|
|
|
The TIPC module is not loaded automatically on Ubuntu and probably
|
|
other Linux distros.
|
|
"""
|
|
if not hasattr(socket, "AF_TIPC"):
|
|
return False
|
|
if not os.path.isfile("/proc/modules"):
|
|
return False
|
|
with open("/proc/modules") as f:
|
|
for line in f:
|
|
if line.startswith("tipc "):
|
|
return True
|
|
if test_support.verbose:
|
|
print "TIPC module is not loaded, please 'sudo modprobe tipc'"
|
|
return False
|
|
|
|
class TIPCTest (unittest.TestCase):
|
|
def testRDM(self):
|
|
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
|
|
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
|
|
|
|
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
|
|
TIPC_LOWER, TIPC_UPPER)
|
|
srv.bind(srvaddr)
|
|
|
|
sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
|
|
TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
|
|
cli.sendto(MSG, sendaddr)
|
|
|
|
msg, recvaddr = srv.recvfrom(1024)
|
|
|
|
self.assertEqual(cli.getsockname(), recvaddr)
|
|
self.assertEqual(msg, MSG)
|
|
|
|
|
|
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
|
|
def __init__(self, methodName = 'runTest'):
|
|
unittest.TestCase.__init__(self, methodName = methodName)
|
|
ThreadableTest.__init__(self)
|
|
|
|
def setUp(self):
|
|
self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
|
|
self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
|
|
TIPC_LOWER, TIPC_UPPER)
|
|
self.srv.bind(srvaddr)
|
|
self.srv.listen(5)
|
|
self.serverExplicitReady()
|
|
self.conn, self.connaddr = self.srv.accept()
|
|
|
|
def clientSetUp(self):
|
|
# The is a hittable race between serverExplicitReady() and the
|
|
# accept() call; sleep a little while to avoid it, otherwise
|
|
# we could get an exception
|
|
time.sleep(0.1)
|
|
self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
|
|
addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
|
|
TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
|
|
self.cli.connect(addr)
|
|
self.cliaddr = self.cli.getsockname()
|
|
|
|
def testStream(self):
|
|
msg = self.conn.recv(1024)
|
|
self.assertEqual(msg, MSG)
|
|
self.assertEqual(self.cliaddr, self.connaddr)
|
|
|
|
def _testStream(self):
|
|
self.cli.send(MSG)
|
|
self.cli.close()
|
|
|
|
|
|
def test_main():
|
|
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
|
|
TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
|
|
UDPTimeoutTest ]
|
|
|
|
tests.extend([
|
|
NonBlockingTCPTests,
|
|
FileObjectClassTestCase,
|
|
FileObjectInterruptedTestCase,
|
|
UnbufferedFileObjectClassTestCase,
|
|
LineBufferedFileObjectClassTestCase,
|
|
SmallBufferedFileObjectClassTestCase,
|
|
Urllib2FileobjectTest,
|
|
NetworkConnectionNoServer,
|
|
NetworkConnectionAttributesTest,
|
|
NetworkConnectionBehaviourTest,
|
|
])
|
|
if hasattr(socket, "socketpair"):
|
|
tests.append(BasicSocketPairTest)
|
|
if sys.platform == 'linux2':
|
|
tests.append(TestLinuxAbstractNamespace)
|
|
if isTipcAvailable():
|
|
tests.append(TIPCTest)
|
|
tests.append(TIPCThreadableTest)
|
|
|
|
thread_info = test_support.threading_setup()
|
|
test_support.run_unittest(*tests)
|
|
test_support.threading_cleanup(*thread_info)
|
|
|
|
if __name__ == "__main__":
|
|
test_main()
|