More work on SSL support.

* Much expanded test suite:

  All protocols tested against all other protocols.
  All protocols tested with all certificate options.
  Tests for bad key and bad cert.
  Test of STARTTLS functionality.
  Test of RAND_* functions.

* Fixes for threading/malloc bug.

* Issue 1065 fixed:

  sslsocket class renamed to SSLSocket.
  sslerror class renamed to SSLError.
  Function "wrap_socket" now used to wrap an existing socket.

* Issue 1583946 finally fixed:

  Support for subjectAltName added.
  Subject name now returned as proper DN list of RDNs.

* SSLError exported from socket as "sslerror".

* RAND_* functions properly exported from ssl.py.

* Documentation improved:

  Example of how to create a self-signed certificate.
  Better indexing.
This commit is contained in:
Bill Janssen 2007-09-10 21:51:02 +00:00
parent a0c05512ec
commit 98d19dafd9
15 changed files with 1602 additions and 519 deletions

View File

@ -32,7 +32,7 @@ using the :meth:`update` method. At any point you can ask it for the
:dfn:`digest` of the concatenation of the strings fed to it so far using the
:meth:`digest` or :meth:`hexdigest` methods.
.. index:: single: OpenSSL
.. index:: single: OpenSSL; (use in module hashlib)
Constructors for hash algorithms that are always present in this module are
:func:`md5`, :func:`sha1`, :func:`sha224`, :func:`sha256`, :func:`sha384`, and

View File

