Qualify or remove or bare excepts. Simplify exception handling in places.
Remove uses of test_support.TestFailed.
This commit is contained in:
parent
435ba0cfb8
commit
db187847fb
|
@ -62,7 +62,7 @@ class BasicTests(unittest.TestCase):
|
|||
s.connect(("svn.python.org", 443))
|
||||
c = s.getpeercert()
|
||||
if c:
|
||||
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
|
||||
self.fail("Peer cert %s shouldn't be here!")
|
||||
s.close()
|
||||
|
||||
# this should fail because we have no verification certs
|
||||
|
@ -112,8 +112,7 @@ class BasicTests(unittest.TestCase):
|
|||
d1 = ssl.PEM_cert_to_DER_cert(pem)
|
||||
p2 = ssl.DER_cert_to_PEM_cert(d1)
|
||||
d2 = ssl.PEM_cert_to_DER_cert(p2)
|
||||
if (d1 != d2):
|
||||
raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
|
||||
self.assertEqual(d1, d2)
|
||||
|
||||
def test_openssl_version(self):
|
||||
n = ssl.OPENSSL_VERSION_NUMBER
|
||||
|
@ -178,7 +177,7 @@ class NetworkedTests(unittest.TestCase):
|
|||
s.connect(("svn.python.org", 443))
|
||||
c = s.getpeercert()
|
||||
if c:
|
||||
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
|
||||
self.fail("Peer cert %s shouldn't be here!")
|
||||
s.close()
|
||||
|
||||
# this should fail because we have no verification certs
|
||||
|
@ -197,8 +196,6 @@ class NetworkedTests(unittest.TestCase):
|
|||
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
||||
try:
|
||||
s.connect(("svn.python.org", 443))
|
||||
except ssl.SSLError, x:
|
||||
raise test_support.TestFailed("Unexpected exception %s" % x)
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
|
@ -249,7 +246,7 @@ class NetworkedTests(unittest.TestCase):
|
|||
|
||||
pem = ssl.get_server_certificate(("svn.python.org", 443))
|
||||
if not pem:
|
||||
raise test_support.TestFailed("No server certificate on svn.python.org:443!")
|
||||
self.fail("No server certificate on svn.python.org:443!")
|
||||
|
||||
try:
|
||||
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
|
||||
|
@ -257,11 +254,11 @@ class NetworkedTests(unittest.TestCase):
|
|||
#should fail
|
||||
pass
|
||||
else:
|
||||
raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
|
||||
self.fail("Got server certificate %s for svn.python.org!" % pem)
|
||||
|
||||
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
||||
if not pem:
|
||||
raise test_support.TestFailed("No server certificate on svn.python.org:443!")
|
||||
self.fail("No server certificate on svn.python.org:443!")
|
||||
if test_support.verbose:
|
||||
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
|
||||
|
||||
|
@ -334,20 +331,17 @@ else:
|
|||
ca_certs=self.server.cacerts,
|
||||
cert_reqs=self.server.certreqs,
|
||||
ciphers=self.server.ciphers)
|
||||
except:
|
||||
except ssl.SSLError:
|
||||
# XXX Various errors can have happened here, for example
|
||||
# a mismatching protocol version, an invalid certificate,
|
||||
# or a low-level bug. This should be made more discriminating.
|
||||
if self.server.chatty:
|
||||
handle_error("\n server: bad connection attempt from " +
|
||||
str(self.sock.getpeername()) + ":\n")
|
||||
self.close()
|
||||
if not self.server.expect_bad_connects:
|
||||
# here, we want to stop the server, because this shouldn't
|
||||
# happen in the context of our test case
|
||||
self.running = False
|
||||
# normally, we'd just stop here, but for the test
|
||||
# harness, we want to stop the server
|
||||
self.server.stop()
|
||||
self.running = False
|
||||
self.server.stop()
|
||||
return False
|
||||
|
||||
else:
|
||||
return True
|
||||
|
||||
|
@ -418,11 +412,9 @@ else:
|
|||
# normally, we'd just stop here, but for the test
|
||||
# harness, we want to stop the server
|
||||
self.server.stop()
|
||||
except:
|
||||
handle_error('')
|
||||
|
||||
def __init__(self, certificate, ssl_version=None,
|
||||
certreqs=None, cacerts=None, expect_bad_connects=False,
|
||||
certreqs=None, cacerts=None,
|
||||
chatty=True, connectionchatty=False, starttls_server=False,
|
||||
wrap_accepting_socket=False, ciphers=None):
|
||||
|
||||
|
@ -435,7 +427,6 @@ else:
|
|||
self.certreqs = certreqs
|
||||
self.cacerts = cacerts
|
||||
self.ciphers = ciphers
|
||||
self.expect_bad_connects = expect_bad_connects
|
||||
self.chatty = chatty
|
||||
self.connectionchatty = connectionchatty
|
||||
self.starttls_server = starttls_server
|
||||
|
@ -478,9 +469,6 @@ else:
|
|||
pass
|
||||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
except:
|
||||
if self.chatty:
|
||||
handle_error("Test server failure:\n")
|
||||
self.sock.close()
|
||||
|
||||
def stop (self):
|
||||
|
@ -526,7 +514,8 @@ else:
|
|||
self._do_ssl_handshake()
|
||||
else:
|
||||
data = self.recv(1024)
|
||||
self.send(data.lower())
|
||||
if data and data.strip() != 'over':
|
||||
self.send(data.lower())
|
||||
|
||||
def handle_close(self):
|
||||
self.close()
|
||||
|
@ -572,10 +561,7 @@ else:
|
|||
if self.flag:
|
||||
self.flag.set()
|
||||
while self.active:
|
||||
try:
|
||||
asyncore.loop(1)
|
||||
except:
|
||||
pass
|
||||
asyncore.loop(0.05)
|
||||
|
||||
def stop (self):
|
||||
self.active = False
|
||||
|
@ -698,12 +684,8 @@ else:
|
|||
except ssl.SSLError, x:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write("\nSSLError is %s\n" % x[1])
|
||||
except socket.error, x:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write("\nsocket.error is %s\n" % x[1])
|
||||
else:
|
||||
raise test_support.TestFailed(
|
||||
"Use of invalid cert should have failed!")
|
||||
self.fail("Use of invalid cert should have failed!")
|
||||
finally:
|
||||
server.stop()
|
||||
server.join()
|
||||
|
@ -729,39 +711,33 @@ else:
|
|||
if client_protocol is None:
|
||||
client_protocol = protocol
|
||||
try:
|
||||
try:
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
certfile=client_certfile,
|
||||
ca_certs=cacertsfile,
|
||||
ciphers=ciphers,
|
||||
cert_reqs=certreqs,
|
||||
ssl_version=client_protocol)
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError, x:
|
||||
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
|
||||
except Exception, x:
|
||||
raise test_support.TestFailed("Unexpected exception: " + str(x))
|
||||
else:
|
||||
for arg in [indata, bytearray(indata), memoryview(indata)]:
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % (repr(arg)))
|
||||
s.write(arg)
|
||||
outdata = s.read()
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: read %s\n" % repr(outdata))
|
||||
if outdata != indata.lower():
|
||||
raise test_support.TestFailed(
|
||||
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
|
||||
% (outdata[:min(len(outdata),20)], len(outdata),
|
||||
indata[:min(len(indata),20)].lower(), len(indata)))
|
||||
s.write("over\n")
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
certfile=client_certfile,
|
||||
ca_certs=cacertsfile,
|
||||
ciphers=ciphers,
|
||||
cert_reqs=certreqs,
|
||||
ssl_version=client_protocol)
|
||||
s.connect((HOST, server.port))
|
||||
for arg in [indata, bytearray(indata), memoryview(indata)]:
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
s.close()
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % (repr(arg)))
|
||||
s.write(arg)
|
||||
outdata = s.read()
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: read %s\n" % repr(outdata))
|
||||
if outdata != indata.lower():
|
||||
self.fail(
|
||||
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
|
||||
% (outdata[:min(len(outdata),20)], len(outdata),
|
||||
indata[:min(len(indata),20)].lower(), len(indata)))
|
||||
s.write("over\n")
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
s.close()
|
||||
finally:
|
||||
server.stop()
|
||||
server.join()
|
||||
|
@ -793,12 +769,17 @@ else:
|
|||
serverParamsTest(CERTFILE, server_protocol, certsreqs,
|
||||
CERTFILE, CERTFILE, client_protocol,
|
||||
ciphers="ALL", chatty=False)
|
||||
except test_support.TestFailed:
|
||||
# Protocol mismatch can result in either an SSLError, or a
|
||||
# "Connection reset by peer" error.
|
||||
except ssl.SSLError:
|
||||
if expectedToWork:
|
||||
raise
|
||||
except socket.error as e:
|
||||
if expectedToWork or e.errno != errno.ECONNRESET:
|
||||
raise
|
||||
else:
|
||||
if not expectedToWork:
|
||||
raise test_support.TestFailed(
|
||||
self.fail(
|
||||
"Client protocol %s succeeded with server protocol %s!"
|
||||
% (ssl.get_protocol_name(client_protocol),
|
||||
ssl.get_protocol_name(server_protocol)))
|
||||
|
@ -835,8 +816,7 @@ else:
|
|||
except IOError:
|
||||
pass
|
||||
else:
|
||||
raise test_support.TestFailed(
|
||||
'connecting to closed SSL socket should have failed')
|
||||
self.fail('connecting to closed SSL socket should have failed')
|
||||
|
||||
t = threading.Thread(target=listener)
|
||||
t.start()
|
||||
|
@ -869,41 +849,27 @@ else:
|
|||
flag.wait()
|
||||
# try to connect
|
||||
try:
|
||||
try:
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
ssl_version=ssl.PROTOCOL_SSLv23)
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError, x:
|
||||
raise test_support.TestFailed(
|
||||
"Unexpected SSL error: " + str(x))
|
||||
except Exception, x:
|
||||
raise test_support.TestFailed(
|
||||
"Unexpected exception: " + str(x))
|
||||
else:
|
||||
if not s:
|
||||
raise test_support.TestFailed(
|
||||
"Can't SSL-handshake with test server")
|
||||
cert = s.getpeercert()
|
||||
if not cert:
|
||||
raise test_support.TestFailed(
|
||||
"Can't get peer certificate.")
|
||||
cipher = s.cipher()
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(pprint.pformat(cert) + '\n')
|
||||
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
|
||||
if 'subject' not in cert:
|
||||
raise test_support.TestFailed(
|
||||
"No subject field in certificate: %s." %
|
||||
pprint.pformat(cert))
|
||||
if ((('organizationName', 'Python Software Foundation'),)
|
||||
not in cert['subject']):
|
||||
raise test_support.TestFailed(
|
||||
"Missing or invalid 'organizationName' field in certificate subject; "
|
||||
"should be 'Python Software Foundation'.")
|
||||
s.close()
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_REQUIRED,
|
||||
ssl_version=ssl.PROTOCOL_SSLv23)
|
||||
s.connect((HOST, server.port))
|
||||
cert = s.getpeercert()
|
||||
self.assertTrue(cert, "Can't get peer certificate.")
|
||||
cipher = s.cipher()
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(pprint.pformat(cert) + '\n')
|
||||
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
|
||||
if 'subject' not in cert:
|
||||
self.fail("No subject field in certificate: %s." %
|
||||
pprint.pformat(cert))
|
||||
if ((('organizationName', 'Python Software Foundation'),)
|
||||
not in cert['subject']):
|
||||
self.fail(
|
||||
"Missing or invalid 'organizationName' field in certificate subject; "
|
||||
"should be 'Python Software Foundation'.")
|
||||
s.close()
|
||||
finally:
|
||||
server.stop()
|
||||
server.join()
|
||||
|
@ -936,7 +902,7 @@ else:
|
|||
sys.stdout.write("\n")
|
||||
try:
|
||||
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
|
||||
except test_support.TestFailed, x:
|
||||
except (SSLError, socket.error), x:
|
||||
# this fails on some older versions of OpenSSL (0.9.7l, for instance)
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
|
@ -990,52 +956,48 @@ else:
|
|||
# try to connect
|
||||
wrapped = False
|
||||
try:
|
||||
try:
|
||||
s = socket.socket()
|
||||
s.setblocking(1)
|
||||
s.connect((HOST, server.port))
|
||||
except Exception, x:
|
||||
raise test_support.TestFailed("Unexpected exception: " + str(x))
|
||||
else:
|
||||
s = socket.socket()
|
||||
s.setblocking(1)
|
||||
s.connect((HOST, server.port))
|
||||
if test_support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
for indata in msgs:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
for indata in msgs:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % repr(indata))
|
||||
if wrapped:
|
||||
conn.write(indata)
|
||||
outdata = conn.read()
|
||||
else:
|
||||
s.send(indata)
|
||||
outdata = s.recv(1024)
|
||||
if (indata == "STARTTLS" and
|
||||
outdata.strip().lower().startswith("ok")):
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % repr(indata))
|
||||
if wrapped:
|
||||
conn.write(indata)
|
||||
outdata = conn.read()
|
||||
else:
|
||||
s.send(indata)
|
||||
outdata = s.recv(1024)
|
||||
if (indata == "STARTTLS" and
|
||||
outdata.strip().lower().startswith("ok")):
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: read %s from server, starting TLS...\n"
|
||||
% repr(outdata))
|
||||
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
wrapped = True
|
||||
elif (indata == "ENDTLS" and
|
||||
outdata.strip().lower().startswith("ok")):
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: read %s from server, ending TLS...\n"
|
||||
% repr(outdata))
|
||||
s = conn.unwrap()
|
||||
wrapped = False
|
||||
else:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: read %s from server\n" % repr(outdata))
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
if wrapped:
|
||||
conn.write("over\n")
|
||||
" client: read %s from server, starting TLS...\n"
|
||||
% repr(outdata))
|
||||
conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
wrapped = True
|
||||
elif (indata == "ENDTLS" and
|
||||
outdata.strip().lower().startswith("ok")):
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: read %s from server, ending TLS...\n"
|
||||
% repr(outdata))
|
||||
s = conn.unwrap()
|
||||
wrapped = False
|
||||
else:
|
||||
s.send("over\n")
|
||||
s.close()
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: read %s from server\n" % repr(outdata))
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
if wrapped:
|
||||
conn.write("over\n")
|
||||
else:
|
||||
s.send("over\n")
|
||||
s.close()
|
||||
finally:
|
||||
server.stop()
|
||||
server.join()
|
||||
|
@ -1066,15 +1028,7 @@ else:
|
|||
" client: read %d bytes from remote server '%s'\n"
|
||||
% (len(d2), server))
|
||||
f.close()
|
||||
except:
|
||||
msg = ''.join(traceback.format_exception(*sys.exc_info()))
|
||||
if test_support.verbose:
|
||||
sys.stdout.write('\n' + msg)
|
||||
raise test_support.TestFailed(msg)
|
||||
else:
|
||||
if not (d1 == d2):
|
||||
raise test_support.TestFailed(
|
||||
"Couldn't fetch data from HTTPS server")
|
||||
self.assertEqual(d1, d2)
|
||||
finally:
|
||||
server.stop()
|
||||
server.join()
|
||||
|
@ -1102,30 +1056,24 @@ else:
|
|||
flag.wait()
|
||||
# try to connect
|
||||
try:
|
||||
try:
|
||||
s = ssl.wrap_socket(socket.socket())
|
||||
s.connect(('127.0.0.1', server.port))
|
||||
except ssl.SSLError, x:
|
||||
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
|
||||
except Exception, x:
|
||||
raise test_support.TestFailed("Unexpected exception: " + str(x))
|
||||
else:
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % (repr(indata)))
|
||||
s.write(indata)
|
||||
outdata = s.read()
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: read %s\n" % repr(outdata))
|
||||
if outdata != indata.lower():
|
||||
raise test_support.TestFailed(
|
||||
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
|
||||
% (outdata[:min(len(outdata),20)], len(outdata),
|
||||
indata[:min(len(indata),20)].lower(), len(indata)))
|
||||
s.write("over\n")
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
s.close()
|
||||
s = ssl.wrap_socket(socket.socket())
|
||||
s.connect(('127.0.0.1', server.port))
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % (repr(indata)))
|
||||
s.write(indata)
|
||||
outdata = s.read()
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: read %s\n" % repr(outdata))
|
||||
if outdata != indata.lower():
|
||||
self.fail(
|
||||
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
|
||||
% (outdata[:min(len(outdata),20)], len(outdata),
|
||||
indata[:min(len(indata),20)].lower(), len(indata)))
|
||||
s.write("over\n")
|
||||
if test_support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
s.close()
|
||||
finally:
|
||||
server.stop()
|
||||
# wait for server thread to end
|
||||
|
@ -1148,19 +1096,14 @@ else:
|
|||
# wait for it to start
|
||||
flag.wait()
|
||||
# try to connect
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
s.connect((HOST, server.port))
|
||||
try:
|
||||
s = ssl.wrap_socket(socket.socket(),
|
||||
server_side=False,
|
||||
certfile=CERTFILE,
|
||||
ca_certs=CERTFILE,
|
||||
cert_reqs=ssl.CERT_NONE,
|
||||
ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError as x:
|
||||
self.fail("Unexpected SSL error: " + str(x))
|
||||
except Exception as x:
|
||||
self.fail("Unexpected exception: " + str(x))
|
||||
else:
|
||||
# helper methods for standardising recv* method signatures
|
||||
def _recv_into():
|
||||
b = bytearray("\0"*100)
|
||||
|
|
Loading…
Reference in New Issue