Merged revisions 80529 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r80529 | antoine.pitrou | 2010-04-27 12:32:58 +0200 (mar., 27 avril 2010) | 4 lines

  Qualify or remove or bare excepts.  Simplify exception handling in places.
  Remove uses of test_support.TestFailed.
........
This commit is contained in:
Antoine Pitrou 2010-04-27 10:41:42 +00:00
parent 77a1c36fb7
commit 6ab7f009ba
1 changed files with 136 additions and 193 deletions

View File

@ -65,7 +65,7 @@ class BasicTests(unittest.TestCase):
s.connect(("svn.python.org", 443)) s.connect(("svn.python.org", 443))
c = s.getpeercert() c = s.getpeercert()
if c: if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!") self.fail("Peer cert %s shouldn't be here!")
s.close() s.close()
# this should fail because we have no verification certs # this should fail because we have no verification certs
@ -115,8 +115,7 @@ class BasicTests(unittest.TestCase):
d1 = ssl.PEM_cert_to_DER_cert(pem) d1 = ssl.PEM_cert_to_DER_cert(pem)
p2 = ssl.DER_cert_to_PEM_cert(d1) p2 = ssl.DER_cert_to_PEM_cert(d1)
d2 = ssl.PEM_cert_to_DER_cert(p2) d2 = ssl.PEM_cert_to_DER_cert(p2)
if (d1 != d2): self.assertEqual(d1, d2)
raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
def test_refcycle(self): def test_refcycle(self):
# Issue #7943: an SSL object doesn't create reference cycles with # Issue #7943: an SSL object doesn't create reference cycles with
@ -136,7 +135,7 @@ class NetworkedTests(unittest.TestCase):
s.connect(("svn.python.org", 443)) s.connect(("svn.python.org", 443))
c = s.getpeercert() c = s.getpeercert()
if c: if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!") self.fail("Peer cert %s shouldn't be here!")
s.close() s.close()
# this should fail because we have no verification certs # this should fail because we have no verification certs
@ -155,8 +154,6 @@ class NetworkedTests(unittest.TestCase):
ca_certs=SVN_PYTHON_ORG_ROOT_CERT) ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try: try:
s.connect(("svn.python.org", 443)) s.connect(("svn.python.org", 443))
except ssl.SSLError, x:
raise test_support.TestFailed("Unexpected exception %s" % x)
finally: finally:
s.close() s.close()
@ -213,7 +210,7 @@ class NetworkedTests(unittest.TestCase):
pem = ssl.get_server_certificate(("svn.python.org", 443)) pem = ssl.get_server_certificate(("svn.python.org", 443))
if not pem: if not pem:
raise test_support.TestFailed("No server certificate on svn.python.org:443!") self.fail("No server certificate on svn.python.org:443!")
try: try:
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
@ -221,11 +218,11 @@ class NetworkedTests(unittest.TestCase):
#should fail #should fail
pass pass
else: else:
raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem) self.fail("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem: if not pem:
raise test_support.TestFailed("No server certificate on svn.python.org:443!") self.fail("No server certificate on svn.python.org:443!")
if test_support.verbose: if test_support.verbose:
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
@ -299,20 +296,17 @@ else:
ssl_version=self.server.protocol, ssl_version=self.server.protocol,
ca_certs=self.server.cacerts, ca_certs=self.server.cacerts,
cert_reqs=self.server.certreqs) cert_reqs=self.server.certreqs)
except: except ssl.SSLError:
# XXX Various errors can have happened here, for example
# a mismatching protocol version, an invalid certificate,
# or a low-level bug. This should be made more discriminating.
if self.server.chatty: if self.server.chatty:
handle_error("\n server: bad connection attempt from " + handle_error("\n server: bad connection attempt from " +
str(self.sock.getpeername()) + ":\n") str(self.sock.getpeername()) + ":\n")
self.close() self.close()
if not self.server.expect_bad_connects: self.running = False
# here, we want to stop the server, because this shouldn't self.server.stop()
# happen in the context of our test case
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
return False return False
else: else:
return True return True
@ -383,11 +377,9 @@ else:
# normally, we'd just stop here, but for the test # normally, we'd just stop here, but for the test
# harness, we want to stop the server # harness, we want to stop the server
self.server.stop() self.server.stop()
except:
handle_error('')
def __init__(self, certificate, ssl_version=None, def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False, certreqs=None, cacerts=None,
chatty=True, connectionchatty=False, starttls_server=False, chatty=True, connectionchatty=False, starttls_server=False,
wrap_accepting_socket=False): wrap_accepting_socket=False):
@ -399,7 +391,6 @@ else:
self.protocol = ssl_version self.protocol = ssl_version
self.certreqs = certreqs self.certreqs = certreqs
self.cacerts = cacerts self.cacerts = cacerts
self.expect_bad_connects = expect_bad_connects
self.chatty = chatty self.chatty = chatty
self.connectionchatty = connectionchatty self.connectionchatty = connectionchatty
self.starttls_server = starttls_server self.starttls_server = starttls_server
@ -441,9 +432,6 @@ else:
pass pass
except KeyboardInterrupt: except KeyboardInterrupt:
self.stop() self.stop()
except:
if self.chatty:
handle_error("Test server failure:\n")
self.sock.close() self.sock.close()
def stop (self): def stop (self):
@ -489,7 +477,8 @@ else:
self._do_ssl_handshake() self._do_ssl_handshake()
else: else:
data = self.recv(1024) data = self.recv(1024)
self.send(data.lower()) if data and data.strip() != 'over':
self.send(data.lower())
def handle_close(self): def handle_close(self):
self.close() self.close()
@ -535,10 +524,7 @@ else:
if self.flag: if self.flag:
self.flag.set() self.flag.set()
while self.active: while self.active:
try: asyncore.loop(0.05)
asyncore.loop(1)
except:
pass
def stop (self): def stop (self):
self.active = False self.active = False
@ -661,12 +647,8 @@ else:
except ssl.SSLError, x: except ssl.SSLError, x:
if test_support.verbose: if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x[1]) sys.stdout.write("\nSSLError is %s\n" % x[1])
except socket.error, x:
if test_support.verbose:
sys.stdout.write("\nsocket.error is %s\n" % x[1])
else: else:
raise test_support.TestFailed( self.fail("Use of invalid cert should have failed!")
"Use of invalid cert should have failed!")
finally: finally:
server.stop() server.stop()
server.join() server.join()
@ -691,37 +673,31 @@ else:
if client_protocol is None: if client_protocol is None:
client_protocol = protocol client_protocol = protocol
try: try:
try: s = ssl.wrap_socket(socket.socket(),
s = ssl.wrap_socket(socket.socket(), certfile=client_certfile,
certfile=client_certfile, ca_certs=cacertsfile,
ca_certs=cacertsfile, cert_reqs=certreqs,
cert_reqs=certreqs, ssl_version=client_protocol)
ssl_version=client_protocol) s.connect((HOST, server.port))
s.connect((HOST, server.port)) if connectionchatty:
except ssl.SSLError, x: if test_support.verbose:
raise test_support.TestFailed("Unexpected SSL error: " + str(x)) sys.stdout.write(
except Exception, x: " client: sending %s...\n" % (repr(indata)))
raise test_support.TestFailed("Unexpected exception: " + str(x)) s.write(indata)
else: outdata = s.read()
if connectionchatty: if connectionchatty:
if test_support.verbose: if test_support.verbose:
sys.stdout.write( sys.stdout.write(" client: read %s\n" % repr(outdata))
" client: sending %s...\n" % (repr(indata))) if outdata != indata.lower():
s.write(indata) self.fail(
outdata = s.read() "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
if connectionchatty: % (outdata[:min(len(outdata),20)], len(outdata),
if test_support.verbose: indata[:min(len(indata),20)].lower(), len(indata)))
sys.stdout.write(" client: read %s\n" % repr(outdata)) s.write("over\n")
if outdata != indata.lower(): if connectionchatty:
raise test_support.TestFailed( if test_support.verbose:
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" sys.stdout.write(" client: closing connection.\n")
% (outdata[:min(len(outdata),20)], len(outdata), s.close()
indata[:min(len(indata),20)].lower(), len(indata)))
s.write("over\n")
if connectionchatty:
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.close()
finally: finally:
server.stop() server.stop()
server.join() server.join()
@ -749,12 +725,17 @@ else:
try: try:
serverParamsTest(CERTFILE, server_protocol, certsreqs, serverParamsTest(CERTFILE, server_protocol, certsreqs,
CERTFILE, CERTFILE, client_protocol, chatty=False) CERTFILE, CERTFILE, client_protocol, chatty=False)
except test_support.TestFailed: # Protocol mismatch can result in either an SSLError, or a
# "Connection reset by peer" error.
except ssl.SSLError:
if expectedToWork: if expectedToWork:
raise raise
except socket.error as e:
if expectedToWork or e.errno != errno.ECONNRESET:
raise
else: else:
if not expectedToWork: if not expectedToWork:
raise test_support.TestFailed( self.fail(
"Client protocol %s succeeded with server protocol %s!" "Client protocol %s succeeded with server protocol %s!"
% (ssl.get_protocol_name(client_protocol), % (ssl.get_protocol_name(client_protocol),
ssl.get_protocol_name(server_protocol))) ssl.get_protocol_name(server_protocol)))
@ -791,8 +772,7 @@ else:
except IOError: except IOError:
pass pass
else: else:
raise test_support.TestFailed( self.fail('connecting to closed SSL socket should have failed')
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener) t = threading.Thread(target=listener)
t.start() t.start()
@ -825,41 +805,27 @@ else:
flag.wait() flag.wait()
# try to connect # try to connect
try: try:
try: s = ssl.wrap_socket(socket.socket(),
s = ssl.wrap_socket(socket.socket(), certfile=CERTFILE,
certfile=CERTFILE, ca_certs=CERTFILE,
ca_certs=CERTFILE, cert_reqs=ssl.CERT_REQUIRED,
cert_reqs=ssl.CERT_REQUIRED, ssl_version=ssl.PROTOCOL_SSLv23)
ssl_version=ssl.PROTOCOL_SSLv23) s.connect((HOST, server.port))
s.connect((HOST, server.port)) cert = s.getpeercert()
except ssl.SSLError, x: self.assertTrue(cert, "Can't get peer certificate.")
raise test_support.TestFailed( cipher = s.cipher()
"Unexpected SSL error: " + str(x)) if test_support.verbose:
except Exception, x: sys.stdout.write(pprint.pformat(cert) + '\n')
raise test_support.TestFailed( sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
"Unexpected exception: " + str(x)) if 'subject' not in cert:
else: self.fail("No subject field in certificate: %s." %
if not s: pprint.pformat(cert))
raise test_support.TestFailed( if ((('organizationName', 'Python Software Foundation'),)
"Can't SSL-handshake with test server") not in cert['subject']):
cert = s.getpeercert() self.fail(
if not cert: "Missing or invalid 'organizationName' field in certificate subject; "
raise test_support.TestFailed( "should be 'Python Software Foundation'.")
"Can't get peer certificate.") s.close()
cipher = s.cipher()
if test_support.verbose:
sys.stdout.write(pprint.pformat(cert) + '\n')
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
if not cert.has_key('subject'):
raise test_support.TestFailed(
"No subject field in certificate: %s." %
pprint.pformat(cert))
if ((('organizationName', 'Python Software Foundation'),)
not in cert['subject']):
raise test_support.TestFailed(
"Missing or invalid 'organizationName' field in certificate subject; "
"should be 'Python Software Foundation'.")
s.close()
finally: finally:
server.stop() server.stop()
server.join() server.join()
@ -892,7 +858,7 @@ else:
sys.stdout.write("\n") sys.stdout.write("\n")
try: try:
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
except test_support.TestFailed, x: except (SSLError, socket.error), x:
# this fails on some older versions of OpenSSL (0.9.7l, for instance) # this fails on some older versions of OpenSSL (0.9.7l, for instance)
if test_support.verbose: if test_support.verbose:
sys.stdout.write( sys.stdout.write(
@ -946,52 +912,48 @@ else:
# try to connect # try to connect
wrapped = False wrapped = False
try: try:
try: s = socket.socket()
s = socket.socket() s.setblocking(1)
s.setblocking(1) s.connect((HOST, server.port))
s.connect((HOST, server.port)) if test_support.verbose:
except Exception, x: sys.stdout.write("\n")
raise test_support.TestFailed("Unexpected exception: " + str(x)) for indata in msgs:
else:
if test_support.verbose: if test_support.verbose:
sys.stdout.write("\n") sys.stdout.write(
for indata in msgs: " client: sending %s...\n" % repr(indata))
if wrapped:
conn.write(indata)
outdata = conn.read()
else:
s.send(indata)
outdata = s.recv(1024)
if (indata == "STARTTLS" and
outdata.strip().lower().startswith("ok")):
if test_support.verbose: if test_support.verbose:
sys.stdout.write( sys.stdout.write(
" client: sending %s...\n" % repr(indata)) " client: read %s from server, starting TLS...\n"
if wrapped: % repr(outdata))
conn.write(indata) conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
outdata = conn.read() wrapped = True
else: elif (indata == "ENDTLS" and
s.send(indata) outdata.strip().lower().startswith("ok")):
outdata = s.recv(1024) if test_support.verbose:
if (indata == "STARTTLS" and sys.stdout.write(
outdata.strip().lower().startswith("ok")): " client: read %s from server, ending TLS...\n"
if test_support.verbose: % repr(outdata))
sys.stdout.write( s = conn.unwrap()
" client: read %s from server, starting TLS...\n" wrapped = False
% repr(outdata))
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
wrapped = True
elif (indata == "ENDTLS" and
outdata.strip().lower().startswith("ok")):
if test_support.verbose:
sys.stdout.write(
" client: read %s from server, ending TLS...\n"
% repr(outdata))
s = conn.unwrap()
wrapped = False
else:
if test_support.verbose:
sys.stdout.write(
" client: read %s from server\n" % repr(outdata))
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
if wrapped:
conn.write("over\n")
else: else:
s.send("over\n") if test_support.verbose:
s.close() sys.stdout.write(
" client: read %s from server\n" % repr(outdata))
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
if wrapped:
conn.write("over\n")
else:
s.send("over\n")
s.close()
finally: finally:
server.stop() server.stop()
server.join() server.join()
@ -1021,15 +983,7 @@ else:
" client: read %d bytes from remote server '%s'\n" " client: read %d bytes from remote server '%s'\n"
% (len(d2), server)) % (len(d2), server))
f.close() f.close()
except: self.assertEqual(d1, d2)
msg = ''.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose:
sys.stdout.write('\n' + msg)
raise test_support.TestFailed(msg)
else:
if not (d1 == d2):
raise test_support.TestFailed(
"Couldn't fetch data from HTTPS server")
finally: finally:
server.stop() server.stop()
server.join() server.join()
@ -1057,30 +1011,24 @@ else:
flag.wait() flag.wait()
# try to connect # try to connect
try: try:
try: s = ssl.wrap_socket(socket.socket())
s = ssl.wrap_socket(socket.socket()) s.connect(('127.0.0.1', server.port))
s.connect(('127.0.0.1', server.port)) if test_support.verbose:
except ssl.SSLError, x: sys.stdout.write(
raise test_support.TestFailed("Unexpected SSL error: " + str(x)) " client: sending %s...\n" % (repr(indata)))
except Exception, x: s.write(indata)
raise test_support.TestFailed("Unexpected exception: " + str(x)) outdata = s.read()
else: if test_support.verbose:
if test_support.verbose: sys.stdout.write(" client: read %s\n" % repr(outdata))
sys.stdout.write( if outdata != indata.lower():
" client: sending %s...\n" % (repr(indata))) self.fail(
s.write(indata) "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
outdata = s.read() % (outdata[:min(len(outdata),20)], len(outdata),
if test_support.verbose: indata[:min(len(indata),20)].lower(), len(indata)))
sys.stdout.write(" client: read %s\n" % repr(outdata)) s.write("over\n")
if outdata != indata.lower(): if test_support.verbose:
raise test_support.TestFailed( sys.stdout.write(" client: closing connection.\n")
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n" s.close()
% (outdata[:min(len(outdata),20)], len(outdata),
indata[:min(len(indata),20)].lower(), len(indata)))
s.write("over\n")
if test_support.verbose:
sys.stdout.write(" client: closing connection.\n")
s.close()
finally: finally:
server.stop() server.stop()
# wait for server thread to end # wait for server thread to end
@ -1103,19 +1051,14 @@ else:
# wait for it to start # wait for it to start
flag.wait() flag.wait()
# try to connect # try to connect
s = ssl.wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_TLSv1)
s.connect((HOST, server.port))
try: try:
s = ssl.wrap_socket(socket.socket(),
server_side=False,
certfile=CERTFILE,
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_TLSv1)
s.connect((HOST, server.port))
except ssl.SSLError as x:
raise support.TestFailed("Unexpected SSL error: " + str(x))
except Exception as x:
raise support.TestFailed("Unexpected exception: " + str(x))
else:
# helper methods for standardising recv* method signatures # helper methods for standardising recv* method signatures
def _recv_into(): def _recv_into():
b = bytearray("\0"*100) b = bytearray("\0"*100)