# Test just the SSL support in the socket module, in a moderately bogus way. import sys import unittest from test import test_support import socket import errno import threading import subprocess import time import os import urllib # Optionally test SSL support, if we have it in the tested platform skip_expected = not hasattr(socket, "ssl") class ConnectedTests(unittest.TestCase): def urlopen(self, host, *args, **kwargs): # Connecting to remote hosts is flaky. Make it more robust # by retrying the connection several times. for i in range(3): try: return urllib.urlopen(host, *args, **kwargs) except IOError, e: last_exc = e continue except: raise raise last_exc def testBasic(self): socket.RAND_status() try: socket.RAND_egd(1) except TypeError: pass else: print "didn't raise TypeError" socket.RAND_add("this is a random string", 75.0) with test_support.transient_internet(): f = self.urlopen('https://sf.net') buf = f.read() f.close() def testTimeout(self): def error_msg(extra_msg): print >> sys.stderr, """\ WARNING: an attempt to connect to %r %s, in test_timeout. That may be legitimate, but is not the outcome we hoped for. If this message is seen often, testTimeout should be changed to use a more reliable address.""" % (ADDR, extra_msg) # A service which issues a welcome banner (without need to write # anything). ADDR = "pop.gmail.com", 995 s = socket.socket() s.settimeout(30.0) try: s.connect(ADDR) except socket.timeout: error_msg('timed out') return except socket.error, exc: # In case connection is refused. if exc.args[0] == errno.ECONNREFUSED: error_msg('was refused') return else: raise ss = socket.ssl(s) # Read part of return welcome banner twice. ss.read(1) ss.read(1) s.close() class BasicTests(unittest.TestCase): def testRudeShutdown(self): # Some random port to connect to. PORT = [9934] listener_ready = threading.Event() listener_gone = threading.Event() # `listener` runs in a thread. It opens a socket listening on # PORT, and sits in an accept() until the main thread connects. # Then it rudely closes the socket, and sets Event `listener_gone` # to let the main thread know the socket is gone. def listener(): s = socket.socket() PORT[0] = test_support.bind_port(s, '', PORT[0]) s.listen(5) listener_ready.set() s.accept() s = None # reclaim the socket object, which also closes it listener_gone.set() def connector(): listener_ready.wait() s = socket.socket() s.connect(('localhost', PORT[0])) listener_gone.wait() try: ssl_sock = socket.ssl(s) except socket.sslerror: pass else: raise test_support.TestFailed( 'connecting to closed SSL socket should have failed') t = threading.Thread(target=listener) t.start() connector() t.join() def connect(self, s, host_port): # Connecting to remote hosts is flaky. Make it more robust # by retrying the connection several times. for i in range(3): try: return s.connect(host_port) except IOError, e: last_exc = e continue except: raise raise last_exc def test_978833(self): if not test_support.is_resource_enabled("network"): return if test_support.verbose: print "test_978833 ..." import os, httplib, ssl with test_support.transient_internet(): s = socket.socket(socket.AF_INET) self.connect(s, ("svn.python.org", 443)) fd = s._sock.fileno() sock = ssl.wrap_socket(s) s = None sock.close() try: os.fstat(fd) except OSError: pass else: raise test_support.TestFailed("Failed to close socket") class OpenSSLTests(unittest.TestCase): def testBasic(self): s = socket.socket() s.connect(("localhost", 4433)) ss = socket.ssl(s) ss.write("Foo\n") i = ss.read(4) self.assertEqual(i, "Foo\n") s.close() def testMethods(self): # read & write is already tried in the Basic test # now we'll try to get the server info about certificates # this came from the certificate I used, one I found in /usr/share/openssl info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi" s = socket.socket() s.connect(("localhost", 4433)) ss = socket.ssl(s) cert = ss.server() self.assertEqual(cert, info) cert = ss.issuer() self.assertEqual(cert, info) s.close() class OpenSSLServer(threading.Thread): def __init__(self): self.s = None self.keepServing = True self._external() if self.haveServer: threading.Thread.__init__(self) def _external(self): # let's find the .pem files curdir = os.path.dirname(__file__) or os.curdir cert_file = os.path.join(curdir, "ssl_cert.pem") if not os.access(cert_file, os.F_OK): raise ValueError("No cert file found! (tried %r)" % cert_file) key_file = os.path.join(curdir, "ssl_key.pem") if not os.access(key_file, os.F_OK): raise ValueError("No key file found! (tried %r)" % key_file) try: cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file) self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) time.sleep(1) except: self.haveServer = False else: # let's try if it is actually up try: s = socket.socket() s.connect(("localhost", 4433)) s.close() if self.s.stdout.readline() != "ERROR\n": raise ValueError except: self.haveServer = False else: self.haveServer = True def run(self): while self.keepServing: time.sleep(.5) l = self.s.stdout.readline() self.s.stdin.write(l) def shutdown(self): self.keepServing = False if not self.s: return if sys.platform == "win32": subprocess.TerminateProcess(int(self.s._handle), -1) else: os.kill(self.s.pid, 15) def test_main(): if not hasattr(socket, "ssl"): raise test_support.TestSkipped("socket module has no ssl support") tests = [BasicTests] if test_support.is_resource_enabled('network'): tests.append(ConnectedTests) # in these platforms we can kill the openssl process if sys.platform in ("sunos5", "darwin", "linux1", "linux2", "win32", "hp-ux11"): server = OpenSSLServer() if server.haveServer: tests.append(OpenSSLTests) server.start() else: server = None thread_info = test_support.threading_setup() try: test_support.run_unittest(*tests) finally: if server is not None and server.haveServer: server.shutdown() test_support.threading_cleanup(*thread_info) if __name__ == "__main__": test_main()