Improved test coverage.

This commit is contained in:
Vinay Sajip 2011-04-30 21:52:48 +01:00
parent de19e08de4
commit a463d25930
1 changed files with 105 additions and 4 deletions

View File

@ -25,8 +25,11 @@ import logging
import logging.handlers
import logging.config
import asynchat
import asyncore
import codecs
import datetime
import errno
import pickle
import io
import gc
@ -35,6 +38,7 @@ import os
import queue
import re
import select
import smtpd
import socket
from socketserver import ThreadingTCPServer, StreamRequestHandler
import struct
@ -547,9 +551,6 @@ class HandlerTest(BaseTest):
h.close()
except socket.error: # syslogd might not be available
pass
h = logging.handlers.SMTPHandler('localhost', 'me', 'you', 'Log')
self.assertEqual(h.toaddrs, ['you'])
h.close()
for method in ('GET', 'POST', 'PUT'):
if method == 'PUT':
self.assertRaises(ValueError, logging.handlers.HTTPHandler,
@ -595,6 +596,106 @@ class StreamHandlerTest(BaseTest):
logging.raiseExceptions = old_raise
sys.stderr = old_stderr
TEST_SMTP_PORT = 9025
class TestSMTPChannel(smtpd.SMTPChannel):
def __init__(self, server, conn, addr, sockmap):
asynchat.async_chat.__init__(self, conn, sockmap)
self.smtp_server = server
self.conn = conn
self.addr = addr
self.received_lines = []
self.smtp_state = self.COMMAND
self.seen_greeting = ''
self.mailfrom = None
self.rcpttos = []
self.received_data = ''
self.fqdn = socket.getfqdn()
self.num_bytes = 0
try:
self.peer = conn.getpeername()
except socket.error as err:
# a race condition may occur if the other end is closing
# before we can get the peername
self.close()
if err.args[0] != errno.ENOTCONN:
raise
return
self.push('220 %s %s' % (self.fqdn, smtpd.__version__))
self.set_terminator(b'\r\n')
class TestSMTPServer(smtpd.SMTPServer):
channel_class = TestSMTPChannel
def __init__(self, addr, handler, poll_interval, sockmap):
self._localaddr = addr
self._remoteaddr = None
self.sockmap = sockmap
asyncore.dispatcher.__init__(self, map=sockmap)
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
self.set_socket(sock, map=sockmap)
# try to re-use a server port if possible
self.set_reuse_addr()
self.bind(addr)
self.listen(5)
except:
self.close()
raise
self._handler = handler
self._thread = None
self.poll_interval = poll_interval
def handle_accepted(self, conn, addr):
print('Incoming connection from %s' % repr(addr), file=smtpd.DEBUGSTREAM)
channel = self.channel_class(self, conn, addr, self.sockmap)
def process_message(self, peer, mailfrom, rcpttos, data):
self._handler(peer, mailfrom, rcpttos, data)
def start(self):
self._thread = t = threading.Thread(target=self.serve_forever,
args=(self.poll_interval,))
t.setDaemon(True)
t.start()
def serve_forever(self, poll_interval):
asyncore.loop(poll_interval, map=self.sockmap)
def stop(self):
self.close()
self._thread.join()
self._thread = None
class SMTPHandlerTest(BaseTest):
def test_basic(self):
addr = ('localhost', TEST_SMTP_PORT)
h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log')
self.assertEqual(h.toaddrs, ['you'])
self.messages = []
sockmap = {}
server = TestSMTPServer(addr, self.process_message, 0.001, sockmap)
server.start()
r = logging.makeLogRecord({'msg': 'Hello'})
h.handle(r)
server.stop()
self.assertEqual(len(self.messages), 1)
peer, mailfrom, rcpttos, data = self.messages[0]
self.assertEqual(mailfrom, 'me')
self.assertEqual(rcpttos, ['you'])
self.assertTrue('\nSubject: Log\n' in data)
self.assertTrue(data.endswith('\n\nHello'))
h.close()
def process_message(self, *args):
self.messages.append(args)
class MemoryHandlerTest(BaseTest):
"""Tests for the MemoryHandler."""
@ -3142,7 +3243,7 @@ def test_main():
FormatterTest, BufferingFormatterTest, StreamHandlerTest,
LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
LoggerAdapterTest, LoggerTest,
LoggerAdapterTest, LoggerTest, SMTPHandlerTest,
FileHandlerTest, RotatingFileHandlerTest,
LastResortTest, LogRecordTest, ExceptionTest,
TimedRotatingFileHandlerTest