mirror of https://github.com/python/cpython
bpo-43880: Show DeprecationWarnings for deprecated ssl module features (GH-25455)
* ssl.OP_NO_SSLv2 * ssl.OP_NO_SSLv3 * ssl.OP_NO_TLSv1 * ssl.OP_NO_TLSv1_1 * ssl.OP_NO_TLSv1_2 * ssl.OP_NO_TLSv1_3 * ssl.PROTOCOL_SSLv2 * ssl.PROTOCOL_SSLv3 * ssl.PROTOCOL_SSLv23 (alias for PROTOCOL_TLS) * ssl.PROTOCOL_TLS * ssl.PROTOCOL_TLSv1 * ssl.PROTOCOL_TLSv1_1 * ssl.PROTOCOL_TLSv1_2 * ssl.TLSVersion.SSLv3 * ssl.TLSVersion.TLSv1 * ssl.TLSVersion.TLSv1_1 * ssl.wrap_socket() * ssl.RAND_pseudo_bytes() * ssl.RAND_egd() (already removed since it's not supported by OpenSSL 1.1.1) * ssl.SSLContext() without a protocol argument * ssl.match_hostname() * hashlib.pbkdf2_hmac() (pure Python implementation, fast OpenSSL function will stay) Signed-off-by: Christian Heimes <christian@python.org>
This commit is contained in:
parent
89d1550d14
commit
2875c603b2
|
@ -266,6 +266,12 @@ include a `salt <https://en.wikipedia.org/wiki/Salt_%28cryptography%29>`_.
|
|||
Python implementation uses an inline version of :mod:`hmac`. It is about
|
||||
three times slower and doesn't release the GIL.
|
||||
|
||||
.. deprecated:: 3.10
|
||||
|
||||
Slow Python implementation of *pbkdf2_hmac* is deprecated. In the
|
||||
future the function will only be available when Python is compiled
|
||||
with OpenSSL.
|
||||
|
||||
.. function:: scrypt(password, *, salt, n, r, p, maxmem=0, dklen=64)
|
||||
|
||||
The function provides scrypt password-based key derivation function as
|
||||
|
|
|
@ -25,8 +25,8 @@ probably additional platforms, as long as OpenSSL is installed on that platform.
|
|||
|
||||
Some behavior may be platform dependent, since calls are made to the
|
||||
operating system socket APIs. The installed version of OpenSSL may also
|
||||
cause variations in behavior. For example, TLSv1.1 and TLSv1.2 come with
|
||||
openssl version 1.0.1.
|
||||
cause variations in behavior. For example, TLSv1.3 with OpenSSL version
|
||||
1.1.1.
|
||||
|
||||
.. warning::
|
||||
Don't use this module without reading the :ref:`ssl-security`. Doing so
|
||||
|
@ -63,6 +63,8 @@ by SSL sockets created through the :meth:`SSLContext.wrap_socket` method.
|
|||
:pep:`644` has been implemented. The ssl module requires OpenSSL 1.1.1
|
||||
or newer.
|
||||
|
||||
Use of deprecated constants and functions result in deprecation warnings.
|
||||
|
||||
|
||||
Functions, Constants, and Exceptions
|
||||
------------------------------------
|
||||
|
@ -136,8 +138,9 @@ purposes.
|
|||
:const:`None`, this function can choose to trust the system's default
|
||||
CA certificates instead.
|
||||
|
||||
The settings are: :data:`PROTOCOL_TLS`, :data:`OP_NO_SSLv2`, and
|
||||
:data:`OP_NO_SSLv3` with high encryption cipher suites without RC4 and
|
||||
The settings are: :data:`PROTOCOL_TLS_CLIENT` or
|
||||
:data:`PROTOCOL_TLS_SERVER`, :data:`OP_NO_SSLv2`, and :data:`OP_NO_SSLv3`
|
||||
with high encryption cipher suites without RC4 and
|
||||
without unauthenticated cipher suites. Passing :data:`~Purpose.SERVER_AUTH`
|
||||
as *purpose* sets :data:`~SSLContext.verify_mode` to :data:`CERT_REQUIRED`
|
||||
and either loads CA certificates (when at least one of *cafile*, *capath* or
|
||||
|
@ -185,6 +188,12 @@ purposes.
|
|||
|
||||
Support for key logging to :envvar:`SSLKEYLOGFILE` was added.
|
||||
|
||||
.. versionchanged:: 3.10
|
||||
|
||||
The context now uses :data:`PROTOCOL_TLS_CLIENT` or
|
||||
:data:`PROTOCOL_TLS_SERVER` protocol instead of generic
|
||||
:data:`PROTOCOL_TLS`.
|
||||
|
||||
|
||||
Exceptions
|
||||
^^^^^^^^^^
|
||||
|
@ -417,7 +426,7 @@ Certificate handling
|
|||
previously. Return an integer (no fractions of a second in the
|
||||
input format)
|
||||
|
||||
.. function:: get_server_certificate(addr, ssl_version=PROTOCOL_TLS, ca_certs=None)
|
||||
.. function:: get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None)
|
||||
|
||||
Given the address ``addr`` of an SSL-protected server, as a (*hostname*,
|
||||
*port-number*) pair, fetches the server's certificate, and returns it as a
|
||||
|
@ -654,6 +663,8 @@ Constants
|
|||
|
||||
.. versionadded:: 3.6
|
||||
|
||||
.. deprecated:: 3.10
|
||||
|
||||
.. data:: PROTOCOL_TLS_CLIENT
|
||||
|
||||
Auto-negotiate the highest protocol version like :data:`PROTOCOL_TLS`,
|
||||
|
@ -707,7 +718,10 @@ Constants
|
|||
.. deprecated:: 3.6
|
||||
|
||||
OpenSSL has deprecated all version specific protocols. Use the default
|
||||
protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
|
||||
protocol :data:`PROTOCOL_TLS_SERVER` or :data:`PROTOCOL_TLS_CLIENT`
|
||||
with :attr:`SSLContext.minimum_version` and
|
||||
:attr:`SSLContext.maximum_version` instead.
|
||||
|
||||
|
||||
.. data:: PROTOCOL_TLSv1
|
||||
|
||||
|
@ -715,8 +729,7 @@ Constants
|
|||
|
||||
.. deprecated:: 3.6
|
||||
|
||||
OpenSSL has deprecated all version specific protocols. Use the default
|
||||
protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
|
||||
OpenSSL has deprecated all version specific protocols.
|
||||
|
||||
.. data:: PROTOCOL_TLSv1_1
|
||||
|
||||
|
@ -727,8 +740,7 @@ Constants
|
|||
|
||||
.. deprecated:: 3.6
|
||||
|
||||
OpenSSL has deprecated all version specific protocols. Use the default
|
||||
protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
|
||||
OpenSSL has deprecated all version specific protocols.
|
||||
|
||||
.. data:: PROTOCOL_TLSv1_2
|
||||
|
||||
|
@ -739,8 +751,7 @@ Constants
|
|||
|
||||
.. deprecated:: 3.6
|
||||
|
||||
OpenSSL has deprecated all version specific protocols. Use the default
|
||||
protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
|
||||
OpenSSL has deprecated all version specific protocols.
|
||||
|
||||
.. data:: OP_ALL
|
||||
|
||||
|
@ -762,7 +773,6 @@ Constants
|
|||
|
||||
SSLv2 is deprecated
|
||||
|
||||
|
||||
.. data:: OP_NO_SSLv3
|
||||
|
||||
Prevents an SSLv3 connection. This option is only applicable in
|
||||
|
@ -1068,6 +1078,11 @@ Constants
|
|||
|
||||
SSL 3.0 to TLS 1.3.
|
||||
|
||||
.. deprecated:: 3.10
|
||||
|
||||
All :class:`TLSVersion` members except :attr:`TLSVersion.TLSv1_2` and
|
||||
:attr:`TLSVersion.TLSv1_3` are deprecated.
|
||||
|
||||
|
||||
SSL Sockets
|
||||
-----------
|
||||
|
@ -1423,7 +1438,7 @@ such as SSL configuration options, certificate(s) and private key(s).
|
|||
It also manages a cache of SSL sessions for server-side sockets, in order
|
||||
to speed up repeated connections from the same clients.
|
||||
|
||||
.. class:: SSLContext(protocol=PROTOCOL_TLS)
|
||||
.. class:: SSLContext(protocol=None)
|
||||
|
||||
Create a new SSL context. You may pass *protocol* which must be one
|
||||
of the ``PROTOCOL_*`` constants defined in this module. The parameter
|
||||
|
@ -1472,6 +1487,12 @@ to speed up repeated connections from the same clients.
|
|||
ciphers, no ``NULL`` ciphers and no ``MD5`` ciphers (except for
|
||||
:data:`PROTOCOL_SSLv2`).
|
||||
|
||||
.. deprecated:: 3.10
|
||||
|
||||
:class:`SSLContext` without protocol argument is deprecated. The
|
||||
context class will either require :data:`PROTOCOL_TLS_CLIENT` or
|
||||
:data:`PROTOCOL_TLS_SERVER` protocol in the future.
|
||||
|
||||
|
||||
:class:`SSLContext` objects have the following methods and attributes:
|
||||
|
||||
|
@ -1934,7 +1955,7 @@ to speed up repeated connections from the same clients.
|
|||
.. attribute:: SSLContext.num_tickets
|
||||
|
||||
Control the number of TLS 1.3 session tickets of a
|
||||
:attr:`TLS_PROTOCOL_SERVER` context. The setting has no impact on TLS
|
||||
:attr:`PROTOCOL_TLS_SERVER` context. The setting has no impact on TLS
|
||||
1.0 to 1.2 connections.
|
||||
|
||||
.. versionadded:: 3.8
|
||||
|
@ -1951,6 +1972,12 @@ to speed up repeated connections from the same clients.
|
|||
>>> ssl.create_default_context().options # doctest: +SKIP
|
||||
<Options.OP_ALL|OP_NO_SSLv3|OP_NO_SSLv2|OP_NO_COMPRESSION: 2197947391>
|
||||
|
||||
.. deprecated:: 3.7
|
||||
|
||||
All ``OP_NO_SSL*`` and ``OP_NO_TLS*`` options have been deprecated since
|
||||
Python 3.7. Use :attr:`SSLContext.minimum_version` and
|
||||
:attr:`SSLContext.maximum_version` instead.
|
||||
|
||||
.. attribute:: SSLContext.post_handshake_auth
|
||||
|
||||
Enable TLS 1.3 post-handshake client authentication. Post-handshake auth
|
||||
|
@ -2623,8 +2650,8 @@ disabled by default.
|
|||
::
|
||||
|
||||
>>> client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
>>> client_context.options |= ssl.OP_NO_TLSv1
|
||||
>>> client_context.options |= ssl.OP_NO_TLSv1_1
|
||||
>>> client_context.minimum_version = ssl.TLSVersion.TLSv1_3
|
||||
>>> client_context.maximum_version = ssl.TLSVersion.TLSv1_3
|
||||
|
||||
|
||||
The SSL context created above will only allow TLSv1.2 and later (if
|
||||
|
|
|
@ -181,6 +181,7 @@ try:
|
|||
# OpenSSL's PKCS5_PBKDF2_HMAC requires OpenSSL 1.0+ with HMAC and SHA
|
||||
from _hashlib import pbkdf2_hmac
|
||||
except ImportError:
|
||||
from warnings import warn as _warn
|
||||
_trans_5C = bytes((x ^ 0x5C) for x in range(256))
|
||||
_trans_36 = bytes((x ^ 0x36) for x in range(256))
|
||||
|
||||
|
@ -191,6 +192,11 @@ except ImportError:
|
|||
as OpenSSL's PKCS5_PBKDF2_HMAC for short passwords and much faster
|
||||
for long passwords.
|
||||
"""
|
||||
_warn(
|
||||
"Python implementation of pbkdf2_hmac() is deprecated.",
|
||||
category=DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
if not isinstance(hash_name, str):
|
||||
raise TypeError(hash_name)
|
||||
|
||||
|
|
53
Lib/ssl.py
53
Lib/ssl.py
|
@ -381,6 +381,11 @@ def match_hostname(cert, hostname):
|
|||
CertificateError is raised on failure. On success, the function
|
||||
returns nothing.
|
||||
"""
|
||||
warnings.warn(
|
||||
"ssl module: match_hostname() is deprecated",
|
||||
category=DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
if not cert:
|
||||
raise ValueError("empty or no certificate, match_hostname needs a "
|
||||
"SSL socket or SSL context with either "
|
||||
|
@ -479,7 +484,15 @@ class SSLContext(_SSLContext):
|
|||
sslsocket_class = None # SSLSocket is assigned later.
|
||||
sslobject_class = None # SSLObject is assigned later.
|
||||
|
||||
def __new__(cls, protocol=PROTOCOL_TLS, *args, **kwargs):
|
||||
def __new__(cls, protocol=None, *args, **kwargs):
|
||||
if protocol is None:
|
||||
warnings.warn(
|
||||
"ssl module: "
|
||||
"SSLContext() without protocol argument is deprecated.",
|
||||
category=DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
protocol = PROTOCOL_TLS
|
||||
self = _SSLContext.__new__(cls, protocol)
|
||||
return self
|
||||
|
||||
|
@ -518,6 +531,7 @@ class SSLContext(_SSLContext):
|
|||
)
|
||||
|
||||
def set_npn_protocols(self, npn_protocols):
|
||||
warnings.warn("NPN is deprecated, use ALPN instead", stacklevel=2)
|
||||
protos = bytearray()
|
||||
for protocol in npn_protocols:
|
||||
b = bytes(protocol, 'ascii')
|
||||
|
@ -734,12 +748,15 @@ def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
|
|||
# SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
|
||||
# OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
|
||||
# by default.
|
||||
context = SSLContext(PROTOCOL_TLS)
|
||||
|
||||
if purpose == Purpose.SERVER_AUTH:
|
||||
# verify certs and host name in client mode
|
||||
context = SSLContext(PROTOCOL_TLS_CLIENT)
|
||||
context.verify_mode = CERT_REQUIRED
|
||||
context.check_hostname = True
|
||||
elif purpose == Purpose.CLIENT_AUTH:
|
||||
context = SSLContext(PROTOCOL_TLS_SERVER)
|
||||
else:
|
||||
raise ValueError(purpose)
|
||||
|
||||
if cafile or capath or cadata:
|
||||
context.load_verify_locations(cafile, capath, cadata)
|
||||
|
@ -755,7 +772,7 @@ def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
|
|||
context.keylog_filename = keylogfile
|
||||
return context
|
||||
|
||||
def _create_unverified_context(protocol=PROTOCOL_TLS, *, cert_reqs=CERT_NONE,
|
||||
def _create_unverified_context(protocol=None, *, cert_reqs=CERT_NONE,
|
||||
check_hostname=False, purpose=Purpose.SERVER_AUTH,
|
||||
certfile=None, keyfile=None,
|
||||
cafile=None, capath=None, cadata=None):
|
||||
|
@ -772,10 +789,18 @@ def _create_unverified_context(protocol=PROTOCOL_TLS, *, cert_reqs=CERT_NONE,
|
|||
# SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
|
||||
# OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
|
||||
# by default.
|
||||
context = SSLContext(protocol)
|
||||
if purpose == Purpose.SERVER_AUTH:
|
||||
# verify certs and host name in client mode
|
||||
if protocol is None:
|
||||
protocol = PROTOCOL_TLS_CLIENT
|
||||
elif purpose == Purpose.CLIENT_AUTH:
|
||||
if protocol is None:
|
||||
protocol = PROTOCOL_TLS_SERVER
|
||||
else:
|
||||
raise ValueError(purpose)
|
||||
|
||||
if not check_hostname:
|
||||
context.check_hostname = False
|
||||
context = SSLContext(protocol)
|
||||
context.check_hostname = check_hostname
|
||||
if cert_reqs is not None:
|
||||
context.verify_mode = cert_reqs
|
||||
if check_hostname:
|
||||
|
@ -909,6 +934,9 @@ class SSLObject:
|
|||
"""Return the currently selected NPN protocol as a string, or ``None``
|
||||
if a next protocol was not negotiated or if NPN is not supported by one
|
||||
of the peers."""
|
||||
warnings.warn(
|
||||
"ssl module: NPN is deprecated, use ALPN instead", stacklevel=2
|
||||
)
|
||||
|
||||
def selected_alpn_protocol(self):
|
||||
"""Return the currently selected ALPN protocol as a string, or ``None``
|
||||
|
@ -1123,6 +1151,9 @@ class SSLSocket(socket):
|
|||
@_sslcopydoc
|
||||
def selected_npn_protocol(self):
|
||||
self._checkClosed()
|
||||
warnings.warn(
|
||||
"ssl module: NPN is deprecated, use ALPN instead", stacklevel=2
|
||||
)
|
||||
return None
|
||||
|
||||
@_sslcopydoc
|
||||
|
@ -1382,7 +1413,11 @@ def wrap_socket(sock, keyfile=None, certfile=None,
|
|||
do_handshake_on_connect=True,
|
||||
suppress_ragged_eofs=True,
|
||||
ciphers=None):
|
||||
|
||||
warnings.warn(
|
||||
"ssl module: wrap_socket is deprecated, use SSLContext.wrap_socket()",
|
||||
category=DeprecationWarning,
|
||||
stacklevel=2
|
||||
)
|
||||
if server_side and not certfile:
|
||||
raise ValueError("certfile must be specified for server-side "
|
||||
"operations")
|
||||
|
@ -1460,7 +1495,7 @@ def PEM_cert_to_DER_cert(pem_cert_string):
|
|||
d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)]
|
||||
return base64.decodebytes(d.encode('ASCII', 'strict'))
|
||||
|
||||
def get_server_certificate(addr, ssl_version=PROTOCOL_TLS, ca_certs=None):
|
||||
def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None):
|
||||
"""Retrieve the certificate from the server at the specified address,
|
||||
and return it as a PEM-encoded string.
|
||||
If 'ca_certs' is specified, validate the server cert against it.
|
||||
|
|
|
@ -504,7 +504,7 @@ def collect_ssl(info_add):
|
|||
copy_attributes(info_add, ssl, 'ssl.%s', attributes, formatter=format_attr)
|
||||
|
||||
for name, ctx in (
|
||||
('SSLContext', ssl.SSLContext()),
|
||||
('SSLContext', ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)),
|
||||
('default_https_context', ssl._create_default_https_context()),
|
||||
('stdlib_context', ssl._create_stdlib_context()),
|
||||
):
|
||||
|
|
|
@ -91,7 +91,7 @@ def dummy_ssl_context():
|
|||
if ssl is None:
|
||||
return None
|
||||
else:
|
||||
return ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
return simple_client_sslcontext(disable_verify=True)
|
||||
|
||||
|
||||
def run_briefly(loop):
|
||||
|
@ -158,7 +158,7 @@ class SSLWSGIServerMixin:
|
|||
# contains the ssl key and certificate files) differs
|
||||
# between the stdlib and stand-alone asyncio.
|
||||
# Prefer our own if we can find it.
|
||||
context = ssl.SSLContext()
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(ONLYCERT, ONLYKEY)
|
||||
|
||||
ssock = context.wrap_socket(request, server_side=True)
|
||||
|
|
|
@ -324,7 +324,7 @@ if ssl is not None:
|
|||
_ssl_closing = False
|
||||
|
||||
def secure_connection(self):
|
||||
context = ssl.SSLContext()
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(CERTFILE)
|
||||
socket = context.wrap_socket(self.socket,
|
||||
suppress_ragged_eofs=False,
|
||||
|
|
|
@ -21,6 +21,7 @@ from test import support
|
|||
from test.support import _4G, bigmemtest
|
||||
from test.support.import_helper import import_fresh_module
|
||||
from test.support import threading_helper
|
||||
from test.support import warnings_helper
|
||||
from http.client import HTTPException
|
||||
|
||||
# Were we compiled --with-pydebug or with #define Py_DEBUG?
|
||||
|
@ -1021,7 +1022,10 @@ class KDFTests(unittest.TestCase):
|
|||
|
||||
@unittest.skipIf(builtin_hashlib is None, "test requires builtin_hashlib")
|
||||
def test_pbkdf2_hmac_py(self):
|
||||
self._test_pbkdf2_hmac(builtin_hashlib.pbkdf2_hmac, builtin_hashes)
|
||||
with warnings_helper.check_warnings():
|
||||
self._test_pbkdf2_hmac(
|
||||
builtin_hashlib.pbkdf2_hmac, builtin_hashes
|
||||
)
|
||||
|
||||
@unittest.skipUnless(hasattr(openssl_hashlib, 'pbkdf2_hmac'),
|
||||
' test requires OpenSSL > 1.0')
|
||||
|
|
|
@ -96,7 +96,7 @@ if ssl:
|
|||
|
||||
def get_request(self):
|
||||
newsocket, fromaddr = self.socket.accept()
|
||||
context = ssl.SSLContext()
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(CERTFILE)
|
||||
connstream = context.wrap_socket(newsocket, server_side=True)
|
||||
return connstream, fromaddr
|
||||
|
|
|
@ -1602,7 +1602,7 @@ class LocalServerTests(unittest.TestCase):
|
|||
elif cmd == b'STARTTLS\r\n':
|
||||
reader.close()
|
||||
client.sendall(b'382 Begin TLS negotiation now\r\n')
|
||||
context = ssl.SSLContext()
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(certfile)
|
||||
client = context.wrap_socket(
|
||||
client, server_side=True)
|
||||
|
|
|
@ -155,7 +155,7 @@ class DummyPOP3Handler(asynchat.async_chat):
|
|||
def cmd_stls(self, arg):
|
||||
if self.tls_active is False:
|
||||
self.push('+OK Begin TLS negotiation')
|
||||
context = ssl.SSLContext()
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
context.load_cert_chain(CERTFILE)
|
||||
tls_sock = context.wrap_socket(self.socket,
|
||||
server_side=True,
|
||||
|
|
|
@ -224,7 +224,7 @@ def has_tls_version(version):
|
|||
|
||||
# check runtime and dynamic crypto policy settings. A TLS version may
|
||||
# be compiled in but disabled by a policy or config option.
|
||||
ctx = ssl.SSLContext()
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
if (
|
||||
hasattr(ctx, 'minimum_version') and
|
||||
ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
|
||||
|
@ -306,12 +306,20 @@ def asn1time(cert_time):
|
|||
|
||||
needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
|
||||
|
||||
ignore_deprecation = warnings_helper.ignore_warnings(
|
||||
category=DeprecationWarning
|
||||
)
|
||||
|
||||
def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
|
||||
|
||||
def test_wrap_socket(sock, *,
|
||||
cert_reqs=ssl.CERT_NONE, ca_certs=None,
|
||||
ciphers=None, certfile=None, keyfile=None,
|
||||
**kwargs):
|
||||
context = ssl.SSLContext(ssl_version)
|
||||
if not kwargs.get("server_side"):
|
||||
kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
else:
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
if cert_reqs is not None:
|
||||
if cert_reqs == ssl.CERT_NONE:
|
||||
context.check_hostname = False
|
||||
|
@ -378,8 +386,8 @@ class BasicSocketTests(unittest.TestCase):
|
|||
def test_str_for_enums(self):
|
||||
# Make sure that the PROTOCOL_* constants have enum-like string
|
||||
# reprs.
|
||||
proto = ssl.PROTOCOL_TLS
|
||||
self.assertEqual(str(proto), 'PROTOCOL_TLS')
|
||||
proto = ssl.PROTOCOL_TLS_CLIENT
|
||||
self.assertEqual(str(proto), 'PROTOCOL_TLS_CLIENT')
|
||||
ctx = ssl.SSLContext(proto)
|
||||
self.assertIs(ctx.protocol, proto)
|
||||
|
||||
|
@ -390,7 +398,8 @@ class BasicSocketTests(unittest.TestCase):
|
|||
% (v, (v and "sufficient randomness") or
|
||||
"insufficient randomness"))
|
||||
|
||||
data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
|
||||
with warnings_helper.check_warnings():
|
||||
data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
|
||||
self.assertEqual(len(data), 16)
|
||||
self.assertEqual(is_cryptographic, v == 1)
|
||||
if v:
|
||||
|
@ -401,48 +410,13 @@ class BasicSocketTests(unittest.TestCase):
|
|||
|
||||
# negative num is invalid
|
||||
self.assertRaises(ValueError, ssl.RAND_bytes, -5)
|
||||
self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
|
||||
with warnings_helper.check_warnings():
|
||||
self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
|
||||
|
||||
if hasattr(ssl, 'RAND_egd'):
|
||||
self.assertRaises(TypeError, ssl.RAND_egd, 1)
|
||||
self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
|
||||
ssl.RAND_add("this is a random string", 75.0)
|
||||
ssl.RAND_add(b"this is a random bytes object", 75.0)
|
||||
ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
|
||||
def test_random_fork(self):
|
||||
status = ssl.RAND_status()
|
||||
if not status:
|
||||
self.fail("OpenSSL's PRNG has insufficient randomness")
|
||||
|
||||
rfd, wfd = os.pipe()
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
try:
|
||||
os.close(rfd)
|
||||
child_random = ssl.RAND_pseudo_bytes(16)[0]
|
||||
self.assertEqual(len(child_random), 16)
|
||||
os.write(wfd, child_random)
|
||||
os.close(wfd)
|
||||
except BaseException:
|
||||
os._exit(1)
|
||||
else:
|
||||
os._exit(0)
|
||||
else:
|
||||
os.close(wfd)
|
||||
self.addCleanup(os.close, rfd)
|
||||
support.wait_process(pid, exitcode=0)
|
||||
|
||||
child_random = os.read(rfd, 16)
|
||||
self.assertEqual(len(child_random), 16)
|
||||
parent_random = ssl.RAND_pseudo_bytes(16)[0]
|
||||
self.assertEqual(len(parent_random), 16)
|
||||
|
||||
self.assertNotEqual(child_random, parent_random)
|
||||
|
||||
maxDiff = None
|
||||
|
||||
def test_parse_cert(self):
|
||||
# note that this uses an 'unofficial' function in _ssl.c,
|
||||
# provided solely for this test, to exercise the certificate
|
||||
|
@ -624,6 +598,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
with test_wrap_socket(s) as ss:
|
||||
self.assertEqual(timeout, ss.gettimeout())
|
||||
|
||||
@ignore_deprecation
|
||||
def test_errors_sslwrap(self):
|
||||
sock = socket.socket()
|
||||
self.assertRaisesRegex(ValueError,
|
||||
|
@ -675,6 +650,7 @@ class BasicSocketTests(unittest.TestCase):
|
|||
"""Wrapping with a badly formatted key (syntax error)"""
|
||||
self.bad_cert_test("badkey.pem")
|
||||
|
||||
@ignore_deprecation
|
||||
def test_match_hostname(self):
|
||||
def ok(cert, hostname):
|
||||
ssl.match_hostname(cert, hostname)
|
||||
|
@ -1126,17 +1102,15 @@ class ContextTests(unittest.TestCase):
|
|||
|
||||
def test_constructor(self):
|
||||
for protocol in PROTOCOLS:
|
||||
ssl.SSLContext(protocol)
|
||||
ctx = ssl.SSLContext()
|
||||
with warnings_helper.check_warnings():
|
||||
ctx = ssl.SSLContext(protocol)
|
||||
self.assertEqual(ctx.protocol, protocol)
|
||||
with warnings_helper.check_warnings():
|
||||
ctx = ssl.SSLContext()
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
|
||||
self.assertRaises(ValueError, ssl.SSLContext, -1)
|
||||
self.assertRaises(ValueError, ssl.SSLContext, 42)
|
||||
|
||||
def test_protocol(self):
|
||||
for proto in PROTOCOLS:
|
||||
ctx = ssl.SSLContext(proto)
|
||||
self.assertEqual(ctx.protocol, proto)
|
||||
|
||||
def test_ciphers(self):
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.set_ciphers("ALL")
|
||||
|
@ -1174,16 +1148,19 @@ class ContextTests(unittest.TestCase):
|
|||
OP_ENABLE_MIDDLEBOX_COMPAT |
|
||||
OP_IGNORE_UNEXPECTED_EOF)
|
||||
self.assertEqual(default, ctx.options)
|
||||
ctx.options |= ssl.OP_NO_TLSv1
|
||||
with warnings_helper.check_warnings():
|
||||
ctx.options |= ssl.OP_NO_TLSv1
|
||||
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
|
||||
ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
|
||||
with warnings_helper.check_warnings():
|
||||
ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
|
||||
self.assertEqual(default, ctx.options)
|
||||
ctx.options = 0
|
||||
# Ubuntu has OP_NO_SSLv3 forced on by default
|
||||
self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
|
||||
|
||||
def test_verify_mode_protocol(self):
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
with warnings_helper.check_warnings():
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
# Default value
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
ctx.verify_mode = ssl.CERT_OPTIONAL
|
||||
|
@ -1221,6 +1198,7 @@ class ContextTests(unittest.TestCase):
|
|||
|
||||
@requires_minimum_version
|
||||
@unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
|
||||
@ignore_deprecation
|
||||
def test_min_max_version(self):
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
||||
# OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
|
||||
|
@ -1304,7 +1282,7 @@ class ContextTests(unittest.TestCase):
|
|||
"requires OpenSSL >= 1.1.0"
|
||||
)
|
||||
def test_security_level(self):
|
||||
ctx = ssl.SSLContext()
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
# The default security callback allows for levels between 0-5
|
||||
# with OpenSSL defaulting to 1, however some vendors override the
|
||||
# default value (e.g. Debian defaults to 2)
|
||||
|
@ -1513,7 +1491,7 @@ class ContextTests(unittest.TestCase):
|
|||
ctx.load_dh_params(CERTFILE)
|
||||
|
||||
def test_session_stats(self):
|
||||
for proto in PROTOCOLS:
|
||||
for proto in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
|
||||
ctx = ssl.SSLContext(proto)
|
||||
self.assertEqual(ctx.session_stats(), {
|
||||
'number': 0,
|
||||
|
@ -1673,7 +1651,7 @@ class ContextTests(unittest.TestCase):
|
|||
def test_create_default_context(self):
|
||||
ctx = ssl.create_default_context()
|
||||
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
||||
self.assertTrue(ctx.check_hostname)
|
||||
self._assert_context_options(ctx)
|
||||
|
@ -1682,42 +1660,49 @@ class ContextTests(unittest.TestCase):
|
|||
cadata = f.read()
|
||||
ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
|
||||
cadata=cadata)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
||||
self._assert_context_options(ctx)
|
||||
|
||||
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self._assert_context_options(ctx)
|
||||
|
||||
|
||||
|
||||
def test__create_stdlib_context(self):
|
||||
ctx = ssl._create_stdlib_context()
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self.assertFalse(ctx.check_hostname)
|
||||
self._assert_context_options(ctx)
|
||||
|
||||
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
|
||||
with warnings_helper.check_warnings():
|
||||
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self._assert_context_options(ctx)
|
||||
|
||||
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
check_hostname=True)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
|
||||
with warnings_helper.check_warnings():
|
||||
ctx = ssl._create_stdlib_context(
|
||||
ssl.PROTOCOL_TLSv1_2,
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
check_hostname=True
|
||||
)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1_2)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
||||
self.assertTrue(ctx.check_hostname)
|
||||
self._assert_context_options(ctx)
|
||||
|
||||
ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
|
||||
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
self._assert_context_options(ctx)
|
||||
|
||||
def test_check_hostname(self):
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
with warnings_helper.check_warnings():
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
self.assertFalse(ctx.check_hostname)
|
||||
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
||||
|
||||
|
@ -2042,7 +2027,9 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
|
||||
def test_connect_with_context(self):
|
||||
# Same as test_connect, but with a separately created context
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||
s.connect(self.server_addr)
|
||||
self.assertEqual({}, s.getpeercert())
|
||||
|
@ -2062,9 +2049,11 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
# This should fail because we have no verification certs. Connection
|
||||
# failure crashes ThreadedEchoServer, so run this in an independent
|
||||
# test method.
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
s = ctx.wrap_socket(socket.socket(socket.AF_INET))
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
s = ctx.wrap_socket(
|
||||
socket.socket(socket.AF_INET),
|
||||
server_hostname=SIGNED_CERTFILE_HOSTNAME
|
||||
)
|
||||
self.addCleanup(s.close)
|
||||
self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
|
||||
s.connect, self.server_addr)
|
||||
|
@ -2075,19 +2064,19 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
# OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
|
||||
# contain both versions of each certificate (same content, different
|
||||
# filename) for this test to be portable across OpenSSL releases.
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.load_verify_locations(capath=CAPATH)
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET),
|
||||
server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
|
||||
s.connect(self.server_addr)
|
||||
cert = s.getpeercert()
|
||||
self.assertTrue(cert)
|
||||
|
||||
# Same with a bytes `capath` argument
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.load_verify_locations(capath=BYTES_CAPATH)
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET),
|
||||
server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
|
||||
s.connect(self.server_addr)
|
||||
cert = s.getpeercert()
|
||||
self.assertTrue(cert)
|
||||
|
@ -2096,19 +2085,19 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
with open(SIGNING_CA) as f:
|
||||
pem = f.read()
|
||||
der = ssl.PEM_cert_to_DER_cert(pem)
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.load_verify_locations(cadata=pem)
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET),
|
||||
server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
|
||||
s.connect(self.server_addr)
|
||||
cert = s.getpeercert()
|
||||
self.assertTrue(cert)
|
||||
|
||||
# same with DER
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.load_verify_locations(cadata=der)
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
|
||||
with ctx.wrap_socket(socket.socket(socket.AF_INET),
|
||||
server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
|
||||
s.connect(self.server_addr)
|
||||
cert = s.getpeercert()
|
||||
self.assertTrue(cert)
|
||||
|
@ -2302,7 +2291,8 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
sock.connect(self.server_addr)
|
||||
incoming = ssl.MemoryBIO()
|
||||
outgoing = ssl.MemoryBIO()
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
sslobj = ctx.wrap_bio(incoming, outgoing, False)
|
||||
self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
|
||||
|
@ -2384,7 +2374,6 @@ class ThreadedEchoServer(threading.Thread):
|
|||
try:
|
||||
self.sslconn = self.server.context.wrap_socket(
|
||||
self.sock, server_side=True)
|
||||
self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
|
||||
self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
|
||||
except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
|
||||
# We treat ConnectionResetError as though it were an
|
||||
|
@ -2433,8 +2422,6 @@ class ThreadedEchoServer(threading.Thread):
|
|||
cipher = self.sslconn.cipher()
|
||||
if support.verbose and self.server.chatty:
|
||||
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
|
||||
sys.stdout.write(" server: selected protocol is now "
|
||||
+ str(self.sslconn.selected_npn_protocol()) + "\n")
|
||||
return True
|
||||
|
||||
def read(self):
|
||||
|
@ -2562,7 +2549,7 @@ class ThreadedEchoServer(threading.Thread):
|
|||
def __init__(self, certificate=None, ssl_version=None,
|
||||
certreqs=None, cacerts=None,
|
||||
chatty=True, connectionchatty=False, starttls_server=False,
|
||||
npn_protocols=None, alpn_protocols=None,
|
||||
alpn_protocols=None,
|
||||
ciphers=None, context=None):
|
||||
if context:
|
||||
self.context = context
|
||||
|
@ -2576,8 +2563,6 @@ class ThreadedEchoServer(threading.Thread):
|
|||
self.context.load_verify_locations(cacerts)
|
||||
if certificate:
|
||||
self.context.load_cert_chain(certificate)
|
||||
if npn_protocols:
|
||||
self.context.set_npn_protocols(npn_protocols)
|
||||
if alpn_protocols:
|
||||
self.context.set_alpn_protocols(alpn_protocols)
|
||||
if ciphers:
|
||||
|
@ -2589,7 +2574,6 @@ class ThreadedEchoServer(threading.Thread):
|
|||
self.port = socket_helper.bind_port(self.sock)
|
||||
self.flag = None
|
||||
self.active = False
|
||||
self.selected_npn_protocols = []
|
||||
self.selected_alpn_protocols = []
|
||||
self.shared_ciphers = []
|
||||
self.conn_errors = []
|
||||
|
@ -2796,14 +2780,12 @@ def server_params_test(client_context, server_context, indata=b"FOO\n",
|
|||
'cipher': s.cipher(),
|
||||
'peercert': s.getpeercert(),
|
||||
'client_alpn_protocol': s.selected_alpn_protocol(),
|
||||
'client_npn_protocol': s.selected_npn_protocol(),
|
||||
'version': s.version(),
|
||||
'session_reused': s.session_reused,
|
||||
'session': s.session,
|
||||
})
|
||||
s.close()
|
||||
stats['server_alpn_protocols'] = server.selected_alpn_protocols
|
||||
stats['server_npn_protocols'] = server.selected_npn_protocols
|
||||
stats['server_shared_ciphers'] = server.shared_ciphers
|
||||
return stats
|
||||
|
||||
|
@ -2829,21 +2811,26 @@ def try_protocol_combo(server_protocol, client_protocol, expect_success,
|
|||
(ssl.get_protocol_name(client_protocol),
|
||||
ssl.get_protocol_name(server_protocol),
|
||||
certtype))
|
||||
client_context = ssl.SSLContext(client_protocol)
|
||||
client_context.options |= client_options
|
||||
server_context = ssl.SSLContext(server_protocol)
|
||||
server_context.options |= server_options
|
||||
|
||||
with warnings_helper.check_warnings():
|
||||
# ignore Deprecation warnings
|
||||
client_context = ssl.SSLContext(client_protocol)
|
||||
client_context.options |= client_options
|
||||
server_context = ssl.SSLContext(server_protocol)
|
||||
server_context.options |= server_options
|
||||
|
||||
min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
|
||||
if (min_version is not None
|
||||
# SSLContext.minimum_version is only available on recent OpenSSL
|
||||
# (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
|
||||
and hasattr(server_context, 'minimum_version')
|
||||
and server_protocol == ssl.PROTOCOL_TLS
|
||||
and server_context.minimum_version > min_version):
|
||||
# SSLContext.minimum_version is only available on recent OpenSSL
|
||||
# (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
|
||||
and hasattr(server_context, 'minimum_version')
|
||||
and server_protocol == ssl.PROTOCOL_TLS
|
||||
and server_context.minimum_version > min_version
|
||||
):
|
||||
# If OpenSSL configuration is strict and requires more recent TLS
|
||||
# version, we have to change the minimum to test old TLS versions.
|
||||
server_context.minimum_version = min_version
|
||||
with warnings_helper.check_warnings():
|
||||
server_context.minimum_version = min_version
|
||||
|
||||
# NOTE: we must enable "ALL" ciphers on the client, otherwise an
|
||||
# SSLv23 client will send an SSLv3 hello (rather than SSLv2)
|
||||
|
@ -2886,17 +2873,6 @@ class ThreadedTests(unittest.TestCase):
|
|||
"""Basic test of an SSL client connecting to a server"""
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
for protocol in PROTOCOLS:
|
||||
if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
|
||||
continue
|
||||
if not has_tls_protocol(protocol):
|
||||
continue
|
||||
with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
|
||||
context = ssl.SSLContext(protocol)
|
||||
context.load_cert_chain(CERTFILE)
|
||||
seclevel_workaround(context)
|
||||
server_params_test(context, context,
|
||||
chatty=True, connectionchatty=True)
|
||||
|
||||
client_context, server_context, hostname = testing_context()
|
||||
|
||||
|
@ -3565,8 +3541,7 @@ class ThreadedTests(unittest.TestCase):
|
|||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLS_CLIENT)
|
||||
cert_reqs=ssl.CERT_NONE)
|
||||
s.connect((HOST, server.port))
|
||||
# helper methods for standardising recv* method signatures
|
||||
def _recv_into():
|
||||
|
@ -3718,8 +3693,7 @@ class ThreadedTests(unittest.TestCase):
|
|||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLS_CLIENT)
|
||||
cert_reqs=ssl.CERT_NONE)
|
||||
s.connect((HOST, server.port))
|
||||
s.setblocking(False)
|
||||
|
||||
|
@ -3788,14 +3762,11 @@ class ThreadedTests(unittest.TestCase):
|
|||
def test_server_accept(self):
|
||||
# Issue #16357: accept() on a SSLSocket created through
|
||||
# SSLContext.wrap_socket().
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
context.verify_mode = ssl.CERT_REQUIRED
|
||||
context.load_verify_locations(SIGNING_CA)
|
||||
context.load_cert_chain(SIGNED_CERTFILE)
|
||||
client_ctx, server_ctx, hostname = testing_context()
|
||||
server = socket.socket(socket.AF_INET)
|
||||
host = "127.0.0.1"
|
||||
port = socket_helper.bind_port(server)
|
||||
server = context.wrap_socket(server, server_side=True)
|
||||
server = server_ctx.wrap_socket(server, server_side=True)
|
||||
self.assertTrue(server.server_side)
|
||||
|
||||
evt = threading.Event()
|
||||
|
@ -3813,8 +3784,10 @@ class ThreadedTests(unittest.TestCase):
|
|||
t.start()
|
||||
# Client wait until server setup and perform a connect.
|
||||
evt.wait()
|
||||
client = context.wrap_socket(socket.socket())
|
||||
client.connect((host, port))
|
||||
client = client_ctx.wrap_socket(
|
||||
socket.socket(), server_hostname=hostname
|
||||
)
|
||||
client.connect((hostname, port))
|
||||
client.send(b'data')
|
||||
client.recv()
|
||||
client_addr = client.getsockname()
|
||||
|
@ -3827,14 +3800,16 @@ class ThreadedTests(unittest.TestCase):
|
|||
self.assertEqual(peer, client_addr)
|
||||
|
||||
def test_getpeercert_enotconn(self):
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context.check_hostname = False
|
||||
with context.wrap_socket(socket.socket()) as sock:
|
||||
with self.assertRaises(OSError) as cm:
|
||||
sock.getpeercert()
|
||||
self.assertEqual(cm.exception.errno, errno.ENOTCONN)
|
||||
|
||||
def test_do_handshake_enotconn(self):
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
context.check_hostname = False
|
||||
with context.wrap_socket(socket.socket()) as sock:
|
||||
with self.assertRaises(OSError) as cm:
|
||||
sock.do_handshake()
|
||||
|
@ -3875,13 +3850,11 @@ class ThreadedTests(unittest.TestCase):
|
|||
|
||||
@requires_tls_version('TLSv1_3')
|
||||
def test_tls1_3(self):
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
context.load_cert_chain(CERTFILE)
|
||||
context.options |= (
|
||||
ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
|
||||
)
|
||||
with ThreadedEchoServer(context=context) as server:
|
||||
with context.wrap_socket(socket.socket()) as s:
|
||||
client_context, server_context, hostname = testing_context()
|
||||
client_context.minimum_version = ssl.TLSVersion.TLSv1_3
|
||||
with ThreadedEchoServer(context=server_context) as server:
|
||||
with client_context.wrap_socket(socket.socket(),
|
||||
server_hostname=hostname) as s:
|
||||
s.connect((HOST, server.port))
|
||||
self.assertIn(s.cipher()[0], {
|
||||
'TLS_AES_256_GCM_SHA384',
|
||||
|
@ -3892,6 +3865,8 @@ class ThreadedTests(unittest.TestCase):
|
|||
|
||||
@requires_minimum_version
|
||||
@requires_tls_version('TLSv1_2')
|
||||
@requires_tls_version('TLSv1')
|
||||
@ignore_deprecation
|
||||
def test_min_max_version_tlsv1_2(self):
|
||||
client_context, server_context, hostname = testing_context()
|
||||
# client TLSv1.0 to 1.2
|
||||
|
@ -3909,6 +3884,7 @@ class ThreadedTests(unittest.TestCase):
|
|||
|
||||
@requires_minimum_version
|
||||
@requires_tls_version('TLSv1_1')
|
||||
@ignore_deprecation
|
||||
def test_min_max_version_tlsv1_1(self):
|
||||
client_context, server_context, hostname = testing_context()
|
||||
# client 1.0 to 1.2, server 1.0 to 1.1
|
||||
|
@ -3927,6 +3903,7 @@ class ThreadedTests(unittest.TestCase):
|
|||
@requires_minimum_version
|
||||
@requires_tls_version('TLSv1_2')
|
||||
@requires_tls_version('TLSv1')
|
||||
@ignore_deprecation
|
||||
def test_min_max_version_mismatch(self):
|
||||
client_context, server_context, hostname = testing_context()
|
||||
# client 1.0, server 1.2 (mismatch)
|
||||
|
@ -3962,17 +3939,17 @@ class ThreadedTests(unittest.TestCase):
|
|||
def test_default_ecdh_curve(self):
|
||||
# Issue #21015: elliptic curve-based Diffie Hellman key exchange
|
||||
# should be enabled by default on SSL contexts.
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
context.load_cert_chain(CERTFILE)
|
||||
client_context, server_context, hostname = testing_context()
|
||||
# TLSv1.3 defaults to PFS key agreement and no longer has KEA in
|
||||
# cipher name.
|
||||
context.options |= ssl.OP_NO_TLSv1_3
|
||||
client_context.maximum_version = ssl.TLSVersion.TLSv1_2
|
||||
# Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
|
||||
# explicitly using the 'ECCdraft' cipher alias. Otherwise,
|
||||
# our default cipher list should prefer ECDH-based ciphers
|
||||
# automatically.
|
||||
with ThreadedEchoServer(context=context) as server:
|
||||
with context.wrap_socket(socket.socket()) as s:
|
||||
with ThreadedEchoServer(context=server_context) as server:
|
||||
with client_context.wrap_socket(socket.socket(),
|
||||
server_hostname=hostname) as s:
|
||||
s.connect((HOST, server.port))
|
||||
self.assertIn("ECDH", s.cipher()[0])
|
||||
|
||||
|
@ -4159,14 +4136,6 @@ class ThreadedTests(unittest.TestCase):
|
|||
self.assertEqual(server_result, expected,
|
||||
msg % (server_result, "server"))
|
||||
|
||||
def test_selected_npn_protocol(self):
|
||||
# selected_npn_protocol() is None unless NPN is used
|
||||
client_context, server_context, hostname = testing_context()
|
||||
stats = server_params_test(client_context, server_context,
|
||||
chatty=True, connectionchatty=True,
|
||||
sni_name=hostname)
|
||||
self.assertIs(stats['client_npn_protocol'], None)
|
||||
|
||||
def test_npn_protocols(self):
|
||||
assert not ssl.HAS_NPN
|
||||
|
||||
|
@ -4313,13 +4282,11 @@ class ThreadedTests(unittest.TestCase):
|
|||
with open(os_helper.TESTFN, 'wb') as f:
|
||||
f.write(TEST_DATA)
|
||||
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
|
||||
context.verify_mode = ssl.CERT_REQUIRED
|
||||
context.load_verify_locations(SIGNING_CA)
|
||||
context.load_cert_chain(SIGNED_CERTFILE)
|
||||
server = ThreadedEchoServer(context=context, chatty=False)
|
||||
client_context, server_context, hostname = testing_context()
|
||||
server = ThreadedEchoServer(context=server_context, chatty=False)
|
||||
with server:
|
||||
with context.wrap_socket(socket.socket()) as s:
|
||||
with client_context.wrap_socket(socket.socket(),
|
||||
server_hostname=hostname) as s:
|
||||
s.connect((HOST, server.port))
|
||||
with open(os_helper.TESTFN, 'rb') as file:
|
||||
s.sendfile(file)
|
||||
|
@ -4437,7 +4404,7 @@ class ThreadedTests(unittest.TestCase):
|
|||
class TestPostHandshakeAuth(unittest.TestCase):
|
||||
def test_pha_setter(self):
|
||||
protocols = [
|
||||
ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
|
||||
ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
|
||||
]
|
||||
for protocol in protocols:
|
||||
ctx = ssl.SSLContext(protocol)
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
:mod:`ssl` now raises DeprecationWarning for OP_NO_SSL/TLS* options, old TLS
|
||||
versions, old protocols, and other features that have been deprecated since
|
||||
Python 3.6, 3.7, or OpenSSL 1.1.0.
|
|
@ -682,6 +682,17 @@ _setSSLError (_sslmodulestate *state, const char *errstr, int errcode, const cha
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int
|
||||
_ssl_deprecated(const char* name, int stacklevel) {
|
||||
return PyErr_WarnFormat(
|
||||
PyExc_DeprecationWarning, stacklevel,
|
||||
"ssl module: %s is deprecated", name
|
||||
);
|
||||
}
|
||||
|
||||
#define PY_SSL_DEPRECATED(name, stacklevel, ret) \
|
||||
if (_ssl_deprecated((name), (stacklevel)) == -1) return (ret)
|
||||
|
||||
/*
|
||||
* SSL objects
|
||||
*/
|
||||
|
@ -2863,6 +2874,7 @@ _ssl__SSLContext_impl(PyTypeObject *type, int proto_version)
|
|||
{
|
||||
PySSLContext *self;
|
||||
long options;
|
||||
const SSL_METHOD *method = NULL;
|
||||
SSL_CTX *ctx = NULL;
|
||||
X509_VERIFY_PARAM *params;
|
||||
int result;
|
||||
|
@ -2876,54 +2888,62 @@ _ssl__SSLContext_impl(PyTypeObject *type, int proto_version)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
PySSL_BEGIN_ALLOW_THREADS
|
||||
switch(proto_version) {
|
||||
#if defined(SSL3_VERSION) && !defined(OPENSSL_NO_SSL3)
|
||||
case PY_SSL_VERSION_SSL3:
|
||||
ctx = SSL_CTX_new(SSLv3_method());
|
||||
PY_SSL_DEPRECATED("PROTOCOL_SSLv3", 2, NULL);
|
||||
method = SSLv3_method();
|
||||
break;
|
||||
#endif
|
||||
#if (defined(TLS1_VERSION) && \
|
||||
!defined(OPENSSL_NO_TLS1) && \
|
||||
!defined(OPENSSL_NO_TLS1_METHOD))
|
||||
case PY_SSL_VERSION_TLS1:
|
||||
ctx = SSL_CTX_new(TLSv1_method());
|
||||
PY_SSL_DEPRECATED("PROTOCOL_TLSv1", 2, NULL);
|
||||
method = TLSv1_method();
|
||||
break;
|
||||
#endif
|
||||
#if (defined(TLS1_1_VERSION) && \
|
||||
!defined(OPENSSL_NO_TLS1_1) && \
|
||||
!defined(OPENSSL_NO_TLS1_1_METHOD))
|
||||
case PY_SSL_VERSION_TLS1_1:
|
||||
ctx = SSL_CTX_new(TLSv1_1_method());
|
||||
PY_SSL_DEPRECATED("PROTOCOL_TLSv1_1", 2, NULL);
|
||||
method = TLSv1_1_method();
|
||||
break;
|
||||
#endif
|
||||
#if (defined(TLS1_2_VERSION) && \
|
||||
!defined(OPENSSL_NO_TLS1_2) && \
|
||||
!defined(OPENSSL_NO_TLS1_2_METHOD))
|
||||
case PY_SSL_VERSION_TLS1_2:
|
||||
ctx = SSL_CTX_new(TLSv1_2_method());
|
||||
PY_SSL_DEPRECATED("PROTOCOL_TLSv1_2", 2, NULL);
|
||||
method = TLSv1_2_method();
|
||||
break;
|
||||
#endif
|
||||
case PY_SSL_VERSION_TLS:
|
||||
/* SSLv23 */
|
||||
ctx = SSL_CTX_new(TLS_method());
|
||||
PY_SSL_DEPRECATED("PROTOCOL_TLS", 2, NULL);
|
||||
method = TLS_method();
|
||||
break;
|
||||
case PY_SSL_VERSION_TLS_CLIENT:
|
||||
ctx = SSL_CTX_new(TLS_client_method());
|
||||
method = TLS_client_method();
|
||||
break;
|
||||
case PY_SSL_VERSION_TLS_SERVER:
|
||||
ctx = SSL_CTX_new(TLS_server_method());
|
||||
method = TLS_server_method();
|
||||
break;
|
||||
default:
|
||||
proto_version = -1;
|
||||
method = NULL;
|
||||
}
|
||||
PySSL_END_ALLOW_THREADS
|
||||
|
||||
if (proto_version == -1) {
|
||||
PyErr_SetString(PyExc_ValueError,
|
||||
"invalid or unsupported protocol version");
|
||||
if (method == NULL) {
|
||||
PyErr_Format(PyExc_ValueError,
|
||||
"invalid or unsupported protocol version %i",
|
||||
proto_version);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
PySSL_BEGIN_ALLOW_THREADS
|
||||
ctx = SSL_CTX_new(method);
|
||||
PySSL_END_ALLOW_THREADS
|
||||
|
||||
if (ctx == NULL) {
|
||||
_setSSLError(get_ssl_state(module), NULL, 0, __FILE__, __LINE__);
|
||||
return NULL;
|
||||
|
@ -3299,6 +3319,29 @@ set_min_max_proto_version(PySSLContext *self, PyObject *arg, int what)
|
|||
return -1;
|
||||
}
|
||||
|
||||
/* check for deprecations and supported values */
|
||||
switch(v) {
|
||||
case PY_PROTO_SSLv3:
|
||||
PY_SSL_DEPRECATED("TLSVersion.SSLv3", 2, -1);
|
||||
break;
|
||||
case PY_PROTO_TLSv1:
|
||||
PY_SSL_DEPRECATED("TLSVersion.TLSv1", 2, -1);
|
||||
break;
|
||||
case PY_PROTO_TLSv1_1:
|
||||
PY_SSL_DEPRECATED("TLSVersion.TLSv1_1", 2, -1);
|
||||
break;
|
||||
case PY_PROTO_MINIMUM_SUPPORTED:
|
||||
case PY_PROTO_MAXIMUM_SUPPORTED:
|
||||
case PY_PROTO_TLSv1_2:
|
||||
case PY_PROTO_TLSv1_3:
|
||||
/* ok */
|
||||
break;
|
||||
default:
|
||||
PyErr_Format(PyExc_ValueError,
|
||||
"Unsupported TLS/SSL version 0x%x", v);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (what == 0) {
|
||||
switch(v) {
|
||||
case PY_PROTO_MINIMUM_SUPPORTED:
|
||||
|
@ -3417,11 +3460,23 @@ static int
|
|||
set_options(PySSLContext *self, PyObject *arg, void *c)
|
||||
{
|
||||
long new_opts, opts, set, clear;
|
||||
long opt_no = (
|
||||
SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_TLSv1 |
|
||||
SSL_OP_NO_TLSv1_1 | SSL_OP_NO_TLSv1_2 | SSL_OP_NO_TLSv1_2
|
||||
);
|
||||
|
||||
if (!PyArg_Parse(arg, "l", &new_opts))
|
||||
return -1;
|
||||
opts = SSL_CTX_get_options(self->ctx);
|
||||
clear = opts & ~new_opts;
|
||||
set = ~opts & new_opts;
|
||||
|
||||
if ((set & opt_no) != 0) {
|
||||
if (_ssl_deprecated("Setting OP_NO_SSL* or SSL_NO_TLS* options is "
|
||||
"deprecated", 2) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
if (clear) {
|
||||
SSL_CTX_clear_options(self->ctx, clear);
|
||||
}
|
||||
|
@ -4961,6 +5016,7 @@ static PyObject *
|
|||
_ssl_RAND_pseudo_bytes_impl(PyObject *module, int n)
|
||||
/*[clinic end generated code: output=b1509e937000e52d input=58312bd53f9bbdd0]*/
|
||||
{
|
||||
PY_SSL_DEPRECATED("RAND_pseudo_bytes", 1, NULL);
|
||||
return PySSL_RAND(module, n, 1);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue