Fixed issue #3727: poplib module broken by str to unicode conversion

Victor strikes again! Assisted by Barry
This commit is contained in:
Christian Heimes 2008-11-05 19:48:27 +00:00
parent 933238ad85
commit d395629e82
3 changed files with 299 additions and 104 deletions

View File

@ -75,26 +75,30 @@ class POP3:
above.
"""
encoding = 'UTF-8'
def __init__(self, host, port=POP3_PORT,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
self.host = host
self.port = port
self.sock = socket.create_connection((host, port), timeout)
self.sock = self._create_socket(timeout)
self.file = self.sock.makefile('rb')
self._debugging = 0
self.welcome = self._getresp()
def _create_socket(self, timeout):
return socket.create_connection((self.host, self.port), timeout)
def _putline(self, line):
if self._debugging > 1: print('*put*', repr(line))
self.sock.sendall('%s%s' % (line, CRLF))
self.sock.sendall(line + CRLF)
# Internal: send one command to the server (through _putline())
def _putcmd(self, line):
if self._debugging: print('*cmd*', repr(line))
line = bytes(line, self.encoding)
self._putline(line)
@ -123,8 +127,7 @@ class POP3:
def _getresp(self):
resp, o = self._getline()
if self._debugging > 1: print('*resp*', repr(resp))
c = resp[:1]
if c != b'+':
if not resp.startswith(b'+'):
raise error_proto(resp)
return resp
@ -136,7 +139,7 @@ class POP3:
list = []; octets = 0
line, o = self._getline()
while line != b'.':
if line[:2] == b'..':
if line.startswith(b'..'):
o = o-1
line = line[1:]
octets = octets + o
@ -266,25 +269,26 @@ class POP3:
return self._shortcmd('RPOP %s' % user)
timestamp = re.compile(r'\+OK.*(<[^>]+>)')
timestamp = re.compile(br'\+OK.*(<[^>]+>)')
def apop(self, user, secret):
def apop(self, user, password):
"""Authorisation
- only possible if server has supplied a timestamp in initial greeting.
Args:
user - mailbox user;
secret - secret shared between client and server.
user - mailbox user;
password - mailbox password.
NB: mailbox is locked by server from here to 'quit()'
"""
secret = bytes(secret, self.encoding)
m = self.timestamp.match(self.welcome)
if not m:
raise error_proto('-ERR APOP not supported by server')
import hashlib
digest = hashlib.md5(m.group(1)+secret).digest()
digest = ''.join(map(lambda x:'%02x'%ord(x), digest))
digest = m.group(1)+secret
digest = hashlib.md5(digest).hexdigest()
return self._shortcmd('APOP %s %s' % (user, digest))
@ -324,79 +328,19 @@ else:
keyfile - PEM formatted file that countains your private key
certfile - PEM formatted certificate chain file
See the methods of the parent class POP3 for more documentation.
See the methods of the parent class POP3 for more documentation.
"""
def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None):
self.host = host
self.port = port
def __init__(self, host, port=POP3_SSL_PORT,
keyfile=None, certfile=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
self.keyfile = keyfile
self.certfile = certfile
self.buffer = ""
msg = "getaddrinfo returns an empty list"
self.sock = None
for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
self.sock = socket.socket(af, socktype, proto)
self.sock.connect(sa)
except socket.error as msg:
if self.sock:
self.sock.close()
self.sock = None
continue
break
if not self.sock:
raise socket.error(msg)
self.file = self.sock.makefile('rb')
self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile)
self._debugging = 0
self.welcome = self._getresp()
POP3.__init__(self, host, port, timeout)
def _fillBuffer(self):
localbuf = self.sslobj.read()
if len(localbuf) == 0:
raise error_proto('-ERR EOF')
self.buffer += localbuf
def _getline(self):
line = ""
renewline = re.compile(r'.*?\n')
match = renewline.match(self.buffer)
while not match:
self._fillBuffer()
match = renewline.match(self.buffer)
line = match.group(0)
self.buffer = renewline.sub('' ,self.buffer, 1)
if self._debugging > 1: print('*get*', repr(line))
octets = len(line)
if line[-2:] == CRLF:
return line[:-2], octets
if line[0] == CR:
return line[1:-1], octets
return line[:-1], octets
def _putline(self, line):
if self._debugging > 1: print('*put*', repr(line))
line += CRLF
bytes = len(line)
while bytes > 0:
sent = self.sslobj.write(line)
if sent == bytes:
break # avoid copy
line = line[sent:]
bytes = bytes - sent
def quit(self):
"""Signoff: commit changes on server, unlock mailbox, close connection."""
try:
resp = self._shortcmd('QUIT')
except error_proto as val:
resp = val
self.sock.close()
del self.sslobj, self.sock
return resp
def _create_socket(self, timeout):
sock = POP3._create_socket(self, timeout)
return ssl.wrap_socket(sock, self.keyfile, self.certfile)
__all__.append("POP3_SSL")

