diff --git a/Lib/imaplib.py b/Lib/imaplib.py index 4e8a4bb6fa4..2f7e9331fbc 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -411,7 +411,7 @@ class IMAP4: self.literal = _Authenticator(authobject).process typ, dat = self._simple_command('AUTHENTICATE', mech) if typ != 'OK': - raise self.error(dat[-1]) + raise self.error(dat[-1].decode('utf-8', 'replace')) self.state = 'AUTH' return typ, dat diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 07157f52d6a..6e4a90fc255 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -10,10 +10,12 @@ import os.path import socketserver import time import calendar +import inspect from test.support import (reap_threads, verbose, transient_internet, run_with_tz, run_with_locale) import unittest +from unittest import mock from datetime import datetime, timezone, timedelta try: import ssl @@ -174,6 +176,334 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): self._send_tagged(tag, 'OK', 'LOGIN completed') +class NewIMAPTestsMixin(): + client = None + + def _setup(self, imap_handler, connect=True): + """ + Sets up imap_handler for tests. imap_handler should inherit from either: + - SimpleIMAPHandler - for testing IMAP commands, + - socketserver.StreamRequestHandler - if raw access to stream is needed. + Returns (client, server). + """ + class TestTCPServer(self.server_class): + def handle_error(self, request, client_address): + """ + End request and raise the error if one occurs. + """ + self.close_request(request) + self.server_close() + raise + + self.addCleanup(self._cleanup) + self.server = self.server_class((support.HOST, 0), imap_handler) + self.thread = threading.Thread( + name=self._testMethodName+'-server', + target=self.server.serve_forever, + # Short poll interval to make the test finish quickly. + # Time between requests is short enough that we won't wake + # up spuriously too many times. + kwargs={'poll_interval': 0.01}) + self.thread.daemon = True # In case this function raises. + self.thread.start() + + if connect: + self.client = self.imap_class(*self.server.server_address) + + return self.client, self.server + + def _cleanup(self): + """ + Cleans up the test server. This method should not be called manually, + it is added to the cleanup queue in the _setup method already. + """ + # if logout was called already we'd raise an exception trying to + # shutdown the client once again + if self.client is not None and self.client.state != 'LOGOUT': + self.client.shutdown() + # cleanup the server + self.server.shutdown() + self.server.server_close() + self.thread.join(3.0) + + def test_EOF_without_complete_welcome_message(self): + # http://bugs.python.org/issue5949 + class EOFHandler(socketserver.StreamRequestHandler): + def handle(self): + self.wfile.write(b'* OK') + _, server = self._setup(EOFHandler, connect=False) + self.assertRaises(imaplib.IMAP4.abort, self.imap_class, + *server.server_address) + + def test_line_termination(self): + class BadNewlineHandler(SimpleIMAPHandler): + def cmd_CAPABILITY(self, tag, args): + self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') + self._send_tagged(tag, 'OK', 'CAPABILITY completed') + _, server = self._setup(BadNewlineHandler, connect=False) + self.assertRaises(imaplib.IMAP4.abort, self.imap_class, + *server.server_address) + + def test_enable_raises_error_if_not_AUTH(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + client, _ = self._setup(EnableHandler) + self.assertFalse(client.utf8_enabled) + with self.assertRaisesRegex(imaplib.IMAP4.error, 'ENABLE.*NONAUTH'): + client.enable('foo') + self.assertFalse(client.utf8_enabled) + + def test_enable_raises_error_if_no_capability(self): + client, _ = self._setup(SimpleIMAPHandler) + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'does not support ENABLE'): + client.enable('foo') + + def test_enable_UTF8_raises_error_if_not_supported(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'does not support ENABLE'): + client.enable('UTF8=ACCEPT') + + def test_enable_UTF8_True_append(self): + class UTF8AppendServer(SimpleIMAPHandler): + capabilities = 'ENABLE UTF8=ACCEPT' + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + def cmd_APPEND(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'okay') + client, server = self._setup(UTF8AppendServer) + self.assertEqual(client._encoding, 'ascii') + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + code, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(code, 'OK') + self.assertEqual(client._encoding, 'utf-8') + msg_string = 'Subject: üñí©öðé' + typ, data = client.append(None, None, None, msg_string.encode('utf-8')) + self.assertEqual(typ, 'OK') + self.assertEqual(server.response, + ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) + + def test_search_disallows_charset_in_utf8_mode(self): + class UTF8Server(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, _ = self._setup(UTF8Server) + typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(typ, 'OK') + typ, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(typ, 'OK') + self.assertTrue(client.utf8_enabled) + with self.assertRaisesRegex(imaplib.IMAP4.error, 'charset.*UTF8'): + client.search('foo', 'bar') + + def test_bad_auth_name(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_tagged(tag, 'NO', + 'unrecognized authentication type {}'.format(args[0])) + client, _ = self._setup(MyServer) + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'unrecognized authentication type METHOD'): + client.authenticate('METHOD', lambda: 1) + + def test_invalid_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') + client, _ = self._setup(MyServer) + with self.assertRaisesRegex(imaplib.IMAP4.error, + r'\[AUTHENTICATIONFAILED\] invalid'): + client.authenticate('MYAUTH', lambda x: b'fake') + + def test_valid_authentication_bytes(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, server = self._setup(MyServer) + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + + def test_valid_authentication_plain_text(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, server = self._setup(MyServer) + code, _ = client.authenticate('MYAUTH', lambda x: 'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + + def test_login_cram_md5_bytes(self): + class AuthHandler(SimpleIMAPHandler): + capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' + 'VzdG9uLm1jaS5uZXQ=') + r = yield + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): + self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') + else: + self._send_tagged(tag, 'NO', 'No access') + client, _ = self._setup(AuthHandler) + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") + self.assertEqual(ret, "OK") + + def test_login_cram_md5_plain_text(self): + class AuthHandler(SimpleIMAPHandler): + capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' + 'VzdG9uLm1jaS5uZXQ=') + r = yield + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): + self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') + else: + self._send_tagged(tag, 'NO', 'No access') + client, _ = self._setup(AuthHandler) + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") + self.assertEqual(ret, "OK") + + def test_aborted_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + if self.response == b'*\r\n': + self._send_tagged( + tag, + 'NO', + '[AUTHENTICATIONFAILED] aborted') + else: + self._send_tagged(tag, 'OK', 'MYAUTH successful') + client, _ = self._setup(MyServer) + with self.assertRaisesRegex(imaplib.IMAP4.error, + r'\[AUTHENTICATIONFAILED\] aborted'): + client.authenticate('MYAUTH', lambda x: None) + + @mock.patch('imaplib._MAXLINE', 10) + def test_linetoolong(self): + class TooLongHandler(SimpleIMAPHandler): + def handle(self): + # send response line longer than the limit set in the next line + self.wfile.write(b'* OK ' + 11 * b'x' + b'\r\n') + _, server = self._setup(TooLongHandler, connect=False) + with self.assertRaisesRegex(imaplib.IMAP4.error, + 'got more than 10 bytes'): + self.imap_class(*server.server_address) + + def test_simple_with_statement(self): + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self.imap_class(*server.server_address): + pass + + def test_with_statement(self): + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + self.assertIsNone(server.logged) + + def test_with_statement_logout(self): + # It is legal to log out explicitly inside the with block + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self.imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + imap.logout() + self.assertIsNone(server.logged) + self.assertIsNone(server.logged) + + # command tests + + def test_login(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + + def test_logout(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + typ, data = client.logout() + self.assertEqual(typ, 'BYE') + self.assertEqual(data[0], b'IMAP4ref1 Server logging out') + self.assertEqual(client.state, 'LOGOUT') + + def test_lsub(self): + class LsubCmd(SimpleIMAPHandler): + def cmd_LSUB(self, tag, args): + self._send_textline('* LSUB () "." directoryA') + return self._send_tagged(tag, 'OK', 'LSUB completed') + client, _ = self._setup(LsubCmd) + client.login('user', 'pass') + typ, data = client.lsub() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'() "." directoryA') + + +class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): + imap_class = imaplib.IMAP4 + server_class = socketserver.TCPServer + + +@unittest.skipUnless(ssl, "SSL not available") +class NewIMAPSSLTests(NewIMAPTestsMixin, unittest.TestCase): + imap_class = imaplib.IMAP4_SSL + server_class = SecureTCPServer + + def test_ssl_raises(self): + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(CAFILE) + + with self.assertRaisesRegex(ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): + _, server = self._setup(SimpleIMAPHandler) + client = self.imap_class(*server.server_address, + ssl_context=ssl_context) + client.shutdown() + + def test_ssl_verified(self): + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(CAFILE) + + _, server = self._setup(SimpleIMAPHandler) + client = self.imap_class("localhost", server.server_address[1], + ssl_context=ssl_context) + client.shutdown() + class ThreadedNetworkedTests(unittest.TestCase): server_class = socketserver.TCPServer imap_class = imaplib.IMAP4