From 8f19e8e1c52843176c5ee9c4b6ea117be961ad59 Mon Sep 17 00:00:00 2001 From: Martin Panter Date: Tue, 19 Jan 2016 01:10:58 +0000 Subject: [PATCH] Issue #25859: Reimplement NNTP test_starttls() using local server The previous test relied on a remote server, which currently seems to be shutting the connection down once TLS has been set up, causing an EOFError. Now the test is implemented using a minimal NNTP server running in a background thread. --- Lib/test/test_nntplib.py | 82 +++++++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py index ae3618ffbca..3c69c3e51eb 100644 --- a/Lib/test/test_nntplib.py +++ b/Lib/test/test_nntplib.py @@ -5,6 +5,7 @@ import textwrap import unittest import functools import contextlib +import os.path from test import support from nntplib import NNTP, GroupInfo import nntplib @@ -13,8 +14,13 @@ try: import ssl except ImportError: ssl = None +try: + import threading +except ImportError: + threading = None TIMEOUT = 30 +certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') # TODO: # - test the `file` arg to more commands @@ -202,24 +208,6 @@ class NetworkedNNTPTestsMixin: resp, caps = self.server.capabilities() _check_caps(caps) - @unittest.skipUnless(ssl, 'requires SSL support') - def test_starttls(self): - file = self.server.file - sock = self.server.sock - try: - self.server.starttls() - except nntplib.NNTPPermanentError: - self.skipTest("STARTTLS not supported by server.") - else: - # Check that the socket and internal pseudo-file really were - # changed. - self.assertNotEqual(file, self.server.file) - self.assertNotEqual(sock, self.server.sock) - # Check that the new socket really is an SSL one - self.assertIsInstance(self.server.sock, ssl.SSLSocket) - # Check that trying starttls when it's already active fails. - self.assertRaises(ValueError, self.server.starttls) - def test_zlogin(self): # This test must be the penultimate because further commands will be # refused. @@ -1520,6 +1508,64 @@ class MockSslTests(MockSocketTests): def nntp_class(*pos, **kw): return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw) +@unittest.skipUnless(threading, 'requires multithreading') +class LocalServerTests(unittest.TestCase): + def setUp(self): + sock = socket.socket() + port = support.bind_port(sock) + sock.listen() + self.background = threading.Thread( + target=self.run_server, args=(sock,)) + self.background.start() + self.addCleanup(self.background.join) + + self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() + self.addCleanup(self.nntp.__exit__, None, None, None) + + def run_server(self, sock): + # Could be generalized to handle more commands in separate methods + with sock: + [client, _] = sock.accept() + with contextlib.ExitStack() as cleanup: + cleanup.enter_context(client) + reader = cleanup.enter_context(client.makefile('rb')) + client.sendall(b'200 Server ready\r\n') + while True: + cmd = reader.readline() + if cmd == b'CAPABILITIES\r\n': + client.sendall( + b'101 Capability list:\r\n' + b'VERSION 2\r\n' + b'STARTTLS\r\n' + b'.\r\n' + ) + elif cmd == b'STARTTLS\r\n': + reader.close() + client.sendall(b'382 Begin TLS negotiation now\r\n') + client = ssl.wrap_socket( + client, server_side=True, certfile=certfile) + cleanup.enter_context(client) + reader = cleanup.enter_context(client.makefile('rb')) + elif cmd == b'QUIT\r\n': + client.sendall(b'205 Bye!\r\n') + break + else: + raise ValueError('Unexpected command {!r}'.format(cmd)) + + @unittest.skipUnless(ssl, 'requires SSL support') + def test_starttls(self): + file = self.nntp.file + sock = self.nntp.sock + self.nntp.starttls() + # Check that the socket and internal pseudo-file really were + # changed. + self.assertNotEqual(file, self.nntp.file) + self.assertNotEqual(sock, self.nntp.sock) + # Check that the new socket really is an SSL one + self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) + # Check that trying starttls when it's already active fails. + self.assertRaises(ValueError, self.nntp.starttls) + if __name__ == "__main__": unittest.main()