Issue #19509: Add SSLContext.check_hostname to match the peer's certificate

with server_hostname on handshake.
This commit is contained in:
Christian Heimes 2013-12-02 02:41:19 +01:00
parent 6e6429a2cd
commit 1aa9a75fbf
5 changed files with 162 additions and 6 deletions

View File

@ -773,6 +773,11 @@ SSL sockets also have the following additional methods and attributes:
Perform the SSL setup handshake.
.. versionchanged:: 3.4
The handshake method also performce :func:`match_hostname` when the
:attr:`~SSLContext.check_hostname` attribute of the socket's
:attr:`~SSLSocket.context` is true.
.. method:: SSLSocket.getpeercert(binary_form=False)
If there is no certificate for the peer on the other end of the connection,
@ -1182,6 +1187,33 @@ to speed up repeated connections from the same clients.
.. versionadded:: 3.4
.. attribute:: SSLContext.check_hostname
Wether to match the peer cert's hostname with :func:`match_hostname` in
:meth:`SSLSocket.do_handshake`. The context's
:attr:`~SSLContext.verify_mode` must be set to :data:`CERT_OPTIONAL` or
:data:`CERT_REQUIRED`, and you must pass *server_hostname* to
:meth:`~SSLContext.wrap_socket` in order to match the hostname.
Example::
import socket, ssl
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_default_certs()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssl_sock = context.wrap_socket(s, server_hostname='www.verisign.com'):
ssl_sock.connect(('www.verisign.com', 443))
.. versionadded:: 3.4
.. note::
This features requires OpenSSL 0.9.8f or newer.
.. attribute:: SSLContext.options
An integer representing the set of SSL options enabled on this context.
@ -1596,7 +1628,9 @@ Therefore, when in client mode, it is highly recommended to use
have to check that the server certificate, which can be obtained by calling
:meth:`SSLSocket.getpeercert`, matches the desired service. For many
protocols and applications, the service can be identified by the hostname;
in this case, the :func:`match_hostname` function can be used.
in this case, the :func:`match_hostname` function can be used. This common
check is automatically performed when :attr:`SSLContext.check_hostname` is
enabled.
In server mode, if you want to authenticate your clients using the SSL layer
(rather than using a higher-level authentication mechanism), you'll also have

View File

