diff --git a/Lib/smtplib.py b/Lib/smtplib.py index 7e348757f3e..47569738f0d 100755 --- a/Lib/smtplib.py +++ b/Lib/smtplib.py @@ -630,12 +630,12 @@ class SMTP: (code, resp) = self.docmd("AUTH", mechanism + " " + response) else: (code, resp) = self.docmd("AUTH", mechanism) - # Server replies with 334 (challenge) or 535 (not supported) - if code == 334: - challenge = base64.decodebytes(resp) - response = encode_base64( - authobject(challenge).encode('ascii'), eol='') - (code, resp) = self.docmd(response) + # If server responds with a challenge, send the response. + if code == 334: + challenge = base64.decodebytes(resp) + response = encode_base64( + authobject(challenge).encode('ascii'), eol='') + (code, resp) = self.docmd(response) if code in (235, 503): return (code, resp) raise SMTPAuthenticationError(code, resp) @@ -657,11 +657,10 @@ class SMTP: def auth_login(self, challenge=None): """ Authobject to use with LOGIN authentication. Requires self.user and self.password to be set.""" - (code, resp) = self.docmd( - encode_base64(self.user.encode('ascii'), eol='')) - if code == 334: + if challenge is None: + return self.user + else: return self.password - raise SMTPAuthenticationError(code, resp) def login(self, user, password, *, initial_response_ok=True): """Log in on an SMTP server that requires authentication. diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 8e362414288..28539f360f5 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -1,8 +1,10 @@ import asyncore +import base64 import email.mime.text from email.message import EmailMessage from email.base64mime import body_encode as encode_base64 import email.utils +import hmac import socket import smtpd import smtplib @@ -623,20 +625,12 @@ sim_users = {'Mr.A@somewhere.com':'John A', sim_auth = ('Mr.A@somewhere.com', 'somepassword') sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') -sim_auth_credentials = { - 'login': 'TXIuQUBzb21ld2hlcmUuY29t', - 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=', - 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ' - 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'), - } -sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T' -sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ=' - sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 'list-2':['Ms.B@xn--fo-fka.com',], } # Simulated SMTP channel & server +class ResponseException(Exception): pass class SimSMTPChannel(smtpd.SMTPChannel): quit_response = None @@ -646,12 +640,109 @@ class SimSMTPChannel(smtpd.SMTPChannel): rcpt_count = 0 rset_count = 0 disconnect = 0 + AUTH = 99 # Add protocol state to enable auth testing. + authenticated_user = None def __init__(self, extra_features, *args, **kw): self._extrafeatures = ''.join( [ "250-{0}\r\n".format(x) for x in extra_features ]) super(SimSMTPChannel, self).__init__(*args, **kw) + # AUTH related stuff. It would be nice if support for this were in smtpd. + def found_terminator(self): + if self.smtp_state == self.AUTH: + line = self._emptystring.join(self.received_lines) + print('Data:', repr(line), file=smtpd.DEBUGSTREAM) + self.received_lines = [] + try: + self.auth_object(line) + except ResponseException as e: + self.smtp_state = self.COMMAND + self.push('%s %s' % (e.smtp_code, e.smtp_error)) + return + super().found_terminator() + + + def smtp_AUTH(self, arg): + if not self.seen_greeting: + self.push('503 Error: send EHLO first') + return + if not self.extended_smtp or 'AUTH' not in self._extrafeatures: + self.push('500 Error: command "AUTH" not recognized') + return + if self.authenticated_user is not None: + self.push( + '503 Bad sequence of commands: already authenticated') + return + args = arg.split() + if len(args) not in [1, 2]: + self.push('501 Syntax: AUTH [initial-response]') + return + auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_') + try: + self.auth_object = getattr(self, auth_object_name) + except AttributeError: + self.push('504 Command parameter not implemented: unsupported ' + ' authentication mechanism {!r}'.format(auth_object_name)) + return + self.smtp_state = self.AUTH + self.auth_object(args[1] if len(args) == 2 else None) + + def _authenticated(self, user, valid): + if valid: + self.authenticated_user = user + self.push('235 Authentication Succeeded') + else: + self.push('535 Authentication credentials invalid') + self.smtp_state = self.COMMAND + + def _decode_base64(self, string): + return base64.decodebytes(string.encode('ascii')).decode('utf-8') + + def _auth_plain(self, arg=None): + if arg is None: + self.push('334 ') + else: + logpass = self._decode_base64(arg) + try: + *_, user, password = logpass.split('\0') + except ValueError as e: + self.push('535 Splitting response {!r} into user and password' + ' failed: {}'.format(logpass, e)) + return + self._authenticated(user, password == sim_auth[1]) + + def _auth_login(self, arg=None): + if arg is None: + # base64 encoded 'Username:' + self.push('334 VXNlcm5hbWU6') + elif not hasattr(self, '_auth_login_user'): + self._auth_login_user = self._decode_base64(arg) + # base64 encoded 'Password:' + self.push('334 UGFzc3dvcmQ6') + else: + password = self._decode_base64(arg) + self._authenticated(self._auth_login_user, password == sim_auth[1]) + del self._auth_login_user + + def _auth_cram_md5(self, arg=None): + if arg is None: + self.push('334 {}'.format(sim_cram_md5_challenge)) + else: + logpass = self._decode_base64(arg) + try: + user, hashed_pass = logpass.split() + except ValueError as e: + self.push('535 Splitting response {!r} into user and password' + 'failed: {}'.format(logpass, e)) + return False + valid_hashed_pass = hmac.HMAC( + sim_auth[1].encode('ascii'), + self._decode_base64(sim_cram_md5_challenge).encode('ascii'), + 'md5').hexdigest() + self._authenticated(user, hashed_pass == valid_hashed_pass) + # end AUTH related stuff. + def smtp_EHLO(self, arg): resp = ('250-testhost\r\n' '250-EXPN\r\n' @@ -683,20 +774,6 @@ class SimSMTPChannel(smtpd.SMTPChannel): else: self.push('550 No access for you!') - def smtp_AUTH(self, arg): - mech = arg.strip().lower() - if mech=='cram-md5': - self.push('334 {}'.format(sim_cram_md5_challenge)) - elif mech not in sim_auth_credentials: - self.push('504 auth type unimplemented') - return - elif mech=='plain': - self.push('334 ') - elif mech=='login': - self.push('334 ') - else: - self.push('550 No access for you!') - def smtp_QUIT(self, arg): if self.quit_response is None: super(SimSMTPChannel, self).smtp_QUIT(arg) @@ -841,63 +918,49 @@ class SMTPSimTests(unittest.TestCase): self.assertEqual(smtp.expn(u), expected_unknown) smtp.quit() - # SimSMTPChannel doesn't fully support AUTH because it requires a - # synchronous read to obtain the credentials...so instead smtpd - # sees the credential sent by smtplib's login method as an unknown command, - # which results in smtplib raising an auth error. Fortunately the error - # message contains the encoded credential, so we can partially check that it - # was generated correctly (partially, because the 'word' is uppercased in - # the error message). - def testAUTH_PLAIN(self): self.serv.add_feature("AUTH PLAIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_plain, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_LOGIN(self): self.serv.add_feature("AUTH LOGIN") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_user, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_CRAM_MD5(self): self.serv.add_feature("AUTH CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_credentials['cram-md5'], str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def testAUTH_multiple(self): # Test that multiple authentication methods are tried. self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5") smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15) - try: smtp.login(sim_auth[0], sim_auth[1]) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_login_user, str(err)) + resp = smtp.login(sim_auth[0], sim_auth[1]) + self.assertEqual(resp, (235, b'Authentication Succeeded')) smtp.close() def test_auth_function(self): - smtp = smtplib.SMTP(HOST, self.port, - local_hostname='localhost', timeout=15) - self.serv.add_feature("AUTH CRAM-MD5") - smtp.user, smtp.password = sim_auth[0], sim_auth[1] - supported = {'CRAM-MD5': smtp.auth_cram_md5, - 'PLAIN': smtp.auth_plain, - 'LOGIN': smtp.auth_login, - } - for mechanism, method in supported.items(): - try: smtp.auth(mechanism, method, initial_response_ok=False) - except smtplib.SMTPAuthenticationError as err: - self.assertIn(sim_auth_credentials[mechanism.lower()].upper(), - str(err)) - smtp.close() + supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'} + for mechanism in supported: + self.serv.add_feature("AUTH {}".format(mechanism)) + for mechanism in supported: + with self.subTest(mechanism=mechanism): + smtp = smtplib.SMTP(HOST, self.port, + local_hostname='localhost', timeout=15) + smtp.ehlo('foo') + smtp.user, smtp.password = sim_auth[0], sim_auth[1] + method = 'auth_' + mechanism.lower().replace('-', '_') + resp = smtp.auth(mechanism, getattr(smtp, method)) + self.assertEqual(resp, (235, b'Authentication Succeeded')) + smtp.close() def test_quit_resets_greeting(self): smtp = smtplib.SMTP(HOST, self.port, diff --git a/Misc/NEWS b/Misc/NEWS index 1b308a12541..249278a5d33 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -79,6 +79,8 @@ Core and Builtins Library ------- +- Issue #25446: Fix regression in smtplib's AUTH LOGIN support. + - Issue #18010: Fix the pydoc web server's module search function to handle exceptions from importing packages.