cpython/Lib/test/test_smtplib.py

183 lines
4.8 KiB
Python
Raw Normal View History

import asyncore
import socket
import threading
import smtpd
import smtplib
import StringIO
import sys
import time
import select
from unittest import TestCase
from test import test_support
HOST = "localhost"
PORT = 54328
def server(evt, buf):
serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv.settimeout(3)
serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv.bind(("", PORT))
serv.listen(5)
try:
conn, addr = serv.accept()
except socket.timeout:
pass
else:
n = 200
while buf and n > 0:
r, w, e = select.select([], [conn], [])
if w:
sent = conn.send(buf)
buf = buf[sent:]
n -= 1
time.sleep(0.01)
conn.close()
finally:
serv.close()
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
servargs = (self.evt, "220 Hola mundo\n")
threading.Thread(target=server, args=servargs).start()
time.sleep(.1)
def tearDown(self):
self.evt.wait()
def testBasic1(self):
# connects
smtp = smtplib.SMTP(HOST, PORT)
smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testNonnumericPort(self):
# check that non-numeric port raises ValueError
self.assertRaises(socket.error, smtplib.SMTP, "localhost", "bogus")
def testTimeoutDefault(self):
# default
smtp = smtplib.SMTP(HOST, PORT)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
smtp = smtplib.SMTP(HOST, PORT, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
def testTimeoutNone(self):
# None, having other default
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
smtp = smtplib.SMTP(HOST, PORT, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
# Test server using smtpd.DebuggingServer
def debugging_server(evt):
serv = smtpd.DebuggingServer(("", PORT), ('nowhere', -1))
try:
asyncore.loop(timeout=.01, count=300)
except socket.timeout:
pass
finally:
# allow some time for the client to read the result
time.sleep(0.5)
asyncore.close_all()
evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
MSG_END = '------------ END MESSAGE ------------\n'
# Test behavior of smtpd.DebuggingServer
class DebuggingServerTests(TestCase):
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
threading.Thread(target=debugging_server, args=(self.evt,)).start()
time.sleep(.5)
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testBasic(self):
# connect
smtp = smtplib.SMTP(HOST, PORT)
smtp.sock.close()
def testEHLO(self):
smtp = smtplib.SMTP(HOST, PORT)
self.assertEqual(smtp.ehlo(), (502, 'Error: command "EHLO" not implemented'))
smtp.sock.close()
def testHELP(self):
smtp = smtplib.SMTP(HOST, PORT)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.sock.close()
def testSend(self):
# connect and send mail
m = 'A test message'
smtp = smtplib.SMTP(HOST, PORT)
smtp.sendmail('John', 'Sally', m)
smtp.sock.close()
self.evt.wait()
self.output.flush()
mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
self.assertEqual(self.output.getvalue(), mexpect)
class BadHELOServerTests(TestCase):
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
servargs = (self.evt, "199 no hello for you!\n")
threading.Thread(target=server, args=servargs).start()
time.sleep(.5)
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, HOST, PORT)
def test_main(verbose=None):
test_support.run_unittest(GeneralTests, DebuggingServerTests, BadHELOServerTests)
if __name__ == '__main__':
test_main()