@ -12,6 +12,10 @@
.. sectionauthor:: Bill Janssen <bill.janssen@gmail.com>
.. index:: single: OpenSSL; (use in module ssl)
.. index:: TLS, SSL, Transport Layer Security, Secure Sockets Layer
This module provides access to Transport Layer Security (often known
as "Secure Sockets Layer") encryption and peer authentication
facilities for network sockets, both client-side and server-side.
@ -22,19 +26,113 @@ platforms, as long as OpenSSL is installed on that platform.
.. note::
Some behavior may be platform dependent, since calls are made to the operating
system socket APIs.
system socket APIs. The installed version of OpenSSL may also cause
variations in behavior.
This section documents the objects and functions in the ``ssl`` module;
for more general information about TLS, SSL, and certificates, the
reader is referred to the documents in the :ref:`ssl-references` section.
reader is referred to the documents in the "See Also" section at
the bottom.
This module defines a class, :class:`ssl.sslsocket`, which is
This module defines a class, :class:`ssl.SSLSocket`, which is
derived from the :class:`socket.socket` type, and supports additional
:meth:`read` and :meth:`write` methods, along with a method, :meth:`getpeercert`,
to retrieve the certificate of the other side of the connection.
This module defines the following functions, exceptions, and constants:
.. function:: wrap_socket (sock [, keyfile=None, certfile=None, server_side=False,
cert_reqs=CERT_NONE, ssl_version={see docs}, ca_certs=None])
Takes an instance ``sock`` of :class:`socket.socket`, and returns an instance of :class:`ssl.SSLSocket`, a subtype
of :class:`socket.socket`, which wraps the underlying socket in an SSL context.
For client-side sockets, the context construction is lazy; if the underlying socket isn't
connected yet, the context construction will be performed after :meth:`connect` is called
on the socket. For server-side sockets, if the socket has no remote peer, it is assumed
to be a listening socket, and the server-side SSL wrapping is automatically performed
on client connections accepted via the :meth:`accept` method.
The ``keyfile`` and ``certfile`` parameters specify optional files which contain a certificate
to be used to identify the local side of the connection. See the discussion of :ref:`ssl-certificates`
for more information on how the certificate is stored in the ``certfile``.
Often the private key is stored
in the same file as the certificate; in this case, only the ``certfile`` parameter need be
passed. If the private key is stored in a separate file, both parameters must be used.
If the private key is stored in the ``certfile``, it should come before the first certificate
in the certificate chain::
-----BEGIN RSA PRIVATE KEY-----
... (private key in base64 encoding) ...
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
... (certificate in base64 PEM encoding) ...
-----END CERTIFICATE-----
The parameter ``server_side`` is a boolean which identifies whether server-side or client-side
behavior is desired from this socket.
The parameter ``cert_reqs`` specifies whether a certificate is
required from the other side of the connection, and whether it will
be validated if provided. It must be one of the three values
:const:`CERT_NONE` (certificates ignored), :const:`CERT_OPTIONAL` (not required,
but validated if provided), or :const:`CERT_REQUIRED` (required and
validated). If the value of this parameter is not :const:`CERT_NONE`, then
the ``ca_certs`` parameter must point to a file of CA certificates.
The ``ca_certs`` file contains a set of concatenated "certification authority" certificates,
which are used to validate certificates passed from the other end of the connection.
See the discussion of :ref:`ssl-certificates` for more information about how to arrange
the certificates in this file.
The parameter ``ssl_version`` specifies which version of the SSL protocol to use.
Typically, the server chooses a particular protocol version, and the client
must adapt to the server's choice. Most of the versions are not interoperable
with the other versions. If not specified, for client-side operation, the
default SSL version is SSLv3; for server-side operation, SSLv23. These
version selections provide the most compatibility with other versions.
Here's a table showing which versions in a client (down the side)
can connect to which versions in a server (along the top):
.. table::
======================== ========= ========= ========== =========
*client* / **server** **SSLv2** **SSLv3** **SSLv23** **TLSv1**
*SSLv2* yes no yes* no
*SSLv3* yes yes yes no
*SSLv23* yes no yes no
*TLSv1* no no yes yes
======================== ========= ========= ========== =========
`*` In some older versions of OpenSSL (for instance, 0.9.7l on OS X 10.4),
an SSLv2 client could not connect to an SSLv23 server.
.. function:: RAND_status()
Returns True if the SSL pseudo-random number generator has been
seeded with 'enough' randomness, and False otherwise. You can use
:func:`ssl.RAND_egd` and :func:`ssl.RAND_add` to increase the randomness
of the pseudo-random number generator.
.. function:: RAND_egd(path)
If you are running an entropy-gathering daemon (EGD) somewhere, and ``path``
is the pathname of a socket connection open to it, this will read
256 bytes of randomness from the socket, and add it to the SSL pseudo-random number generator
to increase the security of generated secret keys. This is typically only
necessary on systems without better sources of randomness.
See http://egd.sourceforge.net/ or http://prngd.sourceforge.net/ for
sources of EGDs.
.. function:: RAND_add(bytes, entropy)
Mixes the given ``bytes`` into the SSL pseudo-random number generator.
The parameter ``entropy`` (a float) is a lower bound on the entropy
contained in string (so you can always use :const:`0.0`).
See :rfc:`1750` for more information on sources of entropy.
.. function:: cert_time_to_seconds(timestring)
Returns a floating-point value containing a normal seconds-after-the-epoch time
@ -51,7 +149,7 @@ This module defines the following functions, exceptions, and constants:
'Wed May 9 00:00:00 2007'
>>>
.. exception:: sslerror
.. exception:: SSLError
Raised to signal an error from the underlying SSL implementation. This
signifies some problem in the higher-level
@ -104,6 +202,60 @@ This module defines the following functions, exceptions, and constants:
.. _ssl-certificates:
SSLSocket Objects
-----------------
.. method:: SSLSocket.read([nbytes=1024])
Reads up to ``nbytes`` bytes from the SSL-encrypted channel and returns them.
.. method:: SSLSocket.write(data)
Writes the ``data`` to the other side of the connection, using the SSL channel to encrypt. Returns the number
of bytes written.
.. method:: SSLSocket.getpeercert()
If there is no certificate for the peer on the other end of the connection, returns ``None``.
If a certificate was received from the peer, but not validated, returns an empty ``dict`` instance.
If a certificate was received and validated, returns a ``dict`` instance with the fields
``subject`` (the principal for which the certificate was issued),
and ``notAfter`` (the time after which the certificate should not be trusted) filled in.
The certificate was already validated, so the ``notBefore`` and ``issuer`` fields are not
returned. If a certificate contains an instance of the *subjectAltName* extension,
there will also be a ``subjectAltName`` field in the dictionary.
The "subject" field is a tuple containing the sequence
of relative distinguished names (RDNs) given in the certificate's data structure
for the principal, and each RDN is a sequence of name-value pairs::
{'notAfter': 'Feb 16 16:54:50 2013 GMT',
'subject': ((('countryName', u'US'),),
(('stateOrProvinceName', u'Delaware'),),
(('localityName', u'Wilmington'),),
(('organizationName', u'Python Software Foundation'),),
(('organizationalUnitName', u'SSL'),),
(('commonName', u'somemachine.python.org'),))}
.. method:: SSLSocket.cipher()
Returns a three-value tuple containing the name of the cipher being
used, the version of the SSL protocol that defines its use, and the
number of secret bits being used. If no connection has been
established, returns ``None``.
.. method:: SSLSocket.ssl_shutdown()
Closes the SSL context (if any) over the socket, but leaves the socket connection
open for further use, if both sides are willing. This is different from :meth:`socket.socket.shutdown`,
which will close the connection, but leave the local socket available for further use.
.. index:: single: certificates
.. index:: single: X509 certificate
Certificates
------------
@ -130,8 +282,12 @@ can use a certificate to prove who they are. The other
side of a network connection can also be required to produce a certificate,
and that certificate can be validated to the satisfaction
of the client or server that requires such validation.
The connection can be set to fail automatically if such
validation is not achieved.
The connection attempt can be set to raise an exception if
the validation fails. Validation is done
automatically, by the underlying OpenSSL framework; the
application need not concern itself with its mechanics.
But the application does usually need to provide
sets of certificates to allow this process to take place.
Python uses files to contain certificates. They should be formatted
as "PEM" (see :rfc:`1422`), which is a base-64 encoded form wrapped
@ -170,108 +326,54 @@ certificate, you need to provide a "CA certs" file, filled with the certificate
chains for each issuer you are willing to trust. Again, this file just
contains these chains concatenated together. For validation, Python will
use the first chain it finds in the file which matches.
Some "standard" root certificates are available at
http://www.thawte.com/roots/ (for Thawte roots) and
http://www.verisign.com/support/roots.html (for Verisign roots).
See also :rfc:`4158` for more discussion of the way in which
Some "standard" root certificates are available from various certification
authorities:
`CACert.org <http://www.cacert.org/index.php?id=3>`_,
`Thawte <http://www.thawte.com/roots/>`_,
`Verisign <http://www.verisign.com/support/roots.html>`_,
`Equifax and GeoTrust <http://www.geotrust.com/resources/root_certificates/index.asp>`_.
In general, if you are using
SSL3 or TLS1, you don't need to put the full chain in your "CA certs" file;
you only need the root certificates, and the remote peer is supposed to
furnish the other certificates necessary to chain from its certificate to
a root certificate.
See :rfc:`4158` for more discussion of the way in which
certification chains can be built.
If you are going to create a server that provides SSL-encrypted
connection services, you will need to acquire a certificate for that
service. There are many ways of acquiring appropriate certificates,
such as buying one from a certification authority. Another common
practice is to generate a self-signed certificate. The simplest
way to do this is with the OpenSSL package, using something like
the following::
sslsocket Objects
-----------------
% openssl req -new -x509 -days 365 -nodes -out cert.pem -keyout cert.pem
Generating a 1024 bit RSA private key
.......++++++
.............................++++++
writing new private key to 'cert.pem'
-----
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:US
State or Province Name (full name) [Some-State]:MyState
Locality Name (eg, city) []:Some City
Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Organization, Inc.
Organizational Unit Name (eg, section) []:My Group
Common Name (eg, YOUR name) []:myserver.mygroup.myorganization.com
Email Address []:ops@myserver.mygroup.myorganization.com
%
.. class:: sslsocket(sock [, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_SSLv23, ca_certs=None])
Takes an instance ``sock`` of :class:`socket.socket`, and returns an instance of a subtype
of :class:`socket.socket` which wraps the underlying socket in an SSL context.
For client-side sockets, the context construction is lazy; if the underlying socket isn't
connected yet, the context construction will be performed after :meth:`connect` is called
on the socket.
The ``keyfile`` and ``certfile`` parameters specify optional files which contain a certificate
to be used to identify the local side of the connection. See the above discussion of :ref:`ssl-certificates`
for more information on how the certificate is stored in the ``certfile``.
Often the private key is stored
in the same file as the certificate; in this case, only the ``certfile`` parameter need be
passed. If the private key is stored in a separate file, both parameters must be used.
If the private key is stored in the ``certfile``, it should come before the first certificate
in the certificate chain::
-----BEGIN RSA PRIVATE KEY-----
... (private key in base64 encoding) ...
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
... (certificate in base64 PEM encoding) ...
-----END CERTIFICATE-----
The parameter ``server_side`` is a boolean which identifies whether server-side or client-side
behavior is desired from this socket.
The parameter ``cert_reqs`` specifies whether a certificate is
required from the other side of the connection, and whether it will
be validated if provided. It must be one of the three values
:const:`CERT_NONE` (certificates ignored), :const:`CERT_OPTIONAL` (not required,
but validated if provided), or :const:`CERT_REQUIRED` (required and
validated). If the value of this parameter is not :const:`CERT_NONE`, then
the ``ca_certs`` parameter must point to a file of CA certificates.
The parameter ``ssl_version`` specifies which version of the SSL protocol to use. Typically,
the server specifies this, and a client connecting to it must use the same protocol. An
SSL server using :const:`PROTOCOL_SSLv23` can understand a client connecting via SSL2, SSL3, or TLS1,
but a client using :const:`PROTOCOL_SSLv23` can only connect to an SSL2 server.
The ``ca_certs`` file contains a set of concatenated "certification authority" certificates,
which are used to validate certificates passed from the other end of the connection.
See the above discussion of :ref:`ssl-certificates` for more information about how to arrange
the certificates in this file.
.. method:: sslsocket.read([nbytes])
Reads up to ``nbytes`` bytes from the SSL-encrypted channel and returns them.
.. method:: sslsocket.write(data)
Writes the ``data`` to the other side of the connection, using the SSL channel to encrypt. Returns the number
of bytes written.
.. method:: sslsocket.getpeercert()
If there is no certificate for the peer on the other end of the connection, returns ``None``.
If a certificate was received from the peer, but not validated, returns an empty ``dict`` instance.
If a certificate was received and validated, returns a ``dict`` instance with the fields
``subject`` (the principal for which the certificate was issued), ``issuer`` (the signer of
the certificate), ``notBefore`` (the time before which the certificate should not be trusted),
and ``notAfter`` (the time after which the certificate should not be trusted) filled in.
The "subject" and "issuer" fields are tuples containing the name-value fields
given in the certificate's data structure for each principal::
{'issuer': (('countryName', u'US'),
('stateOrProvinceName', u'Delaware'),
('localityName', u'Wilmington'),
('organizationName', u'Python Software Foundation'),
('organizationalUnitName', u'SSL'),
('commonName', u'somemachine.python.org')),
'notAfter': 'Feb 16 16:54:50 2013 GMT',
'notBefore': 'Aug 27 16:54:50 2007 GMT',
'subject': (('countryName', u'US'),
('stateOrProvinceName', u'Delaware'),
('localityName', u'Wilmington'),
('organizationName', u'Python Software Foundation'),
('organizationalUnitName', u'SSL'),
('commonName', u'somemachine.python.org')),
'version': 2}
This certificate is said to be *self-signed*, because the subject
and issuer are the same entity. The *version* field refers to the X509 version
that's used for the certificate.
.. method:: sslsocket.ssl_shutdown()
Closes the SSL context (if any) over the socket, but leaves the socket connection
open for further use, if both sides are willing. This is different from :meth:`socket.socket.shutdown`,
which will close the connection, but leave the local socket available for further use.
The disadvantage of a self-signed certificate is that it is its
own root certificate, and no one else will have it in their cache
of known (and trusted) root certificates.
Examples
@ -298,11 +400,16 @@ sends some bytes, and reads part of the response::
import socket, ssl, pprint
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssl_sock = ssl.sslsocket(s, ca_certs="/etc/ca_certs_file", cert_reqs=ssl.CERT_REQUIRED)
# require a certificate from the server
ssl_sock = ssl.wrap_socket(s,
ca_certs="/etc/ca_certs_file",
cert_reqs=ssl.CERT_REQUIRED)
ssl_sock.connect(('www.verisign.com', 443))
print repr(ssl_sock.getpeername())
print ssl_sock.cipher()
print pprint.pformat(ssl_sock.getpeercert())
# Set a simple HTTP request -- use httplib in actual code.
@ -313,35 +420,29 @@ sends some bytes, and reads part of the response::
# read all the data returned by the server.
data = ssl_sock.read()
# note that closing the sslsocket will also close the underlying socket
# note that closing the SSLSocket will also close the underlying socket
ssl_sock.close()
As of September 4, 2007, the certificate printed by this program
As of September 6, 2007, the certificate printed by this program
looked like this::
{'issuer': (('countryName', u'US'),
('organizationName', u'VeriSign, Inc.'),
('organizationalUnitName', u'VeriSign Trust Network'),
('organizationalUnitName',
u'Terms of use at https://www.verisign.com/rpa (c)06'),
('commonName',
u'VeriSign Class 3 Extended Validation SSL SGC CA')),
'notAfter': 'May 8 23:59:59 2009 GMT',
'notBefore': 'May 9 00:00:00 2007 GMT',
'subject': (('serialNumber', u'2497886'),
('1.3.6.1.4.1.311.60.2.1.3', u'US'),
('1.3.6.1.4.1.311.60.2.1.2', u'Delaware'),
('countryName', u'US'),
('postalCode', u'94043'),
('stateOrProvinceName', u'California'),
('localityName', u'Mountain View'),
('streetAddress', u'487 East Middlefield Road'),
('organizationName', u'VeriSign, Inc.'),
('organizationalUnitName', u'Production Security Services'),
('organizationalUnitName',
u'Terms of use at www.verisign.com/rpa (c)06'),
('commonName', u'www.verisign.com')),
'version': 2}
{'notAfter': 'May 8 23:59:59 2009 GMT',
'subject': ((('serialNumber', u'2497886'),),
(('1.3.6.1.4.1.311.60.2.1.3', u'US'),),
(('1.3.6.1.4.1.311.60.2.1.2', u'Delaware'),),
(('countryName', u'US'),),
(('postalCode', u'94043'),),
(('stateOrProvinceName', u'California'),),
(('localityName', u'Mountain View'),),
(('streetAddress', u'487 East Middlefield Road'),),
(('organizationName', u'VeriSign, Inc.'),),
(('organizationalUnitName',
u'Production Security Services'),),
(('organizationalUnitName',
u'Terms of use at www.verisign.com/rpa (c)06'),),
(('commonName', u'www.verisign.com'),))}
which is a fairly poorly-formed ``subject`` field.
Server-side operation
^^^^^^^^^^^^^^^^^^^^^
@ -357,12 +458,15 @@ to connect::
bindsocket.listen(5)
When one did, you'd call :meth:`accept` on the socket to get the new socket from the other
end, and use :func:`sslsocket` to create a server-side SSL context for it::
end, and use :func:`wrap_socket` to create a server-side SSL context for it::
while True:
newsocket, fromaddr = bindsocket.accept()
connstream = ssl.sslsocket(newsocket, server_side=True, certfile="mycertfile",
keyfile="mykeyfile", ssl_protocol=ssl.PROTOCOL_TLSv1)
connstream = ssl.wrap_socket(newsocket,
server_side=True,
certfile="mycertfile",
keyfile="mykeyfile",
ssl_protocol=ssl.PROTOCOL_TLSv1)
deal_with_client(connstream)
Then you'd read data from the ``connstream`` and do something with it till you are finished with the client (or the client is finished with you)::
@ -373,7 +477,8 @@ Then you'd read data from the ``connstream`` and do something with it till you a
# null data means the client is finished with us
while data:
if not do_something(connstream, data):
# we'll assume do_something returns False when we're finished with client
# we'll assume do_something returns False
# when we're finished with client
break
data = connstream.read()
# finished with client
@ -382,16 +487,19 @@ Then you'd read data from the ``connstream`` and do something with it till you a
And go back to listening for new client connections.
.. _ssl-references:
.. seealso::
References
----------
Class :class:`socket.socket`
Documentation of underlying :mod:`socket` class
Class :class:`socket.socket`
Documentation of underlying :mod:`socket` class
`Introducing SSL and Certificates using OpenSSL <http://old.pseudonym.org/ssl/wwwj-index.html>`_
Frederick J. Hirsch
`Introducing SSL and Certificates using OpenSSL <http://old.pseudonym.org/ssl/wwwj-index.html>`_, by Frederick J. Hirsch
`RFC 1422: Privacy Enhancement for Internet Electronic Mail: Part II: Certificate-Based Key Management <http://www.ietf.org/rfc/rfc1422>`_
Steve Kent
`Privacy Enhancement for Internet Electronic Mail: Part II: Certificate-Based Key Management`, :rfc:`1422`, by Steve Kent
`RFC 1750: Randomness Recommendations for Security <http://www.ietf.org/rfc/rfc1750>`_
D. Eastlake et. al.
`Internet X.509 Public Key Infrastructure Certificate and CRL Profile`, :rfc:`3280`, Housley et. al.
`RFC 3280: Internet X.509 Public Key Infrastructure Certificate and CRL Profile <http://www.ietf.org/rfc/rfc3280>`_
Housley et. al.

