Issue #13634: Add support for querying and disabling SSL compression.

This commit is contained in:
Antoine Pitrou 2011-12-20 10:13:40 +01:00
parent 3563b18c19
commit 8abdb8abd8
6 changed files with 83 additions and 0 deletions

View File

@ -436,6 +436,15 @@ Constants
.. versionadded:: 3.3
.. data:: OP_NO_COMPRESSION
Disable compression on the SSL channel. This is useful if the application
protocol supports its own compression scheme.
This option is only available with OpenSSL 1.0.0 and later.
.. versionadded:: 3.3
.. data:: HAS_SNI
Whether the OpenSSL library has built-in support for the *Server Name
@ -561,6 +570,16 @@ SSL sockets also have the following additional methods and attributes:
version of the SSL protocol that defines its use, and the number of secret
bits being used. If no connection has been established, returns ``None``.
.. method:: SSLSocket.compression()
Return the compression algorithm being used as a string, or ``None``
if the connection isn't compressed.
If the higher-level protocol supports its own compression mechanism,
you can use :data:`OP_NO_COMPRESSION` to disable SSL-level compression.
.. versionadded:: 3.3
.. method:: SSLSocket.get_channel_binding(cb_type="tls-unique")
Get channel binding data for current connection, as a bytes object. Returns

View File

@ -70,6 +70,10 @@ from _ssl import (
OP_ALL, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1,
OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_ECDH_USE,
)
try:
from _ssl import OP_NO_COMPRESSION
except ImportError:
pass
from _ssl import RAND_status, RAND_egd, RAND_add, RAND_bytes, RAND_pseudo_bytes
from _ssl import (
SSL_ERROR_ZERO_RETURN,
@ -330,6 +334,13 @@ class SSLSocket(socket):
else:
return self._sslobj.cipher()
def compression(self):
self._checkClosed()
if not self._sslobj:
return None
else:
return self._sslobj.compression()
def send(self, data, flags=0):
self._checkClosed()
if self._sslobj:

View File

@ -97,6 +97,7 @@ class StatsRequestHandler(BaseHTTPRequestHandler):
stats = {
'session_cache': context.session_stats(),
'cipher': sock.cipher(),
'compression': sock.compression(),
}
body = pprint.pformat(stats)
body = body.encode('utf-8')

View File

@ -100,6 +100,8 @@ class BasicSocketTests(unittest.TestCase):
ssl.CERT_REQUIRED
ssl.OP_CIPHER_SERVER_PREFERENCE
ssl.OP_SINGLE_ECDH_USE
if ssl.OPENSSL_VERSION_INFO >= (1, 0):
ssl.OP_NO_COMPRESSION
self.assertIn(ssl.HAS_SNI, {True, False})
def test_random(self):
@ -1185,7 +1187,12 @@ else:
if connectionchatty:
if support.verbose:
sys.stdout.write(" client: closing connection.\n")
stats = {
'compression': s.compression(),
'cipher': s.cipher(),
}
s.close()
return stats
finally:
server.stop()
server.join()
@ -1814,6 +1821,25 @@ else:
server.stop()
server.join()
def test_compression(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.load_cert_chain(CERTFILE)
stats = server_params_test(context, context,
chatty=True, connectionchatty=True)
if support.verbose:
sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
@unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
"ssl.OP_NO_COMPRESSION needed for this test")
def test_compression_disabled(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.load_cert_chain(CERTFILE)
stats = server_params_test(context, context,
chatty=True, connectionchatty=True)
self.assertIs(stats['compression'], None)
def test_main(verbose=False):
if support.verbose:
plats = {

View File

@ -419,6 +419,8 @@ Core and Builtins
Library
-------
- Issue #13634: Add support for querying and disabling SSL compression.
- Issue #13627: Add support for SSL Elliptic Curve-based Diffie-Hellman
key exchange, through the SSLContext.set_ecdh_curve() method and the
ssl.OP_SINGLE_ECDH_USE option.

View File

@ -999,6 +999,25 @@ static PyObject *PySSL_cipher (PySSLSocket *self) {
return NULL;
}
static PyObject *PySSL_compression(PySSLSocket *self) {
#ifdef OPENSSL_NO_COMP
Py_RETURN_NONE;
#else
const COMP_METHOD *comp_method;
const char *short_name;
if (self->ssl == NULL)
Py_RETURN_NONE;
comp_method = SSL_get_current_compression(self->ssl);
if (comp_method == NULL || comp_method->type == NID_undef)
Py_RETURN_NONE;
short_name = OBJ_nid2sn(comp_method->type);
if (short_name == NULL)
Py_RETURN_NONE;
return PyUnicode_DecodeFSDefault(short_name);
#endif
}
static void PySSL_dealloc(PySSLSocket *self)
{
if (self->peer_cert) /* Possible not to have one? */
@ -1452,6 +1471,7 @@ static PyMethodDef PySSLMethods[] = {
{"peer_certificate", (PyCFunction)PySSL_peercert, METH_VARARGS,
PySSL_peercert_doc},
{"cipher", (PyCFunction)PySSL_cipher, METH_NOARGS},
{"compression", (PyCFunction)PySSL_compression, METH_NOARGS},
{"shutdown", (PyCFunction)PySSL_SSLshutdown, METH_NOARGS,
PySSL_SSLshutdown_doc},
#if HAVE_OPENSSL_FINISHED
@ -2482,6 +2502,10 @@ PyInit__ssl(void)
PyModule_AddIntConstant(m, "OP_CIPHER_SERVER_PREFERENCE",
SSL_OP_CIPHER_SERVER_PREFERENCE);
PyModule_AddIntConstant(m, "OP_SINGLE_ECDH_USE", SSL_OP_SINGLE_ECDH_USE);
#ifdef SSL_OP_NO_COMPRESSION
PyModule_AddIntConstant(m, "OP_NO_COMPRESSION",
SSL_OP_NO_COMPRESSION);
#endif
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
r = Py_True;