mirror of https://github.com/python/cpython
bpo-31870: Add a timeout parameter to ssl.get_server_certificate() (GH-22270)
This commit is contained in:
parent
6c681e1a4a
commit
b2fac1afaa
|
@ -426,7 +426,8 @@ Certificate handling
|
|||
previously. Return an integer (no fractions of a second in the
|
||||
input format)
|
||||
|
||||
.. function:: get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None)
|
||||
.. function:: get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, \
|
||||
ca_certs=None[, timeout])
|
||||
|
||||
Given the address ``addr`` of an SSL-protected server, as a (*hostname*,
|
||||
*port-number*) pair, fetches the server's certificate, and returns it as a
|
||||
|
@ -436,7 +437,8 @@ Certificate handling
|
|||
same format as used for the same parameter in
|
||||
:meth:`SSLContext.wrap_socket`. The call will attempt to validate the
|
||||
server certificate against that set of root certificates, and will fail
|
||||
if the validation attempt fails.
|
||||
if the validation attempt fails. A timeout can be specified with the
|
||||
``timeout`` parameter.
|
||||
|
||||
.. versionchanged:: 3.3
|
||||
This function is now IPv6-compatible.
|
||||
|
@ -445,6 +447,9 @@ Certificate handling
|
|||
The default *ssl_version* is changed from :data:`PROTOCOL_SSLv3` to
|
||||
:data:`PROTOCOL_TLS` for maximum compatibility with modern servers.
|
||||
|
||||
.. versionchanged:: 3.10
|
||||
The *timeout* parameter was added.
|
||||
|
||||
.. function:: DER_cert_to_PEM_cert(DER_cert_bytes)
|
||||
|
||||
Given a certificate as a DER-encoded blob of bytes, returns a PEM-encoded
|
||||
|
|
|
@ -1062,6 +1062,12 @@ The exception :exc:`socket.timeout` is now an alias of :exc:`TimeoutError`.
|
|||
Added option to create MPTCP sockets with ``IPPROTO_MPTCP``
|
||||
(Contributed by Rui Cunha in :issue:`43571`.)
|
||||
|
||||
ssl
|
||||
---
|
||||
|
||||
Add a *timeout* parameter to the :func:`ssl.get_server_certificate` function.
|
||||
(Contributed by Zackery Spytz in :issue:`31870`.)
|
||||
|
||||
sys
|
||||
---
|
||||
|
||||
|
|
11
Lib/ssl.py
11
Lib/ssl.py
|
@ -258,7 +258,7 @@ if sys.platform == "win32":
|
|||
from _ssl import enum_certificates, enum_crls
|
||||
|
||||
from socket import socket, SOCK_STREAM, create_connection
|
||||
from socket import SOL_SOCKET, SO_TYPE
|
||||
from socket import SOL_SOCKET, SO_TYPE, _GLOBAL_DEFAULT_TIMEOUT
|
||||
import socket as _socket
|
||||
import base64 # for DER-to-PEM translation
|
||||
import errno
|
||||
|
@ -1500,11 +1500,14 @@ def PEM_cert_to_DER_cert(pem_cert_string):
|
|||
d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)]
|
||||
return base64.decodebytes(d.encode('ASCII', 'strict'))
|
||||
|
||||
def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None):
|
||||
def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT,
|
||||
ca_certs=None, timeout=_GLOBAL_DEFAULT_TIMEOUT):
|
||||
"""Retrieve the certificate from the server at the specified address,
|
||||
and return it as a PEM-encoded string.
|
||||
If 'ca_certs' is specified, validate the server cert against it.
|
||||
If 'ssl_version' is specified, use it in the connection attempt."""
|
||||
If 'ssl_version' is specified, use it in the connection attempt.
|
||||
If 'timeout' is specified, use it in the connection attempt.
|
||||
"""
|
||||
|
||||
host, port = addr
|
||||
if ca_certs is not None:
|
||||
|
@ -1514,7 +1517,7 @@ def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None)
|
|||
context = _create_stdlib_context(ssl_version,
|
||||
cert_reqs=cert_reqs,
|
||||
cafile=ca_certs)
|
||||
with create_connection(addr) as sock:
|
||||
with create_connection(addr, timeout=timeout) as sock:
|
||||
with context.wrap_socket(sock, server_hostname=host) as sslsock:
|
||||
dercert = sslsock.getpeercert(True)
|
||||
return DER_cert_to_PEM_cert(dercert)
|
||||
|
|
|
@ -2136,6 +2136,11 @@ class SimpleBackgroundTests(unittest.TestCase):
|
|||
# independent test method
|
||||
_test_get_server_certificate_fail(self, *self.server_addr)
|
||||
|
||||
def test_get_server_certificate_timeout(self):
|
||||
with self.assertRaises(socket.timeout):
|
||||
ssl.get_server_certificate(self.server_addr, ca_certs=SIGNING_CA,
|
||||
timeout=0.0001)
|
||||
|
||||
def test_ciphers(self):
|
||||
with test_wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s:
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
The :func:`ssl.get_server_certificate` function now has a *timeout*
|
||||
parameter.
|
Loading…
Reference in New Issue