View File

@ -1051,7 +1051,7 @@ else:
"Connect to a host on a given (SSL) port."
sock = socket.create_connection((self.host, self.port), self.timeout)
self.sock = ssl.sslsocket(sock, self.key_file, self.cert_file)
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file)
__all__.append("HTTPSConnection")
@ -1083,7 +1083,7 @@ else:
def FakeSocket (sock, sslobj):
warnings.warn("FakeSocket is deprecated, and won't be in 3.x. " +
"Use the result of ssl.sslsocket directly instead.",
"Use the result of ssl.wrap_socket() directly instead.",
DeprecationWarning, stacklevel=2)
return sslobj

View File

@ -1147,7 +1147,7 @@ else:
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
self.sslobj = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
def read(self, size):
@ -1199,7 +1199,7 @@ else:
def ssl(self):
"""Return SSLObject instance used to communicate with the IMAP4 server.
ssl = ssl.sslsocket(<instance>.socket)
ssl = ssl.wrap_socket(<instance>.socket)
"""
return self.sslobj

View File

@ -348,7 +348,7 @@ else:
if not self.sock:
raise socket.error, msg
self.file = self.sock.makefile('rb')
self.sslobj = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
self._debugging = 0
self.welcome = self._getresp()

View File

@ -587,7 +587,7 @@ class SMTP:
if resp == 220:
if not _have_ssl:
raise RuntimeError("No SSL support included in this Python")
self.sock = ssl.sslsocket(self.sock, keyfile, certfile)
self.sock = ssl.wrap_socket(self.sock, keyfile, certfile)
self.file = SSLFakeFile(self.sock)
return (resp, reply)
@ -720,7 +720,7 @@ if _have_ssl:
def _get_socket(self, host, port, timeout):
if self.debuglevel > 0: print>>stderr, 'connect:', (host, port)
self.sock = socket.create_connection((host, port), timeout)
self.sock = ssl.sslsocket(self.sock, self.keyfile, self.certfile)
self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
self.file = SSLFakeFile(self.sock)
__all__.append("SMTP_SSL")

