Issue #25940: Use internal local server more in test_ssl

Move many tests from NetworkedTests and NetworkedBIOTests to a new Simple-
BackgroundTests class, using the existing ThreadedEchoServer and SIGNED_
CERTFILE infrastructure.

For tests that cause the server to crash by rejecting its certificate,
separate them into independent test methods.

Added custom root certificate to capath with the following commands:

cp Lib/test/{pycacert.pem,capath/}
# Edit copy to remove part before certificate
c_rehash -v Lib/test/capath/
c_rehash -v -old Lib/test/capath/
# Note the generated file names
cp Lib/test/capath/{pycacert.pem,b1930218.0}
mv Lib/test/capath/{pycacert.pem,ceff1710.0}

Change to pure PEM version of SIGNING_CA because PEM_cert_to_DER_cert() does
not like the extra text at the start.

Moved test_connect_ex_error() into BasicSocketTests and rewrote it to connect
to a reserved localhost port.

NetworkedTests.test_get_server_certificate_ipv6() split out because it needs
to connect to an IPv6 DNS address.

The only reference left to self-signed.pythontest.net is test_timeout_
connect_ex(), which needs a remote server to reliably time out the
connection, but does not rely on the server running SSL.

Made ThreadedEchoServer call unwrap() by default when it sees the client has
shut the connection down, so that the client can cleanly call unwrap().
This commit is contained in:
Martin Panter 2016-03-27 01:53:46 +00:00
parent 2c257ab0f8
commit 3840b2ac67
6 changed files with 380 additions and 377 deletions

View File

@ -1,16 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIClTCCAf6gAwIBAgIJAKGU95wKR8pTMA0GCSqGSIb3DQEBBQUAMHAxCzAJBgNV
BAYTAlhZMRcwFQYDVQQHDA5DYXN0bGUgQW50aHJheDEjMCEGA1UECgwaUHl0aG9u
IFNvZnR3YXJlIEZvdW5kYXRpb24xIzAhBgNVBAMMGnNlbGYtc2lnbmVkLnB5dGhv
bnRlc3QubmV0MB4XDTE0MTEwMjE4MDkyOVoXDTI0MTAzMDE4MDkyOVowcDELMAkG
A1UEBhMCWFkxFzAVBgNVBAcMDkNhc3RsZSBBbnRocmF4MSMwIQYDVQQKDBpQeXRo
b24gU29mdHdhcmUgRm91bmRhdGlvbjEjMCEGA1UEAwwac2VsZi1zaWduZWQucHl0
aG9udGVzdC5uZXQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBANDXQXW9tjyZ
Xt0Iv2tLL1+jinr4wGg36ioLDLFkMf+2Y1GL0v0BnKYG4N1OKlAU15LXGeGer8vm
Sv/yIvmdrELvhAbbo3w4a9TMYQA4XkIVLdvu3mvNOAet+8PMJxn26dbDhG809ALv
EHY57lQsBS3G59RZyBPVqAqmImWNJnVzAgMBAAGjNzA1MCUGA1UdEQQeMByCGnNl
bGYtc2lnbmVkLnB5dGhvbnRlc3QubmV0MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcN
AQEFBQADgYEAIuzAhgMouJpNdf3URCHIineyoSt6WK/9+eyUcjlKOrDoXNZaD72h
TXMeKYoWvJyVcSLKL8ckPtDobgP2OTt0UkyAaj0n+ZHaqq1lH2yVfGUA1ILJv515
C8BqbvVZuqm3i7ygmw3bqE/lYMgOrYtXXnqOrz6nvsE6Yc9V9rFflOM=
-----END CERTIFICATE-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDbTCCAlWgAwIBAgIJALCSZLHy2iHQMA0GCSqGSIb3DQEBBQUAME0xCzAJBgNV
BAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUgRm91bmRhdGlvbiBDQTEW
MBQGA1UEAwwNb3VyLWNhLXNlcnZlcjAeFw0xMzAxMDQxOTQ3MDdaFw0yMzAxMDIx
OTQ3MDdaME0xCzAJBgNVBAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUg
Rm91bmRhdGlvbiBDQTEWMBQGA1UEAwwNb3VyLWNhLXNlcnZlcjCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBAOfe6eMMnwC2of0rW5bSb8zgvoa5IF7sA3pV
q+qk6flJhdJm1e3HeupWji2P50LiYiipn9Ybjuu1tJyfFKvf5pSLdh0+bSRh7Qy/
AIphDN9cyDZzFgDNR7ptpKR0iIMjChn8Cac8SkvT5x0t5OpMVCHzJtuJNxjUArtA
Ml+k/y0c99S77I7PXIKs5nwIbEiFYQd/JeBc4Lw0X+C5BEd1yEcLjbzWyGhfM4Ni
0iBENbGtgRqKzbw1sFyLR9YY6ZwYl8wBPCnM6B7k5MG43ufCERiHWpM02KYl9xRx
6+QhotIPLi7UYgA109bvXGBLTKkU4t0VWEY3Mya35y5d7ULkxU0CAwEAAaNQME4w
HQYDVR0OBBYEFLzdYtl22hvSVGvP4GabHh57VgwLMB8GA1UdIwQYMBaAFLzdYtl2
2hvSVGvP4GabHh57VgwLMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
AH0K9cuN0129mY74Kw+668LZpidPLnsvDmTYHDVQTu78kLmNbajFxgawr/Mtvzu4
QgfdGH1tlVRXhRhgRy/reBv56Bf9Wg2HFyisTGrmvCn09FVwKULeheqrbCMGZDB1
Ao5TvF4BMzfMHs24pP3K5F9lO4MchvFVAqA6j9uRt0AUtOeN0u5zuuPlNC28lG9O
JAb3X4sOp45r3l519DKaULFEM5rQBeJ4gv/b2opj66nd0b+gYa3jnookXWIO50yR
f+/fNDY7L131hLIvxG2TlhpvMCjx2hKaZLRAMx293itTqOq+1rxOlvVE+zIYrtUf
9mmvtk57HVjsO6lTo15YyJ4=
-----END CERTIFICATE-----

