Refactor test_email/test_defect_handling.

This commit is contained in:
R David Murray 2012-05-28 20:14:10 -04:00
parent 3e0a1eb889
commit d41595b920
2 changed files with 184 additions and 201 deletions

View File

@ -1,84 +1,89 @@
import textwrap import textwrap
import unittest import unittest
from email._policybase import Compat32 import contextlib
from email import policy
from email import errors from email import errors
from test.test_email import TestEmailBase from test.test_email import TestEmailBase
class TestMessageDefectDetectionBase: class TestDefectsBase:
dup_boundary_msg = textwrap.dedent("""\ policy = policy.default
Subject: XX raise_expected = False
From: xx@xx.dk
To: XX
Mime-version: 1.0
Content-type: multipart/mixed;
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
--MS_Mac_OE_3071477847_720252_MIME_Part @contextlib.contextmanager
Content-type: multipart/alternative; def _raise_point(self, defect):
boundary="MS_Mac_OE_3071477847_720252_MIME_Part" yield
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: text/plain; charset="ISO-8859-1"
Content-transfer-encoding: quoted-printable
text
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: text/html; charset="ISO-8859-1"
Content-transfer-encoding: quoted-printable
<HTML></HTML>
--MS_Mac_OE_3071477847_720252_MIME_Part--
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: image/gif; name="xx.gif";
Content-disposition: attachment
Content-transfer-encoding: base64
Some removed base64 encoded chars.
--MS_Mac_OE_3071477847_720252_MIME_Part--
""")
def test_same_boundary_inner_outer(self): def test_same_boundary_inner_outer(self):
source = textwrap.dedent("""\
Subject: XX
From: xx@xx.dk
To: XX
Mime-version: 1.0
Content-type: multipart/mixed;
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: multipart/alternative;
boundary="MS_Mac_OE_3071477847_720252_MIME_Part"
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: text/plain; charset="ISO-8859-1"
Content-transfer-encoding: quoted-printable
text
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: text/html; charset="ISO-8859-1"
Content-transfer-encoding: quoted-printable
<HTML></HTML>
--MS_Mac_OE_3071477847_720252_MIME_Part--
--MS_Mac_OE_3071477847_720252_MIME_Part
Content-type: image/gif; name="xx.gif";
Content-disposition: attachment
Content-transfer-encoding: base64
Some removed base64 encoded chars.
--MS_Mac_OE_3071477847_720252_MIME_Part--
""")
# XXX better would be to actually detect the duplicate. # XXX better would be to actually detect the duplicate.
msg = self._str_msg(self.dup_boundary_msg) with self._raise_point(errors.StartBoundaryNotFoundDefect):
msg = self._str_msg(source)
if self.raise_expected: return
inner = msg.get_payload(0) inner = msg.get_payload(0)
self.assertTrue(hasattr(inner, 'defects')) self.assertTrue(hasattr(inner, 'defects'))
self.assertEqual(len(self.get_defects(inner)), 1) self.assertEqual(len(self.get_defects(inner)), 1)
self.assertTrue(isinstance(self.get_defects(inner)[0], self.assertTrue(isinstance(self.get_defects(inner)[0],
errors.StartBoundaryNotFoundDefect)) errors.StartBoundaryNotFoundDefect))
def test_same_boundary_inner_outer_raises_on_defect(self):
with self.assertRaises(errors.StartBoundaryNotFoundDefect):
self._str_msg(self.dup_boundary_msg,
policy=self.policy.clone(raise_on_defect=True))
no_boundary_msg = textwrap.dedent("""\
Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
From: foobar
Subject: broken mail
MIME-Version: 1.0
Content-Type: multipart/report; report-type=delivery-status;
--JAB03225.986577786/zinfandel.lacita.com
One part
--JAB03225.986577786/zinfandel.lacita.com
Content-Type: message/delivery-status
Header: Another part
--JAB03225.986577786/zinfandel.lacita.com--
""")
def test_multipart_no_boundary(self): def test_multipart_no_boundary(self):
msg = self._str_msg(self.no_boundary_msg) source = textwrap.dedent("""\
Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800)
From: foobar
Subject: broken mail
MIME-Version: 1.0
Content-Type: multipart/report; report-type=delivery-status;
--JAB03225.986577786/zinfandel.lacita.com
One part
--JAB03225.986577786/zinfandel.lacita.com
Content-Type: message/delivery-status
Header: Another part
--JAB03225.986577786/zinfandel.lacita.com--
""")
with self._raise_point(errors.NoBoundaryInMultipartDefect):
msg = self._str_msg(source)
if self.raise_expected: return
self.assertTrue(isinstance(msg.get_payload(), str)) self.assertTrue(isinstance(msg.get_payload(), str))
self.assertEqual(len(self.get_defects(msg)), 2) self.assertEqual(len(self.get_defects(msg)), 2)
self.assertTrue(isinstance(self.get_defects(msg)[0], self.assertTrue(isinstance(self.get_defects(msg)[0],
@ -86,11 +91,6 @@ class TestMessageDefectDetectionBase:
self.assertTrue(isinstance(self.get_defects(msg)[1], self.assertTrue(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect)) errors.MultipartInvariantViolationDefect))
def test_multipart_no_boundary_raise_on_defect(self):
with self.assertRaises(errors.NoBoundaryInMultipartDefect):
self._str_msg(self.no_boundary_msg,
policy=self.policy.clone(raise_on_defect=True))
multipart_msg = textwrap.dedent("""\ multipart_msg = textwrap.dedent("""\
Date: Wed, 14 Nov 2007 12:56:23 GMT Date: Wed, 14 Nov 2007 12:56:23 GMT
From: foo@bar.invalid From: foo@bar.invalid
@ -115,43 +115,42 @@ class TestMessageDefectDetectionBase:
""") """)
def test_multipart_invalid_cte(self): def test_multipart_invalid_cte(self):
msg = self._str_msg( with self._raise_point(
self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) errors.InvalidMultipartContentTransferEncodingDefect):
msg = self._str_msg(
self.multipart_msg.format(
"\nContent-Transfer-Encoding: base64"))
if self.raise_expected: return
self.assertEqual(len(self.get_defects(msg)), 1) self.assertEqual(len(self.get_defects(msg)), 1)
self.assertIsInstance(self.get_defects(msg)[0], self.assertIsInstance(self.get_defects(msg)[0],
errors.InvalidMultipartContentTransferEncodingDefect) errors.InvalidMultipartContentTransferEncodingDefect)
def test_multipart_invalid_cte_raise_on_defect(self):
with self.assertRaises(
errors.InvalidMultipartContentTransferEncodingDefect):
self._str_msg(
self.multipart_msg.format(
"\nContent-Transfer-Encoding: base64"),
policy=self.policy.clone(raise_on_defect=True))
def test_multipart_no_cte_no_defect(self): def test_multipart_no_cte_no_defect(self):
if self.raise_expected: return
msg = self._str_msg(self.multipart_msg.format('')) msg = self._str_msg(self.multipart_msg.format(''))
self.assertEqual(len(self.get_defects(msg)), 0) self.assertEqual(len(self.get_defects(msg)), 0)
def test_multipart_valid_cte_no_defect(self): def test_multipart_valid_cte_no_defect(self):
if self.raise_expected: return
for cte in ('7bit', '8bit', 'BINary'): for cte in ('7bit', '8bit', 'BINary'):
msg = self._str_msg( msg = self._str_msg(
self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte)) self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte))
self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte) self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte)
lying_multipart_msg = textwrap.dedent("""\
From: "Allison Dunlap" <xxx@example.com>
To: yyy@example.com
Subject: 64423
Date: Sun, 11 Jul 2004 16:09:27 -0300
MIME-Version: 1.0
Content-Type: multipart/alternative;
Blah blah blah
""")
def test_lying_multipart(self): def test_lying_multipart(self):
msg = self._str_msg(self.lying_multipart_msg) source = textwrap.dedent("""\
From: "Allison Dunlap" <xxx@example.com>
To: yyy@example.com
Subject: 64423
Date: Sun, 11 Jul 2004 16:09:27 -0300
MIME-Version: 1.0
Content-Type: multipart/alternative;
Blah blah blah
""")
with self._raise_point(errors.NoBoundaryInMultipartDefect):
msg = self._str_msg(source)
if self.raise_expected: return
self.assertTrue(hasattr(msg, 'defects')) self.assertTrue(hasattr(msg, 'defects'))
self.assertEqual(len(self.get_defects(msg)), 2) self.assertEqual(len(self.get_defects(msg)), 2)
self.assertTrue(isinstance(self.get_defects(msg)[0], self.assertTrue(isinstance(self.get_defects(msg)[0],
@ -159,34 +158,28 @@ class TestMessageDefectDetectionBase:
self.assertTrue(isinstance(self.get_defects(msg)[1], self.assertTrue(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect)) errors.MultipartInvariantViolationDefect))
def test_lying_multipart_raise_on_defect(self):
with self.assertRaises(errors.NoBoundaryInMultipartDefect):
self._str_msg(self.lying_multipart_msg,
policy=self.policy.clone(raise_on_defect=True))
missing_start_boundary_msg = textwrap.dedent("""\
Content-Type: multipart/mixed; boundary="AAA"
From: Mail Delivery Subsystem <xxx@example.com>
To: yyy@example.com
--AAA
Stuff
--AAA
Content-Type: message/rfc822
From: webmaster@python.org
To: zzz@example.com
Content-Type: multipart/mixed; boundary="BBB"
--BBB--
--AAA--
""")
def test_missing_start_boundary(self): def test_missing_start_boundary(self):
source = textwrap.dedent("""\
Content-Type: multipart/mixed; boundary="AAA"
From: Mail Delivery Subsystem <xxx@example.com>
To: yyy@example.com
--AAA
Stuff
--AAA
Content-Type: message/rfc822
From: webmaster@python.org
To: zzz@example.com
Content-Type: multipart/mixed; boundary="BBB"
--BBB--
--AAA--
""")
# The message structure is: # The message structure is:
# #
# multipart/mixed # multipart/mixed
@ -195,19 +188,18 @@ class TestMessageDefectDetectionBase:
# multipart/mixed [*] # multipart/mixed [*]
# #
# [*] This message is missing its start boundary # [*] This message is missing its start boundary
outer = self._str_msg(self.missing_start_boundary_msg) with self._raise_point(errors.StartBoundaryNotFoundDefect):
outer = self._str_msg(source)
if self.raise_expected: return
bad = outer.get_payload(1).get_payload(0) bad = outer.get_payload(1).get_payload(0)
self.assertEqual(len(self.get_defects(bad)), 1) self.assertEqual(len(self.get_defects(bad)), 1)
self.assertTrue(isinstance(self.get_defects(bad)[0], self.assertTrue(isinstance(self.get_defects(bad)[0],
errors.StartBoundaryNotFoundDefect)) errors.StartBoundaryNotFoundDefect))
def test_missing_start_boundary_raise_on_defect(self):
with self.assertRaises(errors.StartBoundaryNotFoundDefect):
self._str_msg(self.missing_start_boundary_msg,
policy=self.policy.clone(raise_on_defect=True))
def test_first_line_is_continuation_header(self): def test_first_line_is_continuation_header(self):
msg = self._str_msg(' Line 1\nSubject: test\n\nbody') with self._raise_point(errors.FirstHeaderLineIsContinuationDefect):
msg = self._str_msg(' Line 1\nSubject: test\n\nbody')
if self.raise_expected: return
self.assertEqual(msg.keys(), ['Subject']) self.assertEqual(msg.keys(), ['Subject'])
self.assertEqual(msg.get_payload(), 'body') self.assertEqual(msg.get_payload(), 'body')
self.assertEqual(len(self.get_defects(msg)), 1) self.assertEqual(len(self.get_defects(msg)), 1)
@ -215,113 +207,92 @@ class TestMessageDefectDetectionBase:
[errors.FirstHeaderLineIsContinuationDefect]) [errors.FirstHeaderLineIsContinuationDefect])
self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n')
def test_first_line_is_continuation_header_raise_on_defect(self):
with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
self._str_msg(' Line 1\nSubject: test\n\nbody\n',
policy=self.policy.clone(raise_on_defect=True))
def test_missing_header_body_separator(self): def test_missing_header_body_separator(self):
# Our heuristic if we see a line that doesn't look like a header (no # Our heuristic if we see a line that doesn't look like a header (no
# leading whitespace but no ':') is to assume that the blank line that # leading whitespace but no ':') is to assume that the blank line that
# separates the header from the body is missing, and to stop parsing # separates the header from the body is missing, and to stop parsing
# headers and start parsing the body. # headers and start parsing the body.
msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') with self._raise_point(errors.MissingHeaderBodySeparatorDefect):
msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n')
if self.raise_expected: return
self.assertEqual(msg.keys(), ['Subject']) self.assertEqual(msg.keys(), ['Subject'])
self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n') self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n')
self.assertDefectsEqual(self.get_defects(msg), self.assertDefectsEqual(self.get_defects(msg),
[errors.MissingHeaderBodySeparatorDefect]) [errors.MissingHeaderBodySeparatorDefect])
def test_missing_header_body_separator_raise_on_defect(self):
with self.assertRaises(errors.MissingHeaderBodySeparatorDefect):
self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n',
policy=self.policy.clone(raise_on_defect=True))
badly_padded_base64_payload = textwrap.dedent("""\
Subject: test
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: base64
dmk
""")
def test_bad_padding_in_base64_payload(self): def test_bad_padding_in_base64_payload(self):
msg = self._str_msg(self.badly_padded_base64_payload) source = textwrap.dedent("""\
self.assertEqual(msg.get_payload(decode=True), b'vi') Subject: test
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: base64
dmk
""")
msg = self._str_msg(source)
with self._raise_point(errors.InvalidBase64PaddingDefect):
payload = msg.get_payload(decode=True)
if self.raise_expected: return
self.assertEqual(payload, b'vi')
self.assertDefectsEqual(self.get_defects(msg), self.assertDefectsEqual(self.get_defects(msg),
[errors.InvalidBase64PaddingDefect]) [errors.InvalidBase64PaddingDefect])
def test_bad_padding_in_base64_payload_raise_on_defect(self):
msg = self._str_msg(self.badly_padded_base64_payload,
policy=self.policy.clone(raise_on_defect=True))
with self.assertRaises(errors.InvalidBase64PaddingDefect):
msg.get_payload(decode=True)
invalid_chars_in_base64_payload = textwrap.dedent("""\
Subject: test
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: base64
dm\x01k===
""")
def test_invalid_chars_in_base64_payload(self): def test_invalid_chars_in_base64_payload(self):
msg = self._str_msg(self.invalid_chars_in_base64_payload) source = textwrap.dedent("""\
self.assertEqual(msg.get_payload(decode=True), b'vi') Subject: test
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: base64
dm\x01k===
""")
msg = self._str_msg(source)
with self._raise_point(errors.InvalidBase64CharactersDefect):
payload = msg.get_payload(decode=True)
if self.raise_expected: return
self.assertEqual(payload, b'vi')
self.assertDefectsEqual(self.get_defects(msg), self.assertDefectsEqual(self.get_defects(msg),
[errors.InvalidBase64CharactersDefect]) [errors.InvalidBase64CharactersDefect])
def test_invalid_chars_in_base64_payload_raise_on_defect(self):
msg = self._str_msg(self.invalid_chars_in_base64_payload,
policy=self.policy.clone(raise_on_defect=True))
with self.assertRaises(errors.InvalidBase64CharactersDefect):
msg.get_payload(decode=True)
missing_ending_boundary = textwrap.dedent("""\
To: 1@harrydomain4.com
Subject: Fwd: 1
MIME-Version: 1.0
Content-Type: multipart/alternative;
boundary="------------000101020201080900040301"
--------------000101020201080900040301
Content-Type: text/plain; charset=ISO-8859-1
Content-Transfer-Encoding: 7bit
Alternative 1
--------------000101020201080900040301
Content-Type: text/html; charset=ISO-8859-1
Content-Transfer-Encoding: 7bit
Alternative 2
""")
def test_missing_ending_boundary(self): def test_missing_ending_boundary(self):
msg = self._str_msg(self.missing_ending_boundary) source = textwrap.dedent("""\
To: 1@harrydomain4.com
Subject: Fwd: 1
MIME-Version: 1.0
Content-Type: multipart/alternative;
boundary="------------000101020201080900040301"
--------------000101020201080900040301
Content-Type: text/plain; charset=ISO-8859-1
Content-Transfer-Encoding: 7bit
Alternative 1
--------------000101020201080900040301
Content-Type: text/html; charset=ISO-8859-1
Content-Transfer-Encoding: 7bit
Alternative 2
""")
with self._raise_point(errors.CloseBoundaryNotFoundDefect):
msg = self._str_msg(source)
if self.raise_expected: return
self.assertEqual(len(msg.get_payload()), 2) self.assertEqual(len(msg.get_payload()), 2)
self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n') self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n')
self.assertDefectsEqual(self.get_defects(msg), self.assertDefectsEqual(self.get_defects(msg),
[errors.CloseBoundaryNotFoundDefect]) [errors.CloseBoundaryNotFoundDefect])
def test_missing_ending_boundary_raise_on_defect(self):
with self.assertRaises(errors.CloseBoundaryNotFoundDefect):
self._str_msg(self.missing_ending_boundary,
policy=self.policy.clone(raise_on_defect=True))
class TestDefectDetection(TestDefectsBase, TestEmailBase):
class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase):
def get_defects(self, obj): def get_defects(self, obj):
return obj.defects return obj.defects
class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, class TestDefectCapture(TestDefectsBase, TestEmailBase):
TestEmailBase):
class CapturePolicy(Compat32): class CapturePolicy(policy.EmailPolicy):
captured = None captured = None
def register_defect(self, obj, defect): def register_defect(self, obj, defect):
self.captured.append(defect) self.captured.append(defect)
@ -333,5 +304,17 @@ class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase,
return self.policy.captured return self.policy.captured
class TestDefectRaising(TestDefectsBase, TestEmailBase):
policy = TestDefectsBase.policy
policy = policy.clone(raise_on_defect=True)
raise_expected = True
@contextlib.contextmanager
def _raise_point(self, defect):
with self.assertRaises(defect):
yield
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -1973,7 +1973,7 @@ counter to RFC 2822, there's no separating newline here
[errors.FirstHeaderLineIsContinuationDefect]) [errors.FirstHeaderLineIsContinuationDefect])
eq(msg.defects[0].line, ' Line 1\n') eq(msg.defects[0].line, ' Line 1\n')
# test_parser.TestMessageDefectDetectionBase # test_defect_handling
def test_missing_header_body_separator(self): def test_missing_header_body_separator(self):
# Our heuristic if we see a line that doesn't look like a header (no # Our heuristic if we see a line that doesn't look like a header (no
# leading whitespace but no ':') is to assume that the blank line that # leading whitespace but no ':') is to assume that the blank line that