View File

@ -56,13 +56,13 @@ else:
# we do an internal import here because the ssl
# module imports the socket module
import ssl as _realssl
warnings.warn("socket.ssl() is deprecated. Use ssl.sslsocket() instead.",
warnings.warn("socket.ssl() is deprecated. Use ssl.wrap_socket() instead.",
DeprecationWarning, stacklevel=2)
return _realssl.sslwrap_simple(sock, keyfile, certfile)
# we need to import the same constants we used to...
from _ssl import SSLError as sslerror
from _ssl import \
sslerror, \
RAND_add, \
RAND_egd, \
RAND_status, \

View File

@ -6,11 +6,11 @@ This module provides some more Pythonic support for SSL.
Object types:
sslsocket -- subtype of socket.socket which does SSL over the socket
SSLSocket -- subtype of socket.socket which does SSL over the socket
Exceptions:
sslerror -- exception raised for I/O errors
SSLError -- exception raised for I/O errors
Functions:
@ -58,9 +58,11 @@ PROTOCOL_TLSv1
import os, sys
import _ssl # if we can't import it, let the error propagate
from _ssl import sslerror
from _ssl import SSLError
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1
from _ssl import RAND_status, RAND_egd, RAND_add
from _ssl import \
SSL_ERROR_ZERO_RETURN, \
SSL_ERROR_WANT_READ, \
@ -75,8 +77,20 @@ from _ssl import \
from socket import socket
from socket import getnameinfo as _getnameinfo
def get_protocol_name (protocol_code):
if protocol_code == PROTOCOL_TLSv1:
return "TLSv1"
elif protocol_code == PROTOCOL_SSLv23:
return "SSLv23"
elif protocol_code == PROTOCOL_SSLv2:
return "SSLv2"
elif protocol_code == PROTOCOL_SSLv3:
return "SSLv3"
else:
return "<unknown>"
class sslsocket (socket):
class SSLSocket (socket):
"""This class implements a subtype of socket.socket that wraps
the underlying OS socket in an SSL context when necessary, and
@ -119,14 +133,21 @@ class sslsocket (socket):
return self._sslobj.write(data)
def getpeercert(self):
def getpeercert(self, binary_form=False):
"""Returns a formatted version of the data in the
certificate provided by the other end of the SSL channel.
Return None if no certificate was provided, {} if a
certificate was provided, but not validated."""
return self._sslobj.peer_certificate()
return self._sslobj.peer_certificate(binary_form)
def cipher (self):
if not self._sslobj:
return None
else:
return self._sslobj.cipher()
def send (self, data, flags=0):
if self._sslobj:
@ -197,7 +218,7 @@ class sslsocket (socket):
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._sslobj:
raise ValueError("attempt to connect already-connected sslsocket!")
raise ValueError("attempt to connect already-connected SSLSocket!")
socket.connect(self, addr)
self._sslobj = _ssl.sslwrap(self._sock, False, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
@ -210,11 +231,19 @@ class sslsocket (socket):
SSL channel, and the address of the remote client."""
newsock, addr = socket.accept(self)
return (sslsocket(newsock, True, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
self.ca_certs), addr)
return (SSLSocket(newsock, True, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
self.ca_certs), addr)
def wrap_socket(sock, keyfile=None, certfile=None,
server_side=False, cert_reqs=CERT_NONE,
ssl_version=PROTOCOL_SSLv23, ca_certs=None):
return SSLSocket(sock, keyfile=keyfile, certfile=certfile,
server_side=server_side, cert_reqs=cert_reqs,
ssl_version=ssl_version, ca_certs=ca_certs)
# some utility functions
def cert_time_to_seconds(cert_time):

36
Lib/test/badcert.pem Normal file
View File

@ -0,0 +1,36 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
Just bad cert data
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
Just bad cert data
-----END CERTIFICATE-----

40
Lib/test/badkey.pem Normal file
View File

@ -0,0 +1,40 @@
-----BEGIN RSA PRIVATE KEY-----
Bad Key, though the cert should be OK
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
iHkC6gGdBJhogs4=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
Bad Key, though the cert should be OK
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
iHkC6gGdBJhogs4=
-----END CERTIFICATE-----

0
Lib/test/nullcert.pem Normal file
View File

View File

@ -1108,7 +1108,6 @@ _expectations['freebsd7'] = _expectations['freebsd4']
class _ExpectedSkips:
def __init__(self):
import os.path
from test import test_socket_ssl
from test import test_timeout
self.valid = False
@ -1122,8 +1121,13 @@ class _ExpectedSkips:
if not os.path.supports_unicode_filenames:
self.expected.add('test_pep277')
if test_socket_ssl.skip_expected:
self.expected.add('test_socket_ssl')
try:
from test import test_socket_ssl
except ImportError:
pass
else:
if test_socket_ssl.skip_expected:
self.expected.add('test_socket_ssl')
if test_timeout.skip_expected:
self.expected.add('test_timeout')

View File

@ -115,7 +115,7 @@ class BasicTests(unittest.TestCase):
s = socket.socket(socket.AF_INET)
s.connect(("www.sf.net", 443))
fd = s._sock.fileno()
sock = ssl.sslsocket(s)
sock = ssl.wrap_socket(s)
s = None
sock.close()
try:

View File

@ -5,7 +5,6 @@ import unittest
from test import test_support
import socket
import errno
import threading
import subprocess
import time
import os
@ -23,57 +22,21 @@ except ImportError:
CERTFILE = None
TESTPORT = 10025
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
sys.stdout.write(prefix + exc_format)
if test_support.verbose:
sys.stdout.write(prefix + exc_format)
class BasicTests(unittest.TestCase):
def testRudeShutdown(self):
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
# Then it rudely closes the socket, and sets Event `listener_gone`
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', PORT[0]))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
def testSSLconnect(self):
import os
with test_support.transient_internet():
s = ssl.sslsocket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("pop.gmail.com", 995))
c = s.getpeercert()
if c:
@ -81,177 +44,551 @@ class BasicTests(unittest.TestCase):
s.close()
# this should fail because we have no verification certs
s = ssl.sslsocket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("pop.gmail.com", 995))
except ssl.sslerror:
except ssl.SSLError:
pass
finally:
s.close()
class ConnectedTests(unittest.TestCase):
def testCrucialConstants(self):
ssl.PROTOCOL_SSLv2
ssl.PROTOCOL_SSLv23
ssl.PROTOCOL_SSLv3
ssl.PROTOCOL_TLSv1
ssl.CERT_NONE
ssl.CERT_OPTIONAL
ssl.CERT_REQUIRED
def testTLSecho (self):
s1 = socket.socket()
def testRAND(self):
v = ssl.RAND_status()
if test_support.verbose:
sys.stdout.write("\n RAND_status is %d (%s)\n"
% (v, (v and "sufficient randomness") or
"insufficient randomness"))
try:
s1.connect(('127.0.0.1', 10024))
except:
handle_error("connection failure:\n")
raise test_support.TestFailed("Can't connect to test server")
ssl.RAND_egd(1)
except TypeError:
pass
else:
try:
c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
except:
handle_error("SSL handshake failure:\n")
raise test_support.TestFailed("Can't SSL-handshake with test server")
else:
if not c1:
raise test_support.TestFailed("Can't SSL-handshake with test server")
indata = "FOO\n"
c1.write(indata)
outdata = c1.read()
if outdata != indata.lower():
raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
c1.close()
print "didn't raise TypeError"
ssl.RAND_add("this is a random string", 75.0)
def testReadCert(self):
def testParseCert(self):
# note that this uses an 'unofficial' function in _ssl.c,
# provided solely for this test, to exercise the certificate
# parsing code
p = ssl._ssl._test_decode_cert(CERTFILE, False)
if test_support.verbose:
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
s2 = socket.socket()
try:
s2.connect(('127.0.0.1', 10024))
except:
handle_error("connection failure:\n")
raise test_support.TestFailed("Can't connect to test server")
else:
try:
c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
except:
handle_error("SSL handshake failure:\n")
raise test_support.TestFailed("Can't SSL-handshake with test server")
else:
if not c2:
raise test_support.TestFailed("Can't SSL-handshake with test server")
cert = c2.getpeercert()
if not cert:
raise test_support.TestFailed("Can't get peer certificate.")
if test_support.verbose:
sys.stdout.write(pprint.pformat(cert) + '\n')
if not cert.has_key('subject'):
raise test_support.TestFailed(
"No subject field in certificate: %s." %
pprint.pformat(cert))
if not ('organizationName', 'Python Software Foundation') in cert['subject']:
raise test_support.TestFailed(
"Missing or invalid 'organizationName' field in certificate subject; "
"should be 'Python Software Foundation'.");
c2.close()
try:
import threading
except ImportError:
_have_threads = False
else:
_have_threads = True
class ThreadedEchoServer(threading.Thread):
class ThreadedEchoServer(threading.Thread):
class ConnectionHandler(threading.Thread):
class ConnectionHandler(threading.Thread):
def __init__(self, server, connsock):
self.server = server
self.running = False
self.sock = connsock
"""A mildly complicated class, because we want it to work both
with and without the SSL wrapper around the socket connection, so
that we can test the STARTTLS functionality."""
def __init__(self, server, connsock):
self.server = server
self.running = False
self.sock = connsock
self.sock.setblocking(1)
self.sslconn = None
threading.Thread.__init__(self)
self.setDaemon(True)
def wrap_conn (self):
try:
self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
certfile=self.server.certificate,
ssl_version=self.server.protocol,
ca_certs=self.server.cacerts,
cert_reqs=self.server.certreqs)
except:
if self.server.chatty:
handle_error("\n server: bad connection attempt from " +
str(self.sock.getpeername()) + ":\n")
if not self.server.expect_bad_connects:
# here, we want to stop the server, because this shouldn't
# happen in the context of our test case
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
return False
else:
if self.server.certreqs == ssl.CERT_REQUIRED:
cert = self.sslconn.getpeercert()
if test_support.verbose and self.server.chatty:
sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
cert_binary = self.sslconn.getpeercert(True)
if test_support.verbose and self.server.chatty:
sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
cipher = self.sslconn.cipher()
if test_support.verbose and self.server.chatty:
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
return True
def read(self):
if self.sslconn:
return self.sslconn.read()
else:
return self.sock.recv(1024)
def write(self, bytes):
if self.sslconn:
return self.sslconn.write(bytes)
else:
return self.sock.send(bytes)
def close(self):
if self.sslconn:
self.sslconn.close()
else:
self.sock.close()
def run (self):
self.running = True
if not self.server.starttls_server:
if not self.wrap_conn():
return
while self.running:
try:
msg = self.read()
if not msg:
# eof, so quit this handler
self.running = False
self.close()
elif msg.strip() == 'over':
if test_support.verbose and self.server.connectionchatty:
sys.stdout.write(" server: client closed connection\n")
self.close()
return
elif self.server.starttls_server and msg.strip() == 'STARTTLS':
if test_support.verbose and self.server.connectionchatty:
sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
self.write("OK\n")
if not self.wrap_conn():
return
else:
if (test_support.verbose and
self.server.connectionchatty):
ctype = (self.sslconn and "encrypted") or "unencrypted"
sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
% (repr(msg), ctype, repr(msg.lower()), ctype))
self.write(msg.lower())
except ssl.SSLError:
if self.server.chatty:
handle_error("Test server failure:\n")
self.close()
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
except:
handle_error('')
def __init__(self, port, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False,
chatty=True, connectionchatty=False, starttls_server=False):
if ssl_version is None:
ssl_version = ssl.PROTOCOL_TLSv1
if certreqs is None:
certreqs = ssl.CERT_NONE
self.certificate = certificate
self.protocol = ssl_version
self.certreqs = certreqs
self.cacerts = cacerts
self.expect_bad_connects = expect_bad_connects
self.chatty = chatty
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
self.flag = None
if hasattr(socket, 'SO_REUSEADDR'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(True)
self.setDaemon(False)
def start (self, flag=None):
self.flag = flag
threading.Thread.start(self)
def run (self):
self.running = True
try:
sslconn = ssl.sslsocket(self.sock, server_side=True,
certfile=self.server.certificate,
ssl_version=self.server.protocol,
cert_reqs=self.server.certreqs)
except:
# here, we want to stop the server, because this shouldn't
# happen in the context of our test case
handle_error("Test server failure:\n")
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
return
while self.running:
self.sock.settimeout(0.5)
self.sock.listen(5)
self.active = True
if self.flag:
# signal an event
self.flag.set()
while self.active:
try:
msg = sslconn.read()
if not msg:
# eof, so quit this handler
self.running = False
sslconn.close()
elif msg.strip() == 'over':
sslconn.close()
self.server.stop()
self.running = False
else:
if test_support.verbose:
sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
sslconn.write(msg.lower())
except ssl.sslerror:
handle_error("Test server failure:\n")
sslconn.close()
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
newconn, connaddr = self.sock.accept()
if test_support.verbose and self.chatty:
sys.stdout.write(' server: new connection from '
+ str(connaddr) + '\n')
handler = self.ConnectionHandler(self, newconn)
handler.start()
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
except:
handle_error('')
if self.chatty:
handle_error("Test server failure:\n")
def __init__(self, port, certificate, ssl_version=None,
certreqs=None, cacerts=None):
if ssl_version is None:
ssl_version = ssl.PROTOCOL_TLSv1
if certreqs is None:
certreqs = ssl.CERT_NONE
self.certificate = certificate
self.protocol = ssl_version
self.certreqs = certreqs
self.cacerts = cacerts
self.sock = socket.socket()
self.flag = None
if hasattr(socket, 'SO_REUSEADDR'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
def stop (self):
self.active = False
self.sock.close()
def start (self, flag=None):
self.flag = flag
threading.Thread.start(self)
def run (self):
self.sock.settimeout(0.5)
self.sock.listen(5)
self.active = True
if self.flag:
# signal an event
self.flag.set()
while self.active:
def badCertTest (certfile):
server = ThreadedEchoServer(TESTPORT, CERTFILE,
certreqs=ssl.CERT_REQUIRED,
cacerts=CERTFILE, chatty=False)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
# try to connect
try:
try:
newconn, connaddr = self.sock.accept()
s = ssl.wrap_socket(socket.socket(),
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
s.connect(('127.0.0.1', TESTPORT))
except ssl.SSLError, x:
if test_support.verbose:
sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
handler = self.ConnectionHandler(self, newconn)
handler.start()
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
except:
handle_error("Test server failure:\n")
sys.stdout.write("\nSSLError is %s\n" % x[1])
else:
raise test_support.TestFailed(
"Use of invalid cert should have failed!")
finally:
server.stop()
server.join()
def serverParamsTest (certfile, protocol, certreqs, cacertsfile,
client_certfile, client_protocol=None, indata="FOO\n",
chatty=True, connectionchatty=False):
server = ThreadedEchoServer(TESTPORT, certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
chatty=chatty,
connectionchatty=connectionchatty)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
# try to connect
if client_protocol is None:
client_protocol = protocol
try:
try:
s = ssl.wrap_socket(socket.socket(),
certfile=client_certfile,
ca_certs=cacertsfile,
cert_reqs=certreqs,
ssl_version=client_protocol)
s.connect(('127.0.0.1', TESTPORT))
except ssl.SSLError, x:
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
except Exception, x:
raise test_support.TestFailed("Unexpected exception: " + str(x))
else:
if connectionchatty:
if test_support.verbose:
sys.stdout.write(
" client: sending %s...\n" % (repr(indata)))
s.write(indata)
outdata = s.read()
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: read %s\n" % repr(outdata))
if outdata != indata.lower():
raise test_support.TestFailed(
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
% (outdata[:min(len(outdata),20)], len(outdata),
indata[:min(len(indata),20)].lower(), len(indata)))
s.write("over\n")
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.ssl_shutdown()
s.close()
finally:
server.stop()
server.join()
def tryProtocolCombo (server_protocol,
client_protocol,
expectedToWork,
certsreqs=ssl.CERT_NONE):
if certsreqs == ssl.CERT_NONE:
certtype = "CERT_NONE"
elif certsreqs == ssl.CERT_OPTIONAL:
certtype = "CERT_OPTIONAL"
elif certsreqs == ssl.CERT_REQUIRED:
certtype = "CERT_REQUIRED"
if test_support.verbose:
formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
sys.stdout.write(formatstr %
(ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol),
certtype))
try:
serverParamsTest(CERTFILE, server_protocol, certsreqs,
CERTFILE, CERTFILE, client_protocol, chatty=False)
except test_support.TestFailed:
if expectedToWork:
raise
else:
if not expectedToWork:
raise test_support.TestFailed(
"Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol)))
class ConnectedTests(unittest.TestCase):
def testRudeShutdown(self):
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
# Then it rudely closes the socket, and sets Event `listener_gone`
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
if hasattr(socket, 'SO_REUSEPORT'):
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
port = test_support.bind_port(s, 'localhost', TESTPORT)
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', TESTPORT))
listener_gone.wait()
try:
ssl_sock = ssl.wrap_socket(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
def testEcho (self):
if test_support.verbose:
sys.stdout.write("\n")
serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
chatty=True, connectionchatty=True)
def testReadCert(self):
if test_support.verbose:
sys.stdout.write("\n")
s2 = socket.socket()
server = ThreadedEchoServer(TESTPORT, CERTFILE,
certreqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23,
cacerts=CERTFILE,
chatty=False)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
# try to connect
try:
try:
s = ssl.wrap_socket(socket.socket(),
certfile=CERTFILE,
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_SSLv23)
s.connect(('127.0.0.1', TESTPORT))
except ssl.SSLError, x:
raise test_support.TestFailed(
"Unexpected SSL error: " + str(x))
except Exception, x:
raise test_support.TestFailed(
"Unexpected exception: " + str(x))
else:
if not s:
raise test_support.TestFailed(
"Can't SSL-handshake with test server")
cert = s.getpeercert()
if not cert:
raise test_support.TestFailed(
"Can't get peer certificate.")
cipher = s.cipher()
if test_support.verbose:
sys.stdout.write(pprint.pformat(cert) + '\n')
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
if not cert.has_key('subject'):
raise test_support.TestFailed(
"No subject field in certificate: %s." %
pprint.pformat(cert))
if ((('organizationName', 'Python Software Foundation'),)
not in cert['subject']):
raise test_support.TestFailed(
"Missing or invalid 'organizationName' field in certificate subject; "
"should be 'Python Software Foundation'.");
s.close()
finally:
server.stop()
server.join()
def testNULLcert(self):
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"nullcert.pem"))
def testMalformedCert(self):
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"badcert.pem"))
def testMalformedKey(self):
badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir,
"badkey.pem"))
def testProtocolSSL2(self):
if test_support.verbose:
sys.stdout.write("\n")
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
def testProtocolSSL23(self):
if test_support.verbose:
sys.stdout.write("\n")
try:
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
except test_support.TestFailed, x:
# this fails on some older versions of OpenSSL (0.9.7l, for instance)
if test_support.verbose:
sys.stdout.write(
" SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
% str(x))
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
def testProtocolSSL3(self):
if test_support.verbose:
sys.stdout.write("\n")
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
def testProtocolTLS1(self):
if test_support.verbose:
sys.stdout.write("\n")
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
def testSTARTTLS (self):
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
server = ThreadedEchoServer(TESTPORT, CERTFILE,
ssl_version=ssl.PROTOCOL_TLSv1,
starttls_server=True,
chatty=True,
connectionchatty=True)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
# try to connect
wrapped = False
try:
try:
s = socket.socket()
s.setblocking(1)
s.connect(('127.0.0.1', TESTPORT))
except Exception, x:
raise test_support.TestFailed("Unexpected exception: " + str(x))
else:
if test_support.verbose:
sys.stdout.write("\n")
for indata in msgs:
if test_support.verbose:
sys.stdout.write(" client: sending %s...\n" % repr(indata))
if wrapped:
conn.write(indata)
outdata = conn.read()
else:
s.send(indata)
outdata = s.recv(1024)
if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"):
if test_support.verbose:
sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata))
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
wrapped = True
else:
if test_support.verbose:
sys.stdout.write(" client: read %s from server\n" % repr(outdata))
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
if wrapped:
conn.write("over\n")
conn.ssl_shutdown()
else:
s.send("over\n")
s.close()
finally:
server.stop()
server.join()
def stop (self):
self.active = False
self.sock.close()
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server
@ -337,33 +674,21 @@ def test_main(verbose=False):
global CERTFILE
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
if not CERTFILE:
sys.__stdout__.write("Skipping test_ssl ConnectedTests; "
"couldn't create a certificate.\n")
"keycert.pem")
if (not os.path.exists(CERTFILE)):
raise test_support.TestFailed("Can't read certificate files!")
tests = [BasicTests]
server = None
if CERTFILE and test_support.is_resource_enabled('network'):
server = ThreadedEchoServer(10024, CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
tests.append(ConnectedTests)
if _have_threads:
thread_info = test_support.threading_setup()
if CERTFILE and thread_info and test_support.is_resource_enabled('network'):
tests.append(ConnectedTests)
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)
try:
test_support.run_unittest(*tests)
finally:
if server is not None and server.active:
server.stop()
# wait for it to stop
server.join()
test_support.threading_cleanup(*thread_info)
if _have_threads:
test_support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()

File diff suppressed because it is too large Load Diff