View File

@ -1,43 +1,284 @@
import socket
import threading
"""Test script for poplib module."""
# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
# a real test suite
import poplib
import threading
import asyncore
import asynchat
import socket
import os
import time
from unittest import TestCase
from test import support
from test import support as test_support
HOST = support.HOST
HOST = test_support.HOST
PORT = 0
def server(evt, serv):
serv.listen(5)
try:
conn, addr = serv.accept()
except socket.timeout:
pass
else:
conn.send(b"+ Hola mundo\n")
conn.close()
finally:
serv.close()
evt.set()
# the dummy data returned by server when LIST and RETR commands are issued
LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
RETR_RESP = b"""From: postmaster@python.org\
\r\nContent-Type: text/plain\r\n\
MIME-Version: 1.0\r\n\
Subject: Dummy\r\n\
\r\n\
line1\r\n\
line2\r\n\
line3\r\n\
.\r\n"""
class GeneralTests(TestCase):
class DummyPOP3Handler(asynchat.async_chat):
def __init__(self, conn):
asynchat.async_chat.__init__(self, conn)
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready.')
def collect_incoming_data(self, data):
self.in_buffer.append(data)
def found_terminator(self):
line = b''.join(self.in_buffer)
line = str(line, 'ISO-8859-1')
self.in_buffer = []
cmd = line.split(' ')[0].lower()
space = line.find(' ')
if space != -1:
arg = line[space + 1:]
else:
arg = ""
if hasattr(self, 'cmd_' + cmd):
method = getattr(self, 'cmd_' + cmd)
method(arg)
else:
self.push('-ERR unrecognized POP3 command "%s".' %cmd)
def handle_error(self):
raise
def push(self, data):
asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n')
def cmd_echo(self, arg):
# sends back the received string (used by the test suite)
self.push(arg)
def cmd_user(self, arg):
if arg != "guido":
self.push("-ERR no such user")
self.push('+OK password required')
def cmd_pass(self, arg):
if arg != "python":
self.push("-ERR wrong password")
self.push('+OK 10 messages')
def cmd_stat(self, arg):
self.push('+OK 10 100')
def cmd_list(self, arg):
if arg:
self.push('+OK %s %s' %(arg, arg))
else:
self.push('+OK')
asynchat.async_chat.push(self, LIST_RESP)
cmd_uidl = cmd_list
def cmd_retr(self, arg):
self.push('+OK %s bytes' %len(RETR_RESP))
asynchat.async_chat.push(self, RETR_RESP)
cmd_top = cmd_retr
def cmd_dele(self, arg):
self.push('+OK message marked for deletion.')
def cmd_noop(self, arg):
self.push('+OK done nothing.')
def cmd_rpop(self, arg):
self.push('+OK done nothing.')
class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
handler = DummyPOP3Handler
def __init__(self, address, af=socket.AF_INET):
threading.Thread.__init__(self)
asyncore.dispatcher.__init__(self)
self.create_socket(af, socket.SOCK_STREAM)
self.bind(address)
self.listen(5)
self.active = False
self.active_lock = threading.Lock()
self.host, self.port = self.socket.getsockname()[:2]
def start(self):
assert not self.active
self.__flag = threading.Event()
threading.Thread.start(self)
self.__flag.wait()
def run(self):
self.active = True
self.__flag.set()
while self.active and asyncore.socket_map:
self.active_lock.acquire()
asyncore.loop(timeout=0.1, count=1)
self.active_lock.release()
asyncore.close_all(ignore_all=True)
def stop(self):
assert self.active
self.active = False
self.join()
def handle_accept(self):
conn, addr = self.accept()
self.handler = self.handler(conn)
self.close()
def handle_connect(self):
self.close()
handle_read = handle_connect
def writable(self):
return 0
def handle_error(self):
raise
class TestPOP3Class(TestCase):
def assertOK(self, resp):
self.assertTrue(resp.startswith(b"+OK"))
def setUp(self):
self.server = DummyPOP3Server((HOST, PORT))
self.server.start()
self.client = poplib.POP3(self.server.host, self.server.port)
def tearDown(self):
self.client.quit()
self.server.stop()
def test_getwelcome(self):
self.assertEqual(self.client.getwelcome(), b'+OK dummy pop3 server ready.')
def test_exceptions(self):
self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
def test_user(self):
self.assertOK(self.client.user('guido'))
self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
def test_pass_(self):
self.assertOK(self.client.pass_('python'))
self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
def test_stat(self):
self.assertEqual(self.client.stat(), (10, 100))
def test_list(self):
self.assertEqual(self.client.list()[1:],
([b'1 1', b'2 2', b'3 3', b'4 4', b'5 5'],
25))
self.assertTrue(self.client.list('1').endswith(b"OK 1 1"))
def test_retr(self):
expected = (b'+OK 116 bytes',
[b'From: postmaster@python.org', b'Content-Type: text/plain',
b'MIME-Version: 1.0', b'Subject: Dummy',
b'', b'line1', b'line2', b'line3'],
113)
foo = self.client.retr('foo')
self.assertEqual(foo, expected)
def test_dele(self):
self.assertOK(self.client.dele('foo'))
def test_noop(self):
self.assertOK(self.client.noop())
def test_rpop(self):
self.assertOK(self.client.rpop('foo'))
def test_top(self):
expected = (b'+OK 116 bytes',
[b'From: postmaster@python.org', b'Content-Type: text/plain',
b'MIME-Version: 1.0', b'Subject: Dummy', b'',
b'line1', b'line2', b'line3'],
113)
self.assertEqual(self.client.top(1, 1), expected)
def test_uidl(self):
self.client.uidl()
self.client.uidl('foo')
SUPPORTS_SSL = False
if hasattr(poplib, 'POP3_SSL'):
import ssl
SUPPORTS_SSL = True
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
class DummyPOP3_SSLHandler(DummyPOP3Handler):
def __init__(self, conn):
asynchat.async_chat.__init__(self, conn)
ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
server_side=True)
self.del_channel()
self.set_socket(ssl_socket)
self.set_terminator(b"\r\n")
self.in_buffer = []
self.push('+OK dummy pop3 server ready.')
class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
def setUp(self):
self.server = DummyPOP3Server((HOST, PORT))
self.server.handler = DummyPOP3_SSLHandler
self.server.start()
self.client = poplib.POP3_SSL(self.server.host, self.server.port)
def test__all__(self):
self.assert_('POP3_SSL' in poplib.__all__)
class TestTimeouts(TestCase):
def setUp(self):
self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = support.bind_port(self.sock)
threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.port = test_support.bind_port(self.sock)
threading.Thread(target=self.server, args=(self.evt,self.sock)).start()
time.sleep(.1)
def tearDown(self):
self.evt.wait()
def testBasic(self):
# connects
pop = poplib.POP3(HOST, self.port)
pop.sock.close()
def server(self, evt, serv):
serv.listen(5)
try:
conn, addr = serv.accept()
except socket.timeout:
pass
else:
conn.send(b"+ Hola mundo\n")
conn.close()
finally:
serv.close()
evt.set()
def testTimeoutDefault(self):
self.assertTrue(socket.getdefaulttimeout() is None)
@ -65,8 +306,16 @@ class GeneralTests(TestCase):
pop.sock.close()
def test_main(verbose=None):
support.run_unittest(GeneralTests)
def test_main():
tests = [TestPOP3Class, TestTimeouts]
if SUPPORTS_SSL:
tests.append(TestPOP3_SSLClass)
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)
finally:
test_support.threading_cleanup(*thread_info)
if __name__ == '__main__':
test_main()

View File

@ -15,6 +15,8 @@ What's New in Python 3.0 beta 5
Core and Builtins
-----------------
- Issue #3727: Fixed poplib
- Issue #3714: Fixed nntplib by using bytes where appropriate.
- Issue #1210: Fixed imaplib and its documentation.