View File

@ -1,16 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIClTCCAf6gAwIBAgIJAKGU95wKR8pTMA0GCSqGSIb3DQEBBQUAMHAxCzAJBgNV
BAYTAlhZMRcwFQYDVQQHDA5DYXN0bGUgQW50aHJheDEjMCEGA1UECgwaUHl0aG9u
IFNvZnR3YXJlIEZvdW5kYXRpb24xIzAhBgNVBAMMGnNlbGYtc2lnbmVkLnB5dGhv
bnRlc3QubmV0MB4XDTE0MTEwMjE4MDkyOVoXDTI0MTAzMDE4MDkyOVowcDELMAkG
A1UEBhMCWFkxFzAVBgNVBAcMDkNhc3RsZSBBbnRocmF4MSMwIQYDVQQKDBpQeXRo
b24gU29mdHdhcmUgRm91bmRhdGlvbjEjMCEGA1UEAwwac2VsZi1zaWduZWQucHl0
aG9udGVzdC5uZXQwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBANDXQXW9tjyZ
Xt0Iv2tLL1+jinr4wGg36ioLDLFkMf+2Y1GL0v0BnKYG4N1OKlAU15LXGeGer8vm
Sv/yIvmdrELvhAbbo3w4a9TMYQA4XkIVLdvu3mvNOAet+8PMJxn26dbDhG809ALv
EHY57lQsBS3G59RZyBPVqAqmImWNJnVzAgMBAAGjNzA1MCUGA1UdEQQeMByCGnNl
bGYtc2lnbmVkLnB5dGhvbnRlc3QubmV0MAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcN
AQEFBQADgYEAIuzAhgMouJpNdf3URCHIineyoSt6WK/9+eyUcjlKOrDoXNZaD72h
TXMeKYoWvJyVcSLKL8ckPtDobgP2OTt0UkyAaj0n+ZHaqq1lH2yVfGUA1ILJv515
C8BqbvVZuqm3i7ygmw3bqE/lYMgOrYtXXnqOrz6nvsE6Yc9V9rFflOM=
-----END CERTIFICATE-----

