Merged revisions 84650 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/branches/py3k

........
  r84650 | antoine.pitrou | 2010-09-09 15:31:46 +0200 (jeu., 09 sept. 2010) | 4 lines

  Use transient_internet() where appropriate in test_ssl
  (svn.python.org is sometimes unavailable)
........
This commit is contained in:
Antoine Pitrou 2010-09-09 13:33:33 +00:00
parent f77342076e
commit 78d8946d19
1 changed files with 80 additions and 82 deletions

View File

@ -103,106 +103,104 @@ class BasicTests(unittest.TestCase):
class NetworkedTests(unittest.TestCase):
def setUp(self):
self.old_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
def tearDown(self):
socket.setdefaulttimeout(self.old_timeout)
def test_connect(self):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("svn.python.org", 443))
c = s.getpeercert()
if c:
self.fail("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
with support.transient_internet("svn.python.org"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("svn.python.org", 443))
except ssl.SSLError:
pass
finally:
c = s.getpeercert()
if c:
self.fail("Peer cert %s shouldn't be here!")
s.close()
# this should succeed because we specify the root cert
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
s.connect(("svn.python.org", 443))
finally:
s.close()
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("svn.python.org", 443))
except ssl.SSLError:
pass
finally:
s.close()
# this should succeed because we specify the root cert
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
s.connect(("svn.python.org", 443))
finally:
s.close()
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
# delay closing the underlying "real socket" (here tested with its
# file descriptor, hence skipping the test under Windows).
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
ss.connect(("svn.python.org", 443))
fd = ss.fileno()
f = ss.makefile()
f.close()
# The fd is still open
os.read(fd, 0)
# Closing the SSL socket should close the fd too
ss.close()
gc.collect()
try:
with support.transient_internet("svn.python.org"):
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
ss.connect(("svn.python.org", 443))
fd = ss.fileno()
f = ss.makefile()
f.close()
# The fd is still open
os.read(fd, 0)
except OSError as e:
self.assertEqual(e.errno, errno.EBADF)
else:
self.fail("OSError wasn't raised")
# Closing the SSL socket should close the fd too
ss.close()
gc.collect()
try:
os.read(fd, 0)
except OSError as e:
self.assertEqual(e.errno, errno.EBADF)
else:
self.fail("OSError wasn't raised")
def test_non_blocking_handshake(self):
s = socket.socket(socket.AF_INET)
s.connect(("svn.python.org", 443))
s.setblocking(False)
s = ssl.wrap_socket(s,
cert_reqs=ssl.CERT_NONE,
do_handshake_on_connect=False)
count = 0
while True:
try:
count += 1
s.do_handshake()
break
except ssl.SSLError as err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([s], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [s], [])
else:
raise
s.close()
if support.verbose:
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
with support.transient_internet("svn.python.org"):
s = socket.socket(socket.AF_INET)
s.connect(("svn.python.org", 443))
s.setblocking(False)
s = ssl.wrap_socket(s,
cert_reqs=ssl.CERT_NONE,
do_handshake_on_connect=False)
count = 0
while True:
try:
count += 1
s.do_handshake()
break
except ssl.SSLError as err:
if err.args[0] == ssl.SSL_ERROR_WANT_READ:
select.select([s], [], [])
elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
select.select([], [s], [])
else:
raise
s.close()
if support.verbose:
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
def test_get_server_certificate(self):
pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem:
self.fail("No server certificate on svn.python.org:443!")
with support.transient_internet("svn.python.org"):
pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem:
self.fail("No server certificate on svn.python.org:443!")
try:
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
except ssl.SSLError as x:
#should fail
try:
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
except ssl.SSLError as x:
#should fail
if support.verbose:
sys.stdout.write("%s\n" % x)
else:
self.fail("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
self.fail("No server certificate on svn.python.org:443!")
if support.verbose:
sys.stdout.write("%s\n" % x)
else:
self.fail("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
self.fail("No server certificate on svn.python.org:443!")
if support.verbose:
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
# Test disabled: OPENSSL_VERSION* not available in Python 3.1
def test_algorithms(self):