added new IMAP4_stream class; added proxyauth command; added login_cram_md5 method
This commit is contained in:
parent
d3c821ee71
commit
e0273de432
121
Lib/imaplib.py
121
Lib/imaplib.py
|
@ -17,10 +17,11 @@ Public functions: Internaldate2tuple
|
|||
# GET/SETACL contributed by Anthony Baxter <anthony@interlink.com.au> April 2001.
|
||||
# IMAP4_SSL contributed by Tino Lange <Tino.Lange@isg.de> March 2002.
|
||||
# GET/SETQUOTA contributed by Andreas Zeidler <az@kreativkombinat.de> June 2002.
|
||||
# PROXYAUTH contributed by Rick Holbert <holbert.13@osu.edu> November 2002.
|
||||
|
||||
__version__ = "2.53"
|
||||
__version__ = "2.54"
|
||||
|
||||
import binascii, re, socket, time, random, sys
|
||||
import binascii, os, random, re, socket, sys, time
|
||||
|
||||
__all__ = ["IMAP4", "IMAP4_SSL", "Internaldate2tuple",
|
||||
"Int2AP", "ParseFlags", "Time2Internaldate"]
|
||||
|
@ -58,6 +59,7 @@ Commands = {
|
|||
'NAMESPACE': ('AUTH', 'SELECTED'),
|
||||
'NOOP': ('NONAUTH', 'AUTH', 'SELECTED', 'LOGOUT'),
|
||||
'PARTIAL': ('SELECTED',), # NB: obsolete
|
||||
'PROXYAUTH': ('AUTH',),
|
||||
'RENAME': ('AUTH', 'SELECTED'),
|
||||
'SEARCH': ('SELECTED',),
|
||||
'SELECT': ('AUTH', 'SELECTED'),
|
||||
|
@ -111,7 +113,10 @@ class IMAP4:
|
|||
|
||||
Each command returns a tuple: (type, [data, ...]) where 'type'
|
||||
is usually 'OK' or 'NO', and 'data' is either the text from the
|
||||
tagged response, or untagged results from command.
|
||||
tagged response, or untagged results from command. Each 'data'
|
||||
is either a string, or a tuple. If a tuple, then the first part
|
||||
is the header of the response, and the second part contains
|
||||
the data (ie: 'literal' value).
|
||||
|
||||
Errors raise the exception class <instance>.error("<reason>").
|
||||
IMAP4 server errors raise <instance>.abort("<reason>"),
|
||||
|
@ -325,8 +330,8 @@ class IMAP4:
|
|||
"""
|
||||
mech = mechanism.upper()
|
||||
cap = 'AUTH=%s' % mech
|
||||
if not cap in self.capabilities:
|
||||
raise self.error("Server doesn't allow %s authentication." % mech)
|
||||
#if not cap in self.capabilities: # Let the server decide!
|
||||
# raise self.error("Server doesn't allow %s authentication." % mech)
|
||||
self.literal = _Authenticator(authobject).process
|
||||
typ, dat = self._simple_command('AUTHENTICATE', mech)
|
||||
if typ != 'OK':
|
||||
|
@ -461,8 +466,6 @@ class IMAP4:
|
|||
|
||||
NB: 'password' will be quoted.
|
||||
"""
|
||||
#if not 'AUTH=LOGIN' in self.capabilities:
|
||||
# raise self.error("Server doesn't allow LOGIN authentication." % mech)
|
||||
typ, dat = self._simple_command('LOGIN', user, self._quote(password))
|
||||
if typ != 'OK':
|
||||
raise self.error(dat[-1])
|
||||
|
@ -470,6 +473,21 @@ class IMAP4:
|
|||
return typ, dat
|
||||
|
||||
|
||||
def login_cram_md5(self, user, password):
|
||||
""" Force use of CRAM-MD5 authentication.
|
||||
|
||||
(typ, [data]) = <instance>.login_cram_md5(user, password)
|
||||
"""
|
||||
self.user, self.password = user, password
|
||||
return self.authenticate('CRAM-MD5', self._CRAM_MD5_AUTH)
|
||||
|
||||
|
||||
def _CRAM_MD5_AUTH(self, challenge):
|
||||
""" Authobject to use with CRAM-MD5 authentication. """
|
||||
import hmac
|
||||
return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest()
|
||||
|
||||
|
||||
def logout(self):
|
||||
"""Shutdown connection to server.
|
||||
|
||||
|
@ -511,7 +529,7 @@ class IMAP4:
|
|||
def noop(self):
|
||||
"""Send NOOP command.
|
||||
|
||||
(typ, data) = <instance>.noop()
|
||||
(typ, [data]) = <instance>.noop()
|
||||
"""
|
||||
if __debug__:
|
||||
if self.debug >= 3:
|
||||
|
@ -531,10 +549,23 @@ class IMAP4:
|
|||
return self._untagged_response(typ, dat, 'FETCH')
|
||||
|
||||
|
||||
def proxyauth(self, user):
|
||||
"""Assume authentication as "user".
|
||||
|
||||
Allows an authorised administrator to proxy into any user's
|
||||
mailbox.
|
||||
|
||||
(typ, [data]) = <instance>.proxyauth(user)
|
||||
"""
|
||||
|
||||
name = 'PROXYAUTH'
|
||||
return self._simple_command('PROXYAUTH', user)
|
||||
|
||||
|
||||
def rename(self, oldmailbox, newmailbox):
|
||||
"""Rename old mailbox name to new.
|
||||
|
||||
(typ, data) = <instance>.rename(oldmailbox, newmailbox)
|
||||
(typ, [data]) = <instance>.rename(oldmailbox, newmailbox)
|
||||
"""
|
||||
return self._simple_command('RENAME', oldmailbox, newmailbox)
|
||||
|
||||
|
@ -1107,6 +1138,58 @@ class IMAP4_SSL(IMAP4):
|
|||
|
||||
|
||||
|
||||
class IMAP4_stream(IMAP4):
|
||||
|
||||
"""IMAP4 client class over a stream
|
||||
|
||||
Instantiate with: IMAP4_stream(command)
|
||||
|
||||
where "command" is a string that can be passed to os.popen2()
|
||||
|
||||
for more documentation see the docstring of the parent class IMAP4.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, command):
|
||||
self.command = command
|
||||
IMAP4.__init__(self)
|
||||
|
||||
|
||||
def open(self, host = None, port = None):
|
||||
"""Setup a stream connection.
|
||||
This connection will be used by the routines:
|
||||
read, readline, send, shutdown.
|
||||
"""
|
||||
self.host = None # For compatibility with parent class
|
||||
self.port = None
|
||||
self.sock = None
|
||||
self.file = None
|
||||
self.writefile, self.readfile = os.popen2(self.command)
|
||||
|
||||
|
||||
def read(self, size):
|
||||
"""Read 'size' bytes from remote."""
|
||||
return self.readfile.read(size)
|
||||
|
||||
|
||||
def readline(self):
|
||||
"""Read line from remote."""
|
||||
return self.readfile.readline()
|
||||
|
||||
|
||||
def send(self, data):
|
||||
"""Send data to remote."""
|
||||
self.writefile.write(data)
|
||||
self.writefile.flush()
|
||||
|
||||
|
||||
def shutdown(self):
|
||||
"""Close I/O established in "open"."""
|
||||
self.readfile.close()
|
||||
self.writefile.close()
|
||||
|
||||
|
||||
|
||||
class _Authenticator:
|
||||
|
||||
"""Private class to provide en/decoding
|
||||
|
@ -1251,16 +1334,24 @@ def Time2Internaldate(date_time):
|
|||
|
||||
if __name__ == '__main__':
|
||||
|
||||
# To test: invoke either as 'python imaplib.py [IMAP4_server_hostname]'
|
||||
# or 'python imaplib.py -s "rsh IMAP4_server_hostname exec /etc/rimapd"'
|
||||
# to test the IMAP4_stream class
|
||||
|
||||
import getopt, getpass
|
||||
|
||||
try:
|
||||
optlist, args = getopt.getopt(sys.argv[1:], 'd:')
|
||||
optlist, args = getopt.getopt(sys.argv[1:], 'd:s:')
|
||||
except getopt.error, val:
|
||||
pass
|
||||
optlist, args = (), ()
|
||||
|
||||
stream_command = None
|
||||
for opt,val in optlist:
|
||||
if opt == '-d':
|
||||
Debug = int(val)
|
||||
elif opt == '-s':
|
||||
stream_command = val
|
||||
if not args: args = (stream_command,)
|
||||
|
||||
if not args: args = ('',)
|
||||
|
||||
|
@ -1301,10 +1392,16 @@ if __name__ == '__main__':
|
|||
M._mesg('%s %s' % (cmd, args))
|
||||
typ, dat = apply(getattr(M, cmd), args)
|
||||
M._mesg('%s => %s %s' % (cmd, typ, dat))
|
||||
if typ == 'NO': raise dat[0]
|
||||
return dat
|
||||
|
||||
try:
|
||||
M = IMAP4(host)
|
||||
if stream_command:
|
||||
M = IMAP4_stream(stream_command)
|
||||
else:
|
||||
M = IMAP4(host)
|
||||
if M.state == 'AUTH':
|
||||
test_seq1 = test_seq1[1:] # Login not needed
|
||||
M._mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
|
||||
M._mesg('CAPABILITIES = %s' % `M.capabilities`)
|
||||
|
||||
|
|
Loading…
Reference in New Issue