From 6ab7f009ba130bc79ecc8fde2f85b1d1a2430b26 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 27 Apr 2010 10:41:42 +0000 Subject: [PATCH] Merged revisions 80529 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk ........ r80529 | antoine.pitrou | 2010-04-27 12:32:58 +0200 (mar., 27 avril 2010) | 4 lines Qualify or remove or bare excepts. Simplify exception handling in places. Remove uses of test_support.TestFailed. ........ --- Lib/test/test_ssl.py | 329 ++++++++++++++++++------------------------- 1 file changed, 136 insertions(+), 193 deletions(-) diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index f87da9a4209..e8f03d1c36f 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -65,7 +65,7 @@ class BasicTests(unittest.TestCase): s.connect(("svn.python.org", 443)) c = s.getpeercert() if c: - raise test_support.TestFailed("Peer cert %s shouldn't be here!") + self.fail("Peer cert %s shouldn't be here!") s.close() # this should fail because we have no verification certs @@ -115,8 +115,7 @@ class BasicTests(unittest.TestCase): d1 = ssl.PEM_cert_to_DER_cert(pem) p2 = ssl.DER_cert_to_PEM_cert(d1) d2 = ssl.PEM_cert_to_DER_cert(p2) - if (d1 != d2): - raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed") + self.assertEqual(d1, d2) def test_refcycle(self): # Issue #7943: an SSL object doesn't create reference cycles with @@ -136,7 +135,7 @@ class NetworkedTests(unittest.TestCase): s.connect(("svn.python.org", 443)) c = s.getpeercert() if c: - raise test_support.TestFailed("Peer cert %s shouldn't be here!") + self.fail("Peer cert %s shouldn't be here!") s.close() # this should fail because we have no verification certs @@ -155,8 +154,6 @@ class NetworkedTests(unittest.TestCase): ca_certs=SVN_PYTHON_ORG_ROOT_CERT) try: s.connect(("svn.python.org", 443)) - except ssl.SSLError, x: - raise test_support.TestFailed("Unexpected exception %s" % x) finally: s.close() @@ -213,7 +210,7 @@ class NetworkedTests(unittest.TestCase): pem = ssl.get_server_certificate(("svn.python.org", 443)) if not pem: - raise test_support.TestFailed("No server certificate on svn.python.org:443!") + self.fail("No server certificate on svn.python.org:443!") try: pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) @@ -221,11 +218,11 @@ class NetworkedTests(unittest.TestCase): #should fail pass else: - raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem) + self.fail("Got server certificate %s for svn.python.org!" % pem) pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) if not pem: - raise test_support.TestFailed("No server certificate on svn.python.org:443!") + self.fail("No server certificate on svn.python.org:443!") if test_support.verbose: sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) @@ -299,20 +296,17 @@ else: ssl_version=self.server.protocol, ca_certs=self.server.cacerts, cert_reqs=self.server.certreqs) - except: + except ssl.SSLError: + # XXX Various errors can have happened here, for example + # a mismatching protocol version, an invalid certificate, + # or a low-level bug. This should be made more discriminating. if self.server.chatty: handle_error("\n server: bad connection attempt from " + str(self.sock.getpeername()) + ":\n") self.close() - if not self.server.expect_bad_connects: - # here, we want to stop the server, because this shouldn't - # happen in the context of our test case - self.running = False - # normally, we'd just stop here, but for the test - # harness, we want to stop the server - self.server.stop() + self.running = False + self.server.stop() return False - else: return True @@ -383,11 +377,9 @@ else: # normally, we'd just stop here, but for the test # harness, we want to stop the server self.server.stop() - except: - handle_error('') def __init__(self, certificate, ssl_version=None, - certreqs=None, cacerts=None, expect_bad_connects=False, + certreqs=None, cacerts=None, chatty=True, connectionchatty=False, starttls_server=False, wrap_accepting_socket=False): @@ -399,7 +391,6 @@ else: self.protocol = ssl_version self.certreqs = certreqs self.cacerts = cacerts - self.expect_bad_connects = expect_bad_connects self.chatty = chatty self.connectionchatty = connectionchatty self.starttls_server = starttls_server @@ -441,9 +432,6 @@ else: pass except KeyboardInterrupt: self.stop() - except: - if self.chatty: - handle_error("Test server failure:\n") self.sock.close() def stop (self): @@ -489,7 +477,8 @@ else: self._do_ssl_handshake() else: data = self.recv(1024) - self.send(data.lower()) + if data and data.strip() != 'over': + self.send(data.lower()) def handle_close(self): self.close() @@ -535,10 +524,7 @@ else: if self.flag: self.flag.set() while self.active: - try: - asyncore.loop(1) - except: - pass + asyncore.loop(0.05) def stop (self): self.active = False @@ -661,12 +647,8 @@ else: except ssl.SSLError, x: if test_support.verbose: sys.stdout.write("\nSSLError is %s\n" % x[1]) - except socket.error, x: - if test_support.verbose: - sys.stdout.write("\nsocket.error is %s\n" % x[1]) else: - raise test_support.TestFailed( - "Use of invalid cert should have failed!") + self.fail("Use of invalid cert should have failed!") finally: server.stop() server.join() @@ -691,37 +673,31 @@ else: if client_protocol is None: client_protocol = protocol try: - try: - s = ssl.wrap_socket(socket.socket(), - certfile=client_certfile, - ca_certs=cacertsfile, - cert_reqs=certreqs, - ssl_version=client_protocol) - s.connect((HOST, server.port)) - except ssl.SSLError, x: - raise test_support.TestFailed("Unexpected SSL error: " + str(x)) - except Exception, x: - raise test_support.TestFailed("Unexpected exception: " + str(x)) - else: - if connectionchatty: - if test_support.verbose: - sys.stdout.write( - " client: sending %s...\n" % (repr(indata))) - s.write(indata) - outdata = s.read() - if connectionchatty: - if test_support.verbose: - sys.stdout.write(" client: read %s\n" % repr(outdata)) - if outdata != indata.lower(): - raise test_support.TestFailed( - "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" - % (outdata[:min(len(outdata),20)], len(outdata), - indata[:min(len(indata),20)].lower(), len(indata))) - s.write("over\n") - if connectionchatty: - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - s.close() + s = ssl.wrap_socket(socket.socket(), + certfile=client_certfile, + ca_certs=cacertsfile, + cert_reqs=certreqs, + ssl_version=client_protocol) + s.connect((HOST, server.port)) + if connectionchatty: + if test_support.verbose: + sys.stdout.write( + " client: sending %s...\n" % (repr(indata))) + s.write(indata) + outdata = s.read() + if connectionchatty: + if test_support.verbose: + sys.stdout.write(" client: read %s\n" % repr(outdata)) + if outdata != indata.lower(): + self.fail( + "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" + % (outdata[:min(len(outdata),20)], len(outdata), + indata[:min(len(indata),20)].lower(), len(indata))) + s.write("over\n") + if connectionchatty: + if test_support.verbose: + sys.stdout.write(" client: closing connection.\n") + s.close() finally: server.stop() server.join() @@ -749,12 +725,17 @@ else: try: serverParamsTest(CERTFILE, server_protocol, certsreqs, CERTFILE, CERTFILE, client_protocol, chatty=False) - except test_support.TestFailed: + # Protocol mismatch can result in either an SSLError, or a + # "Connection reset by peer" error. + except ssl.SSLError: if expectedToWork: raise + except socket.error as e: + if expectedToWork or e.errno != errno.ECONNRESET: + raise else: if not expectedToWork: - raise test_support.TestFailed( + self.fail( "Client protocol %s succeeded with server protocol %s!" % (ssl.get_protocol_name(client_protocol), ssl.get_protocol_name(server_protocol))) @@ -791,8 +772,7 @@ else: except IOError: pass else: - raise test_support.TestFailed( - 'connecting to closed SSL socket should have failed') + self.fail('connecting to closed SSL socket should have failed') t = threading.Thread(target=listener) t.start() @@ -825,41 +805,27 @@ else: flag.wait() # try to connect try: - try: - s = ssl.wrap_socket(socket.socket(), - certfile=CERTFILE, - ca_certs=CERTFILE, - cert_reqs=ssl.CERT_REQUIRED, - ssl_version=ssl.PROTOCOL_SSLv23) - s.connect((HOST, server.port)) - except ssl.SSLError, x: - raise test_support.TestFailed( - "Unexpected SSL error: " + str(x)) - except Exception, x: - raise test_support.TestFailed( - "Unexpected exception: " + str(x)) - else: - if not s: - raise test_support.TestFailed( - "Can't SSL-handshake with test server") - cert = s.getpeercert() - if not cert: - raise test_support.TestFailed( - "Can't get peer certificate.") - cipher = s.cipher() - if test_support.verbose: - sys.stdout.write(pprint.pformat(cert) + '\n') - sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') - if not cert.has_key('subject'): - raise test_support.TestFailed( - "No subject field in certificate: %s." % - pprint.pformat(cert)) - if ((('organizationName', 'Python Software Foundation'),) - not in cert['subject']): - raise test_support.TestFailed( - "Missing or invalid 'organizationName' field in certificate subject; " - "should be 'Python Software Foundation'.") - s.close() + s = ssl.wrap_socket(socket.socket(), + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_REQUIRED, + ssl_version=ssl.PROTOCOL_SSLv23) + s.connect((HOST, server.port)) + cert = s.getpeercert() + self.assertTrue(cert, "Can't get peer certificate.") + cipher = s.cipher() + if test_support.verbose: + sys.stdout.write(pprint.pformat(cert) + '\n') + sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') + if 'subject' not in cert: + self.fail("No subject field in certificate: %s." % + pprint.pformat(cert)) + if ((('organizationName', 'Python Software Foundation'),) + not in cert['subject']): + self.fail( + "Missing or invalid 'organizationName' field in certificate subject; " + "should be 'Python Software Foundation'.") + s.close() finally: server.stop() server.join() @@ -892,7 +858,7 @@ else: sys.stdout.write("\n") try: tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) - except test_support.TestFailed, x: + except (SSLError, socket.error), x: # this fails on some older versions of OpenSSL (0.9.7l, for instance) if test_support.verbose: sys.stdout.write( @@ -946,52 +912,48 @@ else: # try to connect wrapped = False try: - try: - s = socket.socket() - s.setblocking(1) - s.connect((HOST, server.port)) - except Exception, x: - raise test_support.TestFailed("Unexpected exception: " + str(x)) - else: + s = socket.socket() + s.setblocking(1) + s.connect((HOST, server.port)) + if test_support.verbose: + sys.stdout.write("\n") + for indata in msgs: if test_support.verbose: - sys.stdout.write("\n") - for indata in msgs: + sys.stdout.write( + " client: sending %s...\n" % repr(indata)) + if wrapped: + conn.write(indata) + outdata = conn.read() + else: + s.send(indata) + outdata = s.recv(1024) + if (indata == "STARTTLS" and + outdata.strip().lower().startswith("ok")): if test_support.verbose: sys.stdout.write( - " client: sending %s...\n" % repr(indata)) - if wrapped: - conn.write(indata) - outdata = conn.read() - else: - s.send(indata) - outdata = s.recv(1024) - if (indata == "STARTTLS" and - outdata.strip().lower().startswith("ok")): - if test_support.verbose: - sys.stdout.write( - " client: read %s from server, starting TLS...\n" - % repr(outdata)) - conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) - wrapped = True - elif (indata == "ENDTLS" and - outdata.strip().lower().startswith("ok")): - if test_support.verbose: - sys.stdout.write( - " client: read %s from server, ending TLS...\n" - % repr(outdata)) - s = conn.unwrap() - wrapped = False - else: - if test_support.verbose: - sys.stdout.write( - " client: read %s from server\n" % repr(outdata)) - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - if wrapped: - conn.write("over\n") + " client: read %s from server, starting TLS...\n" + % repr(outdata)) + conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) + wrapped = True + elif (indata == "ENDTLS" and + outdata.strip().lower().startswith("ok")): + if test_support.verbose: + sys.stdout.write( + " client: read %s from server, ending TLS...\n" + % repr(outdata)) + s = conn.unwrap() + wrapped = False else: - s.send("over\n") - s.close() + if test_support.verbose: + sys.stdout.write( + " client: read %s from server\n" % repr(outdata)) + if test_support.verbose: + sys.stdout.write(" client: closing connection.\n") + if wrapped: + conn.write("over\n") + else: + s.send("over\n") + s.close() finally: server.stop() server.join() @@ -1021,15 +983,7 @@ else: " client: read %d bytes from remote server '%s'\n" % (len(d2), server)) f.close() - except: - msg = ''.join(traceback.format_exception(*sys.exc_info())) - if test_support.verbose: - sys.stdout.write('\n' + msg) - raise test_support.TestFailed(msg) - else: - if not (d1 == d2): - raise test_support.TestFailed( - "Couldn't fetch data from HTTPS server") + self.assertEqual(d1, d2) finally: server.stop() server.join() @@ -1057,30 +1011,24 @@ else: flag.wait() # try to connect try: - try: - s = ssl.wrap_socket(socket.socket()) - s.connect(('127.0.0.1', server.port)) - except ssl.SSLError, x: - raise test_support.TestFailed("Unexpected SSL error: " + str(x)) - except Exception, x: - raise test_support.TestFailed("Unexpected exception: " + str(x)) - else: - if test_support.verbose: - sys.stdout.write( - " client: sending %s...\n" % (repr(indata))) - s.write(indata) - outdata = s.read() - if test_support.verbose: - sys.stdout.write(" client: read %s\n" % repr(outdata)) - if outdata != indata.lower(): - raise test_support.TestFailed( - "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" - % (outdata[:min(len(outdata),20)], len(outdata), - indata[:min(len(indata),20)].lower(), len(indata))) - s.write("over\n") - if test_support.verbose: - sys.stdout.write(" client: closing connection.\n") - s.close() + s = ssl.wrap_socket(socket.socket()) + s.connect(('127.0.0.1', server.port)) + if test_support.verbose: + sys.stdout.write( + " client: sending %s...\n" % (repr(indata))) + s.write(indata) + outdata = s.read() + if test_support.verbose: + sys.stdout.write(" client: read %s\n" % repr(outdata)) + if outdata != indata.lower(): + self.fail( + "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" + % (outdata[:min(len(outdata),20)], len(outdata), + indata[:min(len(indata),20)].lower(), len(indata))) + s.write("over\n") + if test_support.verbose: + sys.stdout.write(" client: closing connection.\n") + s.close() finally: server.stop() # wait for server thread to end @@ -1103,19 +1051,14 @@ else: # wait for it to start flag.wait() # try to connect + s = ssl.wrap_socket(socket.socket(), + server_side=False, + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) try: - s = ssl.wrap_socket(socket.socket(), - server_side=False, - certfile=CERTFILE, - ca_certs=CERTFILE, - cert_reqs=ssl.CERT_NONE, - ssl_version=ssl.PROTOCOL_TLSv1) - s.connect((HOST, server.port)) - except ssl.SSLError as x: - raise support.TestFailed("Unexpected SSL error: " + str(x)) - except Exception as x: - raise support.TestFailed("Unexpected exception: " + str(x)) - else: # helper methods for standardising recv* method signatures def _recv_into(): b = bytearray("\0"*100)