From e8dc258db5898f5bbeb60c6780d1a3cb41585afe Mon Sep 17 00:00:00 2001 From: "R. David Murray" Date: Thu, 10 Dec 2009 02:08:06 +0000 Subject: [PATCH] Merged revisions 76726-76727 via svnmerge from svn+ssh://pythondev@svn.python.org/python/trunk The merge adds a test with an invalid rather than a missing line end, since the py3K code passed the original issue 5949 test. New test also by Scott Dial. ........ r76726 | r.david.murray | 2009-12-09 10:15:31 -0500 (Wed, 09 Dec 2009) | 6 lines Issue 5949: fixed IMAP4_SSL hang when the IMAP server response is missing proper end-of-line termination. Patch and tests by Scott Dial. The new tests include a test harness which will make it easier to add additional tests. ........ r76727 | r.david.murray | 2009-12-09 11:41:39 -0500 (Wed, 09 Dec 2009) | 2 lines Skip new imaplib SSL tests if ssl is not available. ........ --- Lib/imaplib.py | 2 + Lib/test/test_imaplib.py | 197 ++++++++++++++++++++++++++++++++++++++- Misc/NEWS | 3 + 3 files changed, 199 insertions(+), 3 deletions(-) diff --git a/Lib/imaplib.py b/Lib/imaplib.py index df35cb14b02..f803a5cbfe0 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -1023,6 +1023,8 @@ class IMAP4: raise self.abort('socket error: EOF') # Protocol mandates all lines terminated by CRLF + if not line.endswith(b'\r\n'): + raise self.abort('socket error: unterminated line') line = line[:-2] if __debug__: diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 2105fc2dcfa..fb5cf826277 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -1,11 +1,31 @@ +from test import support +# If we end up with a significant number of tests that don't require +# threading, this test module should be split. Right now we skip +# them all if we don't have threading. +threading = support.import_module('threading') + +from contextlib import contextmanager import imaplib +import os.path +import select +import socket +import socketserver +import sys import time -from test import support +from test.support import reap_threads, verbose import unittest +try: + import ssl +except ImportError: + ssl = None + +CERTFILE = None + class TestImaplib(unittest.TestCase): + def test_that_Time2Internaldate_returns_a_result(self): # We can check only that it successfully produces a result, # not the correctness of the result itself, since the result @@ -17,9 +37,180 @@ class TestImaplib(unittest.TestCase): imaplib.Time2Internaldate(t) +if ssl: + + class SecureTCPServer(socketserver.TCPServer): + + def get_request(self): + newsocket, fromaddr = self.socket.accept() + connstream = ssl.wrap_socket(newsocket, + server_side=True, + certfile=CERTFILE) + return connstream, fromaddr + + IMAP4_SSL = imaplib.IMAP4_SSL + +else: + + class SecureTCPServer: + pass + + IMAP4_SSL = None + + +class SimpleIMAPHandler(socketserver.StreamRequestHandler): + + timeout = 1 + + def _send(self, message): + if verbose: print("SENT:", message.strip()) + self.wfile.write(message) + + def handle(self): + # Send a welcome message. + self._send(b'* OK IMAP4rev1\r\n') + while 1: + # Gather up input until we receive a line terminator or we timeout. + # Accumulate read(1) because it's simpler to handle the differences + # between naked sockets and SSL sockets. + line = b'' + while 1: + try: + part = self.rfile.read(1) + if part == b'': + # Naked sockets return empty strings.. + return + line += part + except IOError: + # ..but SSLSockets throw exceptions. + return + if line.endswith(b'\r\n'): + break + + if verbose: print('GOT:', line.strip()) + splitline = line.split() + tag = splitline[0].decode('ASCII') + cmd = splitline[1].decode('ASCII') + args = splitline[2:] + + if hasattr(self, 'cmd_'+cmd): + getattr(self, 'cmd_'+cmd)(tag, args) + else: + self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII')) + + def cmd_CAPABILITY(self, tag, args): + self._send(b'* CAPABILITY IMAP4rev1\r\n') + self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) + + +class BaseThreadedNetworkedTests(unittest.TestCase): + + def make_server(self, addr, hdlr): + + class MyServer(self.server_class): + def handle_error(self, request, client_address): + self.close_request(request) + self.server_close() + raise + + if verbose: print("creating server") + server = MyServer(addr, hdlr) + self.assertEquals(server.server_address, server.socket.getsockname()) + + if verbose: + print("server created") + print("ADDR =", addr) + print("CLASS =", self.server_class) + print("HDLR =", server.RequestHandlerClass) + + t = threading.Thread( + name='%s serving' % self.server_class, + target=server.serve_forever, + # Short poll interval to make the test finish quickly. + # Time between requests is short enough that we won't wake + # up spuriously too many times. + kwargs={'poll_interval':0.01}) + t.daemon = True # In case this function raises. + t.start() + if verbose: print("server running") + return server, t + + def reap_server(self, server, thread): + if verbose: print("waiting for server") + server.shutdown() + thread.join() + if verbose: print("done") + + @contextmanager + def reaped_server(self, hdlr): + server, thread = self.make_server((support.HOST, 0), hdlr) + try: + yield server + finally: + self.reap_server(server, thread) + + @reap_threads + def test_connect(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + client.shutdown() + + @reap_threads + def test_issue5949(self): + + class EOFHandler(socketserver.StreamRequestHandler): + def handle(self): + # EOF without sending a complete welcome message. + self.wfile.write(b'* OK') + + with self.reaped_server(EOFHandler) as server: + self.assertRaises(imaplib.IMAP4.abort, + self.imap_class, *server.server_address) + + @reap_threads + def test_line_termination(self): + + class BadNewlineHandler(SimpleIMAPHandler): + + def cmd_CAPABILITY(self, tag, args): + self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') + self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) + + with self.reaped_server(BadNewlineHandler) as server: + self.assertRaises(imaplib.IMAP4.abort, + self.imap_class, *server.server_address) + + + +class ThreadedNetworkedTests(BaseThreadedNetworkedTests): + + server_class = socketserver.TCPServer + imap_class = imaplib.IMAP4 + + +@unittest.skipUnless(ssl, "SSL not available") +class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): + + server_class = SecureTCPServer + imap_class = IMAP4_SSL + + def test_main(): - support.run_unittest(TestImaplib) + + tests = [TestImaplib] + + if support.is_resource_enabled('network'): + if ssl: + global CERTFILE + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, + "keycert.pem") + if not os.path.exists(CERTFILE): + raise support.TestFailed("Can't read certificate files!") + tests.extend([ThreadedNetworkedTests, ThreadedNetworkedTestsSSL]) + + support.run_unittest(*tests) if __name__ == "__main__": - unittest.main() + support.use_resources = ['network'] + test_main() diff --git a/Misc/NEWS b/Misc/NEWS index ddf9a04407c..b99dbeff8e8 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -154,6 +154,9 @@ C-API Library ------- +- Issue #5949: added check for correct lineends in input from IMAP server + in imaplib. + - Add a reverse() method to collections.deque(). - Issue #6986: Fix crash in the JSON C accelerator when called with the