View File

@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDbTCCAlWgAwIBAgIJALCSZLHy2iHQMA0GCSqGSIb3DQEBBQUAME0xCzAJBgNV
BAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUgRm91bmRhdGlvbiBDQTEW
MBQGA1UEAwwNb3VyLWNhLXNlcnZlcjAeFw0xMzAxMDQxOTQ3MDdaFw0yMzAxMDIx
OTQ3MDdaME0xCzAJBgNVBAYTAlhZMSYwJAYDVQQKDB1QeXRob24gU29mdHdhcmUg
Rm91bmRhdGlvbiBDQTEWMBQGA1UEAwwNb3VyLWNhLXNlcnZlcjCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBAOfe6eMMnwC2of0rW5bSb8zgvoa5IF7sA3pV
q+qk6flJhdJm1e3HeupWji2P50LiYiipn9Ybjuu1tJyfFKvf5pSLdh0+bSRh7Qy/
AIphDN9cyDZzFgDNR7ptpKR0iIMjChn8Cac8SkvT5x0t5OpMVCHzJtuJNxjUArtA
Ml+k/y0c99S77I7PXIKs5nwIbEiFYQd/JeBc4Lw0X+C5BEd1yEcLjbzWyGhfM4Ni
0iBENbGtgRqKzbw1sFyLR9YY6ZwYl8wBPCnM6B7k5MG43ufCERiHWpM02KYl9xRx
6+QhotIPLi7UYgA109bvXGBLTKkU4t0VWEY3Mya35y5d7ULkxU0CAwEAAaNQME4w
HQYDVR0OBBYEFLzdYtl22hvSVGvP4GabHh57VgwLMB8GA1UdIwQYMBaAFLzdYtl2
2hvSVGvP4GabHh57VgwLMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADggEB
AH0K9cuN0129mY74Kw+668LZpidPLnsvDmTYHDVQTu78kLmNbajFxgawr/Mtvzu4
QgfdGH1tlVRXhRhgRy/reBv56Bf9Wg2HFyisTGrmvCn09FVwKULeheqrbCMGZDB1
Ao5TvF4BMzfMHs24pP3K5F9lO4MchvFVAqA6j9uRt0AUtOeN0u5zuuPlNC28lG9O
JAb3X4sOp45r3l519DKaULFEM5rQBeJ4gv/b2opj66nd0b+gYa3jnookXWIO50yR
f+/fNDY7L131hLIvxG2TlhpvMCjx2hKaZLRAMx293itTqOq+1rxOlvVE+zIYrtUf
9mmvtk57HVjsO6lTo15YyJ4=
-----END CERTIFICATE-----

View File

