Merge: #13700: Make imap.authenticate with authobject work.
This fixes a bytes/string confusion in the API which prevented custom authobjects from working at all. Original patch by Erno Tukia.
This commit is contained in:
commit
8aa164b395
|
@ -185,9 +185,10 @@ An :class:`IMAP4` instance has the following methods:
|
||||||
|
|
||||||
data = authobject(response)
|
data = authobject(response)
|
||||||
|
|
||||||
It will be called to process server continuation responses. It should return
|
It will be called to process server continuation responses; the *response*
|
||||||
``data`` that will be encoded and sent to server. It should return ``None`` if
|
argument it is passed will be ``bytes``. It should return ``bytes`` *data*
|
||||||
the client abort response ``*`` should be sent instead.
|
that will be base64 encoded and sent to the server. It should return
|
||||||
|
``None`` if the client abort response ``*`` should be sent instead.
|
||||||
|
|
||||||
|
|
||||||
.. method:: IMAP4.check()
|
.. method:: IMAP4.check()
|
||||||
|
|
|
@ -352,10 +352,10 @@ class IMAP4:
|
||||||
|
|
||||||
data = authobject(response)
|
data = authobject(response)
|
||||||
|
|
||||||
It will be called to process server continuation responses.
|
It will be called to process server continuation responses; the
|
||||||
It should return data that will be encoded and sent to server.
|
response argument it is passed will be a bytes. It should return bytes
|
||||||
It should return None if the client abort response '*' should
|
data that will be base64 encoded and sent to the server. It should
|
||||||
be sent instead.
|
return None if the client abort response '*' should be sent instead.
|
||||||
"""
|
"""
|
||||||
mech = mechanism.upper()
|
mech = mechanism.upper()
|
||||||
# XXX: shouldn't this code be removed, not commented out?
|
# XXX: shouldn't this code be removed, not commented out?
|
||||||
|
@ -538,7 +538,9 @@ class IMAP4:
|
||||||
def _CRAM_MD5_AUTH(self, challenge):
|
def _CRAM_MD5_AUTH(self, challenge):
|
||||||
""" Authobject to use with CRAM-MD5 authentication. """
|
""" Authobject to use with CRAM-MD5 authentication. """
|
||||||
import hmac
|
import hmac
|
||||||
return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
|
pwd = (self.password.encode('ASCII') if isinstance(self.password, str)
|
||||||
|
else self.password)
|
||||||
|
return self.user + " " + hmac.HMAC(pwd, challenge).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
def logout(self):
|
def logout(self):
|
||||||
|
@ -1295,14 +1297,16 @@ class _Authenticator:
|
||||||
# so when it gets to the end of the 8-bit input
|
# so when it gets to the end of the 8-bit input
|
||||||
# there's no partial 6-bit output.
|
# there's no partial 6-bit output.
|
||||||
#
|
#
|
||||||
oup = ''
|
oup = b''
|
||||||
|
if isinstance(inp, str):
|
||||||
|
inp = inp.encode('ASCII')
|
||||||
while inp:
|
while inp:
|
||||||
if len(inp) > 48:
|
if len(inp) > 48:
|
||||||
t = inp[:48]
|
t = inp[:48]
|
||||||
inp = inp[48:]
|
inp = inp[48:]
|
||||||
else:
|
else:
|
||||||
t = inp
|
t = inp
|
||||||
inp = ''
|
inp = b''
|
||||||
e = binascii.b2a_base64(t)
|
e = binascii.b2a_base64(t)
|
||||||
if e:
|
if e:
|
||||||
oup = oup + e[:-1]
|
oup = oup + e[:-1]
|
||||||
|
@ -1310,7 +1314,7 @@ class _Authenticator:
|
||||||
|
|
||||||
def decode(self, inp):
|
def decode(self, inp):
|
||||||
if not inp:
|
if not inp:
|
||||||
return ''
|
return b''
|
||||||
return binascii.a2b_base64(inp)
|
return binascii.a2b_base64(inp)
|
||||||
|
|
||||||
Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
|
Months = ' Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split(' ')
|
||||||
|
|
|
@ -94,14 +94,25 @@ else:
|
||||||
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
|
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
|
||||||
|
|
||||||
timeout = 1
|
timeout = 1
|
||||||
|
continuation = None
|
||||||
|
capabilities = ''
|
||||||
|
|
||||||
def _send(self, message):
|
def _send(self, message):
|
||||||
if verbose: print("SENT: %r" % message.strip())
|
if verbose: print("SENT: %r" % message.strip())
|
||||||
self.wfile.write(message)
|
self.wfile.write(message)
|
||||||
|
|
||||||
|
def _send_line(self, message):
|
||||||
|
self._send(message + b'\r\n')
|
||||||
|
|
||||||
|
def _send_textline(self, message):
|
||||||
|
self._send_line(message.encode('ASCII'))
|
||||||
|
|
||||||
|
def _send_tagged(self, tag, code, message):
|
||||||
|
self._send_textline(' '.join((tag, code, message)))
|
||||||
|
|
||||||
def handle(self):
|
def handle(self):
|
||||||
# Send a welcome message.
|
# Send a welcome message.
|
||||||
self._send(b'* OK IMAP4rev1\r\n')
|
self._send_textline('* OK IMAP4rev1')
|
||||||
while 1:
|
while 1:
|
||||||
# Gather up input until we receive a line terminator or we timeout.
|
# Gather up input until we receive a line terminator or we timeout.
|
||||||
# Accumulate read(1) because it's simpler to handle the differences
|
# Accumulate read(1) because it's simpler to handle the differences
|
||||||
|
@ -121,19 +132,33 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
|
||||||
break
|
break
|
||||||
|
|
||||||
if verbose: print('GOT: %r' % line.strip())
|
if verbose: print('GOT: %r' % line.strip())
|
||||||
splitline = line.split()
|
if self.continuation:
|
||||||
tag = splitline[0].decode('ASCII')
|
try:
|
||||||
cmd = splitline[1].decode('ASCII')
|
self.continuation.send(line)
|
||||||
|
except StopIteration:
|
||||||
|
self.continuation = None
|
||||||
|
continue
|
||||||
|
splitline = line.decode('ASCII').split()
|
||||||
|
tag = splitline[0]
|
||||||
|
cmd = splitline[1]
|
||||||
args = splitline[2:]
|
args = splitline[2:]
|
||||||
|
|
||||||
if hasattr(self, 'cmd_'+cmd):
|
if hasattr(self, 'cmd_'+cmd):
|
||||||
getattr(self, 'cmd_'+cmd)(tag, args)
|
continuation = getattr(self, 'cmd_'+cmd)(tag, args)
|
||||||
|
if continuation:
|
||||||
|
self.continuation = continuation
|
||||||
|
next(continuation)
|
||||||
else:
|
else:
|
||||||
self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
|
self._send_tagged(tag, 'BAD', cmd + ' unknown')
|
||||||
|
|
||||||
def cmd_CAPABILITY(self, tag, args):
|
def cmd_CAPABILITY(self, tag, args):
|
||||||
self._send(b'* CAPABILITY IMAP4rev1\r\n')
|
caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1'
|
||||||
self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
|
self._send_textline('* CAPABILITY ' + caps)
|
||||||
|
self._send_tagged(tag, 'OK', 'CAPABILITY completed')
|
||||||
|
|
||||||
|
def cmd_LOGOUT(self, tag, args):
|
||||||
|
self._send_textline('* BYE IMAP4ref1 Server logging out')
|
||||||
|
self._send_tagged(tag, 'OK', 'LOGOUT completed')
|
||||||
|
|
||||||
|
|
||||||
class BaseThreadedNetworkedTests(unittest.TestCase):
|
class BaseThreadedNetworkedTests(unittest.TestCase):
|
||||||
|
@ -183,6 +208,16 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
self.reap_server(server, thread)
|
self.reap_server(server, thread)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def reaped_pair(self, hdlr):
|
||||||
|
server, thread = self.make_server((support.HOST, 0), hdlr)
|
||||||
|
client = self.imap_class(*server.server_address)
|
||||||
|
try:
|
||||||
|
yield server, client
|
||||||
|
finally:
|
||||||
|
client.logout()
|
||||||
|
self.reap_server(server, thread)
|
||||||
|
|
||||||
@reap_threads
|
@reap_threads
|
||||||
def test_connect(self):
|
def test_connect(self):
|
||||||
with self.reaped_server(SimpleIMAPHandler) as server:
|
with self.reaped_server(SimpleIMAPHandler) as server:
|
||||||
|
@ -208,12 +243,86 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
|
||||||
|
|
||||||
def cmd_CAPABILITY(self, tag, args):
|
def cmd_CAPABILITY(self, tag, args):
|
||||||
self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
|
self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
|
||||||
self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
|
self._send_tagged(tag, 'OK', 'CAPABILITY completed')
|
||||||
|
|
||||||
with self.reaped_server(BadNewlineHandler) as server:
|
with self.reaped_server(BadNewlineHandler) as server:
|
||||||
self.assertRaises(imaplib.IMAP4.abort,
|
self.assertRaises(imaplib.IMAP4.abort,
|
||||||
self.imap_class, *server.server_address)
|
self.imap_class, *server.server_address)
|
||||||
|
|
||||||
|
@reap_threads
|
||||||
|
def test_bad_auth_name(self):
|
||||||
|
|
||||||
|
class MyServer(SimpleIMAPHandler):
|
||||||
|
|
||||||
|
def cmd_AUTHENTICATE(self, tag, args):
|
||||||
|
self._send_tagged(tag, 'NO', 'unrecognized authentication '
|
||||||
|
'type {}'.format(args[0]))
|
||||||
|
|
||||||
|
with self.reaped_pair(MyServer) as (server, client):
|
||||||
|
with self.assertRaises(imaplib.IMAP4.error):
|
||||||
|
client.authenticate('METHOD', lambda: 1)
|
||||||
|
|
||||||
|
@reap_threads
|
||||||
|
def test_invalid_authentication(self):
|
||||||
|
|
||||||
|
class MyServer(SimpleIMAPHandler):
|
||||||
|
|
||||||
|
def cmd_AUTHENTICATE(self, tag, args):
|
||||||
|
self._send_textline('+')
|
||||||
|
self.response = yield
|
||||||
|
self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
|
||||||
|
|
||||||
|
with self.reaped_pair(MyServer) as (server, client):
|
||||||
|
with self.assertRaises(imaplib.IMAP4.error):
|
||||||
|
code, data = client.authenticate('MYAUTH', lambda x: b'fake')
|
||||||
|
|
||||||
|
@reap_threads
|
||||||
|
def test_valid_authentication(self):
|
||||||
|
|
||||||
|
class MyServer(SimpleIMAPHandler):
|
||||||
|
|
||||||
|
def cmd_AUTHENTICATE(self, tag, args):
|
||||||
|
self._send_textline('+')
|
||||||
|
self.server.response = yield
|
||||||
|
self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
|
||||||
|
|
||||||
|
with self.reaped_pair(MyServer) as (server, client):
|
||||||
|
code, data = client.authenticate('MYAUTH', lambda x: b'fake')
|
||||||
|
self.assertEqual(code, 'OK')
|
||||||
|
self.assertEqual(server.response,
|
||||||
|
b'ZmFrZQ==\r\n') #b64 encoded 'fake'
|
||||||
|
|
||||||
|
with self.reaped_pair(MyServer) as (server, client):
|
||||||
|
code, data = client.authenticate('MYAUTH', lambda x: 'fake')
|
||||||
|
self.assertEqual(code, 'OK')
|
||||||
|
self.assertEqual(server.response,
|
||||||
|
b'ZmFrZQ==\r\n') #b64 encoded 'fake'
|
||||||
|
|
||||||
|
@reap_threads
|
||||||
|
def test_login_cram_md5(self):
|
||||||
|
|
||||||
|
class AuthHandler(SimpleIMAPHandler):
|
||||||
|
|
||||||
|
capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
|
||||||
|
|
||||||
|
def cmd_AUTHENTICATE(self, tag, args):
|
||||||
|
self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
|
||||||
|
'VzdG9uLm1jaS5uZXQ=')
|
||||||
|
r = yield
|
||||||
|
if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n':
|
||||||
|
self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
|
||||||
|
else:
|
||||||
|
self._send_tagged(tag, 'NO', 'No access')
|
||||||
|
|
||||||
|
with self.reaped_pair(AuthHandler) as (server, client):
|
||||||
|
self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
|
||||||
|
ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
|
||||||
|
self.assertEqual(ret, "OK")
|
||||||
|
|
||||||
|
with self.reaped_pair(AuthHandler) as (server, client):
|
||||||
|
self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
|
||||||
|
ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
|
||||||
|
self.assertEqual(ret, "OK")
|
||||||
|
|
||||||
|
|
||||||
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
|
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
|
||||||
|
|
|
@ -260,6 +260,9 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #13700: Fix byte/string handling in imaplib authentication when an
|
||||||
|
authobject is specified.
|
||||||
|
|
||||||
- Issue #13153: Tkinter functions now raise TclError instead of ValueError when
|
- Issue #13153: Tkinter functions now raise TclError instead of ValueError when
|
||||||
a string argument contains non-BMP character.
|
a string argument contains non-BMP character.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue