Merge: #25446: Fix regression in smtplib's AUTH LOGIN support.
This commit is contained in:
commit
7861667c8e
|
@ -630,7 +630,7 @@ class SMTP:
|
|||
(code, resp) = self.docmd("AUTH", mechanism + " " + response)
|
||||
else:
|
||||
(code, resp) = self.docmd("AUTH", mechanism)
|
||||
# Server replies with 334 (challenge) or 535 (not supported)
|
||||
# If server responds with a challenge, send the response.
|
||||
if code == 334:
|
||||
challenge = base64.decodebytes(resp)
|
||||
response = encode_base64(
|
||||
|
@ -657,11 +657,10 @@ class SMTP:
|
|||
def auth_login(self, challenge=None):
|
||||
""" Authobject to use with LOGIN authentication. Requires self.user and
|
||||
self.password to be set."""
|
||||
(code, resp) = self.docmd(
|
||||
encode_base64(self.user.encode('ascii'), eol=''))
|
||||
if code == 334:
|
||||
if challenge is None:
|
||||
return self.user
|
||||
else:
|
||||
return self.password
|
||||
raise SMTPAuthenticationError(code, resp)
|
||||
|
||||
def login(self, user, password, *, initial_response_ok=True):
|
||||
"""Log in on an SMTP server that requires authentication.
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import asyncore
|
||||
import base64
|
||||
import email.mime.text
|
||||
from email.message import EmailMessage
|
||||
from email.base64mime import body_encode as encode_base64
|
||||
import email.utils
|
||||
import hmac
|
||||
import socket
|
||||
import smtpd
|
||||
import smtplib
|
||||
|
@ -623,20 +625,12 @@ sim_users = {'Mr.A@somewhere.com':'John A',
|
|||
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
|
||||
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
|
||||
'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
|
||||
sim_auth_credentials = {
|
||||
'login': 'TXIuQUBzb21ld2hlcmUuY29t',
|
||||
'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
|
||||
'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
|
||||
'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
|
||||
}
|
||||
sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
|
||||
sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
|
||||
|
||||
sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
|
||||
'list-2':['Ms.B@xn--fo-fka.com',],
|
||||
}
|
||||
|
||||
# Simulated SMTP channel & server
|
||||
class ResponseException(Exception): pass
|
||||
class SimSMTPChannel(smtpd.SMTPChannel):
|
||||
|
||||
quit_response = None
|
||||
|
@ -646,12 +640,109 @@ class SimSMTPChannel(smtpd.SMTPChannel):
|
|||
rcpt_count = 0
|
||||
rset_count = 0
|
||||
disconnect = 0
|
||||
AUTH = 99 # Add protocol state to enable auth testing.
|
||||
authenticated_user = None
|
||||
|
||||
def __init__(self, extra_features, *args, **kw):
|
||||
self._extrafeatures = ''.join(
|
||||
[ "250-{0}\r\n".format(x) for x in extra_features ])
|
||||
super(SimSMTPChannel, self).__init__(*args, **kw)
|
||||
|
||||
# AUTH related stuff. It would be nice if support for this were in smtpd.
|
||||
def found_terminator(self):
|
||||
if self.smtp_state == self.AUTH:
|
||||
line = self._emptystring.join(self.received_lines)
|
||||
print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
|
||||
self.received_lines = []
|
||||
try:
|
||||
self.auth_object(line)
|
||||
except ResponseException as e:
|
||||
self.smtp_state = self.COMMAND
|
||||
self.push('%s %s' % (e.smtp_code, e.smtp_error))
|
||||
return
|
||||
super().found_terminator()
|
||||
|
||||
|
||||
def smtp_AUTH(self, arg):
|
||||
if not self.seen_greeting:
|
||||
self.push('503 Error: send EHLO first')
|
||||
return
|
||||
if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
|
||||
self.push('500 Error: command "AUTH" not recognized')
|
||||
return
|
||||
if self.authenticated_user is not None:
|
||||
self.push(
|
||||
'503 Bad sequence of commands: already authenticated')
|
||||
return
|
||||
args = arg.split()
|
||||
if len(args) not in [1, 2]:
|
||||
self.push('501 Syntax: AUTH <mechanism> [initial-response]')
|
||||
return
|
||||
auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
|
||||
try:
|
||||
self.auth_object = getattr(self, auth_object_name)
|
||||
except AttributeError:
|
||||
self.push('504 Command parameter not implemented: unsupported '
|
||||
' authentication mechanism {!r}'.format(auth_object_name))
|
||||
return
|
||||
self.smtp_state = self.AUTH
|
||||
self.auth_object(args[1] if len(args) == 2 else None)
|
||||
|
||||
def _authenticated(self, user, valid):
|
||||
if valid:
|
||||
self.authenticated_user = user
|
||||
self.push('235 Authentication Succeeded')
|
||||
else:
|
||||
self.push('535 Authentication credentials invalid')
|
||||
self.smtp_state = self.COMMAND
|
||||
|
||||
def _decode_base64(self, string):
|
||||
return base64.decodebytes(string.encode('ascii')).decode('utf-8')
|
||||
|
||||
def _auth_plain(self, arg=None):
|
||||
if arg is None:
|
||||
self.push('334 ')
|
||||
else:
|
||||
logpass = self._decode_base64(arg)
|
||||
try:
|
||||
*_, user, password = logpass.split('\0')
|
||||
except ValueError as e:
|
||||
self.push('535 Splitting response {!r} into user and password'
|
||||
' failed: {}'.format(logpass, e))
|
||||
return
|
||||
self._authenticated(user, password == sim_auth[1])
|
||||
|
||||
def _auth_login(self, arg=None):
|
||||
if arg is None:
|
||||
# base64 encoded 'Username:'
|
||||
self.push('334 VXNlcm5hbWU6')
|
||||
elif not hasattr(self, '_auth_login_user'):
|
||||
self._auth_login_user = self._decode_base64(arg)
|
||||
# base64 encoded 'Password:'
|
||||
self.push('334 UGFzc3dvcmQ6')
|
||||
else:
|
||||
password = self._decode_base64(arg)
|
||||
self._authenticated(self._auth_login_user, password == sim_auth[1])
|
||||
del self._auth_login_user
|
||||
|
||||
def _auth_cram_md5(self, arg=None):
|
||||
if arg is None:
|
||||
self.push('334 {}'.format(sim_cram_md5_challenge))
|
||||
else:
|
||||
logpass = self._decode_base64(arg)
|
||||
try:
|
||||
user, hashed_pass = logpass.split()
|
||||
except ValueError as e:
|
||||
self.push('535 Splitting response {!r} into user and password'
|
||||
'failed: {}'.format(logpass, e))
|
||||
return False
|
||||
valid_hashed_pass = hmac.HMAC(
|
||||
sim_auth[1].encode('ascii'),
|
||||
self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
|
||||
'md5').hexdigest()
|
||||
self._authenticated(user, hashed_pass == valid_hashed_pass)
|
||||
# end AUTH related stuff.
|
||||
|
||||
def smtp_EHLO(self, arg):
|
||||
resp = ('250-testhost\r\n'
|
||||
'250-EXPN\r\n'
|
||||
|
@ -683,20 +774,6 @@ class SimSMTPChannel(smtpd.SMTPChannel):
|
|||
else:
|
||||
self.push('550 No access for you!')
|
||||
|
||||
def smtp_AUTH(self, arg):
|
||||
mech = arg.strip().lower()
|
||||
if mech=='cram-md5':
|
||||
self.push('334 {}'.format(sim_cram_md5_challenge))
|
||||
elif mech not in sim_auth_credentials:
|
||||
self.push('504 auth type unimplemented')
|
||||
return
|
||||
elif mech=='plain':
|
||||
self.push('334 ')
|
||||
elif mech=='login':
|
||||
self.push('334 ')
|
||||
else:
|
||||
self.push('550 No access for you!')
|
||||
|
||||
def smtp_QUIT(self, arg):
|
||||
if self.quit_response is None:
|
||||
super(SimSMTPChannel, self).smtp_QUIT(arg)
|
||||
|
@ -841,62 +918,48 @@ class SMTPSimTests(unittest.TestCase):
|
|||
self.assertEqual(smtp.expn(u), expected_unknown)
|
||||
smtp.quit()
|
||||
|
||||
# SimSMTPChannel doesn't fully support AUTH because it requires a
|
||||
# synchronous read to obtain the credentials...so instead smtpd
|
||||
# sees the credential sent by smtplib's login method as an unknown command,
|
||||
# which results in smtplib raising an auth error. Fortunately the error
|
||||
# message contains the encoded credential, so we can partially check that it
|
||||
# was generated correctly (partially, because the 'word' is uppercased in
|
||||
# the error message).
|
||||
|
||||
def testAUTH_PLAIN(self):
|
||||
self.serv.add_feature("AUTH PLAIN")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_plain, str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_LOGIN(self):
|
||||
self.serv.add_feature("AUTH LOGIN")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_login_user, str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_CRAM_MD5(self):
|
||||
self.serv.add_feature("AUTH CRAM-MD5")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_credentials['cram-md5'], str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_multiple(self):
|
||||
# Test that multiple authentication methods are tried.
|
||||
self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_login_user, str(err))
|
||||
resp = smtp.login(sim_auth[0], sim_auth[1])
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def test_auth_function(self):
|
||||
supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
|
||||
for mechanism in supported:
|
||||
self.serv.add_feature("AUTH {}".format(mechanism))
|
||||
for mechanism in supported:
|
||||
with self.subTest(mechanism=mechanism):
|
||||
smtp = smtplib.SMTP(HOST, self.port,
|
||||
local_hostname='localhost', timeout=15)
|
||||
self.serv.add_feature("AUTH CRAM-MD5")
|
||||
smtp.ehlo('foo')
|
||||
smtp.user, smtp.password = sim_auth[0], sim_auth[1]
|
||||
supported = {'CRAM-MD5': smtp.auth_cram_md5,
|
||||
'PLAIN': smtp.auth_plain,
|
||||
'LOGIN': smtp.auth_login,
|
||||
}
|
||||
for mechanism, method in supported.items():
|
||||
try: smtp.auth(mechanism, method, initial_response_ok=False)
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
|
||||
str(err))
|
||||
method = 'auth_' + mechanism.lower().replace('-', '_')
|
||||
resp = smtp.auth(mechanism, getattr(smtp, method))
|
||||
self.assertEqual(resp, (235, b'Authentication Succeeded'))
|
||||
smtp.close()
|
||||
|
||||
def test_quit_resets_greeting(self):
|
||||
|
|
Loading…
Reference in New Issue