@ -21,6 +21,13 @@ import functools
ssl = support.import_module("ssl")
try:
import threading
except ImportError:
_have_threads = False
else:
_have_threads = True
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
@ -53,10 +60,10 @@ CRLFILE = data_file("revocation.crl")
# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")
# Same certificate as pycacert.pem, but without extra text in file
SIGNING_CA = data_file("capath", "ceff1710.0")
REMOTE_HOST = "self-signed.pythontest.net"
REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
@ -783,6 +790,22 @@ class BasicSocketTests(unittest.TestCase):
self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0)
self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT")
def test_connect_ex_error(self):
server = socket.socket(socket.AF_INET)
self.addCleanup(server.close)
port = support.bind_port(server) # Reserve port but don't listen
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
self.addCleanup(s.close)
rc = s.connect_ex((HOST, port))
# Issue #19919: Windows machines or VMs hosted on Windows
# machines sometimes return EWOULDBLOCK.
errors = (
errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
errno.EWOULDBLOCK,
)
self.assertIn(rc, errors)
class ContextTests(unittest.TestCase):
@ -1368,140 +1391,103 @@ class MemoryBIOTests(unittest.TestCase):
self.assertRaises(TypeError, bio.write, 1)
class NetworkedTests(unittest.TestCase):
@unittest.skipUnless(_have_threads, "Needs threading module")
class SimpleBackgroundTests(unittest.TestCase):
"""Tests that connect to a simple server running in the background"""
def setUp(self):
server = ThreadedEchoServer(SIGNED_CERTFILE)
self.server_addr = (HOST, server.port)
server.__enter__()
self.addCleanup(server.__exit__, None, None, None)
def test_connect(self):
with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
try:
s.connect((REMOTE_HOST, 443))
self.assertEqual({}, s.getpeercert())
finally:
s.close()
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE) as s:
s.connect(self.server_addr)
self.assertEqual({}, s.getpeercert())
# this should fail because we have no verification certs
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
s.connect, (REMOTE_HOST, 443))
s.close()
# this should succeed because we specify the root cert
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SIGNING_CA) as s:
s.connect(self.server_addr)
self.assertTrue(s.getpeercert())
# this should succeed because we specify the root cert
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=REMOTE_ROOT_CERT)
try:
s.connect((REMOTE_HOST, 443))
self.assertTrue(s.getpeercert())
finally:
s.close()
def test_connect_fail(self):
# This should fail because we have no verification certs. Connection
# failure crashes ThreadedEchoServer, so run this in an independent
# test method.
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
self.addCleanup(s.close)
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
s.connect, self.server_addr)
def test_connect_ex(self):
# Issue #11326: check connect_ex() implementation
with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=REMOTE_ROOT_CERT)
try:
self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
self.assertTrue(s.getpeercert())
finally:
s.close()
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SIGNING_CA)
self.addCleanup(s.close)
self.assertEqual(0, s.connect_ex(self.server_addr))
self.assertTrue(s.getpeercert())
def test_non_blocking_connect_ex(self):
# Issue #11326: non-blocking connect_ex() should allow handshake
# to proceed after the socket gets ready.
with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=REMOTE_ROOT_CERT,
do_handshake_on_connect=False)
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SIGNING_CA,
do_handshake_on_connect=False)
self.addCleanup(s.close)
s.setblocking(False)
rc = s.connect_ex(self.server_addr)
# EWOULDBLOCK under Windows, EINPROGRESS elsewhere
self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
# Wait for connect to finish
select.select([], [s], [], 5.0)
# Non-blocking handshake
while True:
try:
s.setblocking(False)
rc = s.connect_ex((REMOTE_HOST, 443))
# EWOULDBLOCK under Windows, EINPROGRESS elsewhere
self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
# Wait for connect to finish
s.do_handshake()
break
except ssl.SSLWantReadError:
select.select([s], [], [], 5.0)
except ssl.SSLWantWriteError:
select.select([], [s], [], 5.0)
# Non-blocking handshake
while True:
try:
s.do_handshake()
break
except ssl.SSLWantReadError:
select.select([s], [], [], 5.0)
except ssl.SSLWantWriteError:
select.select([], [s], [], 5.0)
# SSL established
self.assertTrue(s.getpeercert())
finally:
s.close()
def test_timeout_connect_ex(self):
# Issue #12065: on a timeout, connect_ex() should return the original
# errno (mimicking the behaviour of non-SSL sockets).
with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=REMOTE_ROOT_CERT,
do_handshake_on_connect=False)
try:
s.settimeout(0.0000001)
rc = s.connect_ex((REMOTE_HOST, 443))
if rc == 0:
self.skipTest("REMOTE_HOST responded too quickly")
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
finally:
s.close()
def test_connect_ex_error(self):
with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=REMOTE_ROOT_CERT)
try:
rc = s.connect_ex((REMOTE_HOST, 444))
# Issue #19919: Windows machines or VMs hosted on Windows
# machines sometimes return EWOULDBLOCK.
errors = (
errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
errno.EWOULDBLOCK,
)
self.assertIn(rc, errors)
finally:
s.close()
# SSL established
self.assertTrue(s.getpeercert())
def test_connect_with_context(self):
with support.transient_internet(REMOTE_HOST):
# Same as test_connect, but with a separately created context
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect((REMOTE_HOST, 443))
try:
self.assertEqual({}, s.getpeercert())
finally:
s.close()
# Same with a server hostname
s = ctx.wrap_socket(socket.socket(socket.AF_INET),
server_hostname=REMOTE_HOST)
s.connect((REMOTE_HOST, 443))
s.close()
# This should fail because we have no verification certs
ctx.verify_mode = ssl.CERT_REQUIRED
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
s.connect, (REMOTE_HOST, 443))
s.close()
# This should succeed because we specify the root cert
ctx.load_verify_locations(REMOTE_ROOT_CERT)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
# Same as test_connect, but with a separately created context
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
self.assertEqual({}, s.getpeercert())
# Same with a server hostname
with ctx.wrap_socket(socket.socket(socket.AF_INET),
server_hostname="dummy") as s:
s.connect(self.server_addr)
ctx.verify_mode = ssl.CERT_REQUIRED
# This should succeed because we specify the root cert
ctx.load_verify_locations(SIGNING_CA)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
def test_connect_with_context_fail(self):
# This should fail because we have no verification certs. Connection
# failure crashes ThreadedEchoServer, so run this in an independent
# test method.
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
self.addCleanup(s.close)
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
s.connect, self.server_addr)
def test_connect_capath(self):
# Verify server certificates using the `capath` argument
@ -1509,198 +1495,130 @@ class NetworkedTests(unittest.TestCase):
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
# contain both versions of each certificate (same content, different
# filename) for this test to be portable across OpenSSL releases.
with support.transient_internet(REMOTE_HOST):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
# Same with a bytes `capath` argument
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=BYTES_CAPATH)
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
# Same with a bytes `capath` argument
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=BYTES_CAPATH)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
def test_connect_cadata(self):
with open(REMOTE_ROOT_CERT) as f:
with open(SIGNING_CA) as f:
pem = f.read()
der = ssl.PEM_cert_to_DER_cert(pem)
with support.transient_internet(REMOTE_HOST):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=pem)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect((REMOTE_HOST, 443))
cert = s.getpeercert()
self.assertTrue(cert)
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=pem)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
# same with DER
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=der)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect((REMOTE_HOST, 443))
cert = s.getpeercert()
self.assertTrue(cert)
# same with DER
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cadata=der)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
# delay closing the underlying "real socket" (here tested with its
# file descriptor, hence skipping the test under Windows).
with support.transient_internet(REMOTE_HOST):
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
ss.connect((REMOTE_HOST, 443))
fd = ss.fileno()
f = ss.makefile()
f.close()
# The fd is still open
ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
ss.connect(self.server_addr)
fd = ss.fileno()
f = ss.makefile()
f.close()
# The fd is still open
os.read(fd, 0)
# Closing the SSL socket should close the fd too
ss.close()
gc.collect()
with self.assertRaises(OSError) as e:
os.read(fd, 0)
# Closing the SSL socket should close the fd too
ss.close()
gc.collect()
with self.assertRaises(OSError) as e:
os.read(fd, 0)
self.assertEqual(e.exception.errno, errno.EBADF)
self.assertEqual(e.exception.errno, errno.EBADF)
def test_non_blocking_handshake(self):
with support.transient_internet(REMOTE_HOST):
s = socket.socket(socket.AF_INET)
s.connect((REMOTE_HOST, 443))
s.setblocking(False)
s = ssl.wrap_socket(s,
cert_reqs=ssl.CERT_NONE,
do_handshake_on_connect=False)
count = 0
while True:
try:
count += 1
s.do_handshake()
break
except ssl.SSLWantReadError:
select.select([s], [], [])
except ssl.SSLWantWriteError:
select.select([], [s], [])
s.close()
if support.verbose:
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
s = socket.socket(socket.AF_INET)
s.connect(self.server_addr)
s.setblocking(False)
s = ssl.wrap_socket(s,
cert_reqs=ssl.CERT_NONE,
do_handshake_on_connect=False)
self.addCleanup(s.close)
count = 0
while True:
try:
count += 1
s.do_handshake()
break
except ssl.SSLWantReadError:
select.select([s], [], [])
except ssl.SSLWantWriteError:
select.select([], [s], [])
if support.verbose:
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
def test_get_server_certificate(self):
def _test_get_server_certificate(host, port, cert=None):
with support.transient_internet(host):
pem = ssl.get_server_certificate((host, port))
if not pem:
self.fail("No server certificate on %s:%s!" % (host, port))
_test_get_server_certificate(self, *self.server_addr, cert=SIGNING_CA)
try:
pem = ssl.get_server_certificate((host, port),
ca_certs=CERTFILE)
except ssl.SSLError as x:
#should fail
if support.verbose:
sys.stdout.write("%s\n" % x)
else:
self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
pem = ssl.get_server_certificate((host, port),
ca_certs=cert)
if not pem:
self.fail("No server certificate on %s:%s!" % (host, port))
if support.verbose:
sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
_test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
if support.IPV6_ENABLED:
_test_get_server_certificate('ipv6.google.com', 443)
def test_get_server_certificate_fail(self):
# Connection failure crashes ThreadedEchoServer, so run this in an
# independent test method
_test_get_server_certificate_fail(self, *self.server_addr)
def test_ciphers(self):
remote = (REMOTE_HOST, 443)
with support.transient_internet(remote[0]):
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
s.connect(remote)
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
s.connect(remote)
# Error checking can happen at instantiation or when connecting
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
with socket.socket(socket.AF_INET) as sock:
s = ssl.wrap_socket(sock,
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
s.connect(remote)
def test_algorithms(self):
# Issue #8484: all algorithms should be available when verifying a
# certificate.
# SHA256 was added in OpenSSL 0.9.8
if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
# sha256.tbs-internet.com needs SNI to use the correct certificate
if not ssl.HAS_SNI:
self.skipTest("SNI needed for this test")
# https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
remote = ("sha256.tbs-internet.com", 443)
sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
with support.transient_internet("sha256.tbs-internet.com"):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(sha256_cert)
s = ctx.wrap_socket(socket.socket(socket.AF_INET),
server_hostname="sha256.tbs-internet.com")
try:
s.connect(remote)
if support.verbose:
sys.stdout.write("\nCipher with %r is %r\n" %
(remote, s.cipher()))
sys.stdout.write("Certificate is:\n%s\n" %
pprint.pformat(s.getpeercert()))
finally:
s.close()
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
s.connect(self.server_addr)
with ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s:
s.connect(self.server_addr)
# Error checking can happen at instantiation or when connecting
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"):
with socket.socket(socket.AF_INET) as sock:
s = ssl.wrap_socket(sock,
cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
s.connect(self.server_addr)
def test_get_ca_certs_capath(self):
# capath certs are loaded on request
with support.transient_internet(REMOTE_HOST):
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
self.assertEqual(ctx.get_ca_certs(), [])
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
s.connect((REMOTE_HOST, 443))
try:
cert = s.getpeercert()
self.assertTrue(cert)
finally:
s.close()
self.assertEqual(len(ctx.get_ca_certs()), 1)
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(capath=CAPATH)
self.assertEqual(ctx.get_ca_certs(), [])
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr)
cert = s.getpeercert()
self.assertTrue(cert)
self.assertEqual(len(ctx.get_ca_certs()), 1)
@needs_sni
def test_context_setget(self):
# Check that the context of a connected socket can be replaced.
with support.transient_internet(REMOTE_HOST):
ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
s = socket.socket(socket.AF_INET)
with ctx1.wrap_socket(s) as ss:
ss.connect((REMOTE_HOST, 443))
self.assertIs(ss.context, ctx1)
self.assertIs(ss._sslobj.context, ctx1)
ss.context = ctx2
self.assertIs(ss.context, ctx2)
self.assertIs(ss._sslobj.context, ctx2)
class NetworkedBIOTests(unittest.TestCase):
ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
s = socket.socket(socket.AF_INET)
with ctx1.wrap_socket(s) as ss:
ss.connect(self.server_addr)
self.assertIs(ss.context, ctx1)
self.assertIs(ss._sslobj.context, ctx1)
ss.context = ctx2
self.assertIs(ss.context, ctx2)
self.assertIs(ss._sslobj.context, ctx2)
def ssl_io_loop(self, sock, incoming, outgoing, func, *args, **kwargs):
# A simple IO loop. Call func(*args) depending on the error we get
@ -1736,64 +1654,128 @@ class NetworkedBIOTests(unittest.TestCase):
% (count, func.__name__))
return ret
def test_handshake(self):
with support.transient_internet(REMOTE_HOST):
sock = socket.socket(socket.AF_INET)
sock.connect((REMOTE_HOST, 443))
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(REMOTE_ROOT_CERT)
ctx.check_hostname = True
sslobj = ctx.wrap_bio(incoming, outgoing, False, REMOTE_HOST)
self.assertIs(sslobj._sslobj.owner, sslobj)
self.assertIsNone(sslobj.cipher())
self.assertIsNone(sslobj.shared_ciphers())
self.assertRaises(ValueError, sslobj.getpeercert)
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
self.assertTrue(sslobj.cipher())
self.assertIsNone(sslobj.shared_ciphers())
self.assertTrue(sslobj.getpeercert())
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertTrue(sslobj.get_channel_binding('tls-unique'))
try:
self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
except ssl.SSLSyscallError:
# self-signed.pythontest.net probably shuts down the TCP
# connection without sending a secure shutdown message, and
# this is reported as SSL_ERROR_SYSCALL
pass
self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
sock.close()
def test_read_write_data(self):
with support.transient_internet(REMOTE_HOST):
sock = socket.socket(socket.AF_INET)
sock.connect((REMOTE_HOST, 443))
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_NONE
sslobj = ctx.wrap_bio(incoming, outgoing, False)
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
req = b'GET / HTTP/1.0\r\n\r\n'
self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
self.assertEqual(buf[:5], b'HTTP/')
def test_bio_handshake(self):
sock = socket.socket(socket.AF_INET)
self.addCleanup(sock.close)
sock.connect(self.server_addr)
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(SIGNING_CA)
ctx.check_hostname = True
sslobj = ctx.wrap_bio(incoming, outgoing, False, 'localhost')
self.assertIs(sslobj._sslobj.owner, sslobj)
self.assertIsNone(sslobj.cipher())
self.assertIsNone(sslobj.shared_ciphers())
self.assertRaises(ValueError, sslobj.getpeercert)
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertIsNone(sslobj.get_channel_binding('tls-unique'))
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
self.assertTrue(sslobj.cipher())
self.assertIsNone(sslobj.shared_ciphers())
self.assertTrue(sslobj.getpeercert())
if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES:
self.assertTrue(sslobj.get_channel_binding('tls-unique'))
try:
self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
sock.close()
except ssl.SSLSyscallError:
# If the server shuts down the TCP connection without sending a
# secure shutdown message, this is reported as SSL_ERROR_SYSCALL
pass
self.assertRaises(ssl.SSLError, sslobj.write, b'foo')
def test_bio_read_write_data(self):
sock = socket.socket(socket.AF_INET)
self.addCleanup(sock.close)
sock.connect(self.server_addr)
incoming = ssl.MemoryBIO()
outgoing = ssl.MemoryBIO()
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.verify_mode = ssl.CERT_NONE
sslobj = ctx.wrap_bio(incoming, outgoing, False)
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
req = b'FOO\n'
self.ssl_io_loop(sock, incoming, outgoing, sslobj.write, req)
buf = self.ssl_io_loop(sock, incoming, outgoing, sslobj.read, 1024)
self.assertEqual(buf, b'foo\n')
self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap)
try:
import threading
except ImportError:
_have_threads = False
else:
_have_threads = True
class NetworkedTests(unittest.TestCase):
def test_timeout_connect_ex(self):
# Issue #12065: on a timeout, connect_ex() should return the original
# errno (mimicking the behaviour of non-SSL sockets).
with support.transient_internet(REMOTE_HOST):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
do_handshake_on_connect=False)
self.addCleanup(s.close)
s.settimeout(0.0000001)
rc = s.connect_ex((REMOTE_HOST, 443))
if rc == 0:
self.skipTest("REMOTE_HOST responded too quickly")
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
@unittest.skipUnless(support.IPV6_ENABLED, 'Needs IPv6')
def test_get_server_certificate_ipv6(self):
with support.transient_internet('ipv6.google.com'):
_test_get_server_certificate(self, 'ipv6.google.com', 443)
_test_get_server_certificate_fail(self, 'ipv6.google.com', 443)
def test_algorithms(self):
# Issue #8484: all algorithms should be available when verifying a
# certificate.
# SHA256 was added in OpenSSL 0.9.8
if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
# sha256.tbs-internet.com needs SNI to use the correct certificate
if not ssl.HAS_SNI:
self.skipTest("SNI needed for this test")
# https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
remote = ("sha256.tbs-internet.com", 443)
sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
with support.transient_internet("sha256.tbs-internet.com"):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(sha256_cert)
s = ctx.wrap_socket(socket.socket(socket.AF_INET),
server_hostname="sha256.tbs-internet.com")
try:
s.connect(remote)
if support.verbose:
sys.stdout.write("\nCipher with %r is %r\n" %
(remote, s.cipher()))
sys.stdout.write("Certificate is:\n%s\n" %
pprint.pformat(s.getpeercert()))
finally:
s.close()
def _test_get_server_certificate(test, host, port, cert=None):
pem = ssl.get_server_certificate((host, port))
if not pem:
test.fail("No server certificate on %s:%s!" % (host, port))
pem = ssl.get_server_certificate((host, port), ca_certs=cert)
if not pem:
test.fail("No server certificate on %s:%s!" % (host, port))
if support.verbose:
sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
def _test_get_server_certificate_fail(test, host, port):
try:
pem = ssl.get_server_certificate((host, port), ca_certs=CERTFILE)
except ssl.SSLError as x:
#should fail
if support.verbose:
sys.stdout.write("%s\n" % x)
else:
test.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
if _have_threads:
from test.ssl_servers import make_https_server
class ThreadedEchoServer(threading.Thread):
@ -1881,6 +1863,15 @@ else:
if not stripped:
# eof, so quit this handler
self.running = False
try:
self.sock = self.sslconn.unwrap()
except OSError:
# Many tests shut the TCP connection down
# without an SSL shutdown. This causes
# unwrap() to raise OSError with errno=0!
pass
else:
self.sslconn = None
self.close()
elif stripped == b'over':
if support.verbose and self.server.connectionchatty:
@ -3342,18 +3333,20 @@ def test_main(verbose=False):
pass
for filename in [
CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
CERTFILE, BYTES_CERTFILE,
ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
BADCERT, BADKEY, EMPTYCERT]:
if not os.path.exists(filename):
raise support.TestFailed("Can't read certificate file %r" % filename)
tests = [ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests]
tests = [
ContextTests, BasicSocketTests, SSLErrorTests, MemoryBIOTests,
SimpleBackgroundTests,
]
if support.is_resource_enabled('network'):
tests.append(NetworkedTests)
tests.append(NetworkedBIOTests)
if _have_threads:
thread_info = support.threading_setup()

View File

@ -873,7 +873,7 @@ Tests
- Issue #26325: Added test.support.check_no_resource_warning() to check that
no ResourceWarning is emitted.
- Issue #25940: Changed test_ssl to use self-signed.pythontest.net. This
- Issue #25940: Changed test_ssl to use its internal local server more. This
avoids relying on svn.python.org, which recently changed root certificate.
- Issue #25616: Tests for OrderedDict are extracted from test_collections