@ -148,6 +148,7 @@ if sys.platform == "win32":
from _ssl import enum_certificates, enum_crls
from socket import getnameinfo as _getnameinfo
from socket import SHUT_RDWR as _SHUT_RDWR
from socket import socket, AF_INET, SOCK_STREAM, create_connection
import base64 # for DER-to-PEM translation
import traceback
@ -235,7 +236,9 @@ def match_hostname(cert, hostname):
returns nothing.
"""
if not cert:
raise ValueError("empty or no certificate")
raise ValueError("empty or no certificate, match_hostname needs a "
"SSL socket or SSL context with either "
"CERT_OPTIONAL or CERT_REQUIRED")
dnsnames = []
san = cert.get('subjectAltName', ())
for key, value in san:
@ -387,9 +390,10 @@ def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
context.options |= getattr(_ssl, "OP_NO_COMPRESSION", 0)
# disallow ciphers with known vulnerabilities
context.set_ciphers(_RESTRICTED_CIPHERS)
# verify certs in client mode
# verify certs and host name in client mode
if purpose == Purpose.SERVER_AUTH:
context.verify_mode = CERT_REQUIRED
context.check_hostname = True
if cafile or capath or cadata:
context.load_verify_locations(cafile, capath, cadata)
elif context.verify_mode != CERT_NONE:
@ -480,6 +484,13 @@ class SSLSocket(socket):
if server_side and server_hostname:
raise ValueError("server_hostname can only be specified "
"in client mode")
if self._context.check_hostname and not server_hostname:
if HAS_SNI:
raise ValueError("check_hostname requires server_hostname")
else:
raise ValueError("check_hostname requires server_hostname, "
"but it's not supported by your OpenSSL "
"library")
self.server_side = server_side
self.server_hostname = server_hostname
self.do_handshake_on_connect = do_handshake_on_connect
@ -522,9 +533,9 @@ class SSLSocket(socket):
raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets")
self.do_handshake()
except OSError as x:
except (OSError, ValueError):
self.close()
raise x
raise
@property
def context(self):
@ -751,6 +762,17 @@ class SSLSocket(socket):
finally:
self.settimeout(timeout)
if self.context.check_hostname:
try:
if not self.server_hostname:
raise ValueError("check_hostname needs server_hostname "
"argument")
match_hostname(self.getpeercert(), self.server_hostname)
except Exception:
self.shutdown(_SHUT_RDWR)
self.close()
raise
def _real_connect(self, addr, connect_ex):
if self.server_side:
raise ValueError("can't connect in server-side mode")
@ -770,7 +792,7 @@ class SSLSocket(socket):
if self.do_handshake_on_connect:
self.do_handshake()
return rc
except OSError:
except (OSError, ValueError):
self._sslobj = None
raise

View File

@ -1003,6 +1003,7 @@ class ContextTests(unittest.TestCase):
ctx = ssl.create_default_context()
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self.assertTrue(ctx.check_hostname)
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
with open(SIGNING_CA) as f:
@ -1022,6 +1023,7 @@ class ContextTests(unittest.TestCase):
ctx = ssl._create_stdlib_context()
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self.assertFalse(ctx.check_hostname)
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
@ -1040,6 +1042,28 @@ class ContextTests(unittest.TestCase):
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
def test_check_hostname(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertFalse(ctx.check_hostname)
# Requires CERT_REQUIRED or CERT_OPTIONAL
with self.assertRaises(ValueError):
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
self.assertFalse(ctx.check_hostname)
ctx.check_hostname = True
self.assertTrue(ctx.check_hostname)
ctx.verify_mode = ssl.CERT_OPTIONAL
ctx.check_hostname = True
self.assertTrue(ctx.check_hostname)
# Cannot set CERT_NONE with check_hostname enabled
with self.assertRaises(ValueError):
ctx.verify_mode = ssl.CERT_NONE
ctx.check_hostname = False
self.assertFalse(ctx.check_hostname)
class SSLErrorTests(unittest.TestCase):
@ -1930,6 +1954,44 @@ else:
cert = s.getpeercert()
self.assertTrue(cert, "Can't get peer certificate.")
def test_check_hostname(self):
if support.verbose:
sys.stdout.write("\n")
server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
server_context.load_cert_chain(SIGNED_CERTFILE)
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
context.load_verify_locations(SIGNING_CA)
# correct hostname should verify
server = ThreadedEchoServer(context=server_context, chatty=True)
with server:
with context.wrap_socket(socket.socket(),
server_hostname="localhost") as s:
s.connect((HOST, server.port))
cert = s.getpeercert()
self.assertTrue(cert, "Can't get peer certificate.")
# incorrect hostname should raise an exception
server = ThreadedEchoServer(context=server_context, chatty=True)
with server:
with context.wrap_socket(socket.socket(),
server_hostname="invalid") as s:
with self.assertRaisesRegex(ssl.CertificateError,
"hostname 'invalid' doesn't match 'localhost'"):
s.connect((HOST, server.port))
# missing server_hostname arg should cause an exception, too
server = ThreadedEchoServer(context=server_context, chatty=True)
with server:
with socket.socket() as s:
with self.assertRaisesRegex(ValueError,
"check_hostname requires server_hostname"):
context.wrap_socket(s)
def test_empty_cert(self):
"""Connecting with an empty cert file"""
bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,

View File

@ -18,6 +18,9 @@ Core and Builtins
Library
-------
- Issue #19509: Add SSLContext.check_hostname to match the peer's certificate
with server_hostname on handshake.
- Issue #15798: Fixed subprocess.Popen() to no longer fail if file
descriptor 0, 1 or 2 is closed.

View File

@ -214,6 +214,7 @@ typedef struct {
#ifndef OPENSSL_NO_TLSEXT
PyObject *set_hostname;
#endif
int check_hostname;
} PySSLContext;
typedef struct {
@ -2050,6 +2051,8 @@ context_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
#ifndef OPENSSL_NO_TLSEXT
self->set_hostname = NULL;
#endif
/* Don't check host name by default */
self->check_hostname = 0;
/* Defaults */
SSL_CTX_set_verify(self->ctx, SSL_VERIFY_NONE, NULL);
SSL_CTX_set_options(self->ctx,
@ -2231,6 +2234,12 @@ set_verify_mode(PySSLContext *self, PyObject *arg, void *c)
"invalid value for verify_mode");
return -1;
}
if (mode == SSL_VERIFY_NONE && self->check_hostname) {
PyErr_SetString(PyExc_ValueError,
"Cannot set verify_mode to CERT_NONE when "
"check_hostname is enabled.");
return -1;
}
SSL_CTX_set_verify(self->ctx, mode, NULL);
return 0;
}
@ -2304,6 +2313,30 @@ set_options(PySSLContext *self, PyObject *arg, void *c)
return 0;
}
static PyObject *
get_check_hostname(PySSLContext *self, void *c)
{
return PyBool_FromLong(self->check_hostname);
}
static int
set_check_hostname(PySSLContext *self, PyObject *arg, void *c)
{
int check_hostname;
if (!PyArg_Parse(arg, "p", &check_hostname))
return -1;
if (check_hostname &&
SSL_CTX_get_verify_mode(self->ctx) == SSL_VERIFY_NONE) {
PyErr_SetString(PyExc_ValueError,
"check_hostname needs a SSL context with either "
"CERT_OPTIONAL or CERT_REQUIRED");
return -1;
}
self->check_hostname = check_hostname;
return 0;
}
typedef struct {
PyThreadState *thread_state;
PyObject *callable;
@ -3093,6 +3126,8 @@ get_ca_certs(PySSLContext *self, PyObject *args, PyObject *kwds)
static PyGetSetDef context_getsetlist[] = {
{"check_hostname", (getter) get_check_hostname,
(setter) set_check_hostname, NULL},
{"options", (getter) get_options,
(setter) set_options, NULL},
#ifdef HAVE_OPENSSL_VERIFY_PARAM