From d41595b920d098e8f05246cef0af6175d6c045be Mon Sep 17 00:00:00 2001 From: R David Murray Date: Mon, 28 May 2012 20:14:10 -0400 Subject: [PATCH] Refactor test_email/test_defect_handling. --- Lib/test/test_email/test_defect_handling.py | 383 ++++++++++---------- Lib/test/test_email/test_email.py | 2 +- 2 files changed, 184 insertions(+), 201 deletions(-) diff --git a/Lib/test/test_email/test_defect_handling.py b/Lib/test/test_email/test_defect_handling.py index fe725998a54..305432df37a 100644 --- a/Lib/test/test_email/test_defect_handling.py +++ b/Lib/test/test_email/test_defect_handling.py @@ -1,84 +1,89 @@ import textwrap import unittest -from email._policybase import Compat32 +import contextlib +from email import policy from email import errors from test.test_email import TestEmailBase -class TestMessageDefectDetectionBase: +class TestDefectsBase: - dup_boundary_msg = textwrap.dedent("""\ - Subject: XX - From: xx@xx.dk - To: XX - Mime-version: 1.0 - Content-type: multipart/mixed; - boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + policy = policy.default + raise_expected = False - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: multipart/alternative; - boundary="MS_Mac_OE_3071477847_720252_MIME_Part" - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: text/plain; charset="ISO-8859-1" - Content-transfer-encoding: quoted-printable - - text - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: text/html; charset="ISO-8859-1" - Content-transfer-encoding: quoted-printable - - - - --MS_Mac_OE_3071477847_720252_MIME_Part-- - - --MS_Mac_OE_3071477847_720252_MIME_Part - Content-type: image/gif; name="xx.gif"; - Content-disposition: attachment - Content-transfer-encoding: base64 - - Some removed base64 encoded chars. - - --MS_Mac_OE_3071477847_720252_MIME_Part-- - - """) + @contextlib.contextmanager + def _raise_point(self, defect): + yield def test_same_boundary_inner_outer(self): + source = textwrap.dedent("""\ + Subject: XX + From: xx@xx.dk + To: XX + Mime-version: 1.0 + Content-type: multipart/mixed; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: multipart/alternative; + boundary="MS_Mac_OE_3071477847_720252_MIME_Part" + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/plain; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + text + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: text/html; charset="ISO-8859-1" + Content-transfer-encoding: quoted-printable + + + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + --MS_Mac_OE_3071477847_720252_MIME_Part + Content-type: image/gif; name="xx.gif"; + Content-disposition: attachment + Content-transfer-encoding: base64 + + Some removed base64 encoded chars. + + --MS_Mac_OE_3071477847_720252_MIME_Part-- + + """) # XXX better would be to actually detect the duplicate. - msg = self._str_msg(self.dup_boundary_msg) + with self._raise_point(errors.StartBoundaryNotFoundDefect): + msg = self._str_msg(source) + if self.raise_expected: return inner = msg.get_payload(0) self.assertTrue(hasattr(inner, 'defects')) self.assertEqual(len(self.get_defects(inner)), 1) self.assertTrue(isinstance(self.get_defects(inner)[0], errors.StartBoundaryNotFoundDefect)) - def test_same_boundary_inner_outer_raises_on_defect(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._str_msg(self.dup_boundary_msg, - policy=self.policy.clone(raise_on_defect=True)) - - no_boundary_msg = textwrap.dedent("""\ - Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800) - From: foobar - Subject: broken mail - MIME-Version: 1.0 - Content-Type: multipart/report; report-type=delivery-status; - - --JAB03225.986577786/zinfandel.lacita.com - - One part - - --JAB03225.986577786/zinfandel.lacita.com - Content-Type: message/delivery-status - - Header: Another part - - --JAB03225.986577786/zinfandel.lacita.com-- - """) - def test_multipart_no_boundary(self): - msg = self._str_msg(self.no_boundary_msg) + source = textwrap.dedent("""\ + Date: Fri, 6 Apr 2001 09:23:06 -0800 (GMT-0800) + From: foobar + Subject: broken mail + MIME-Version: 1.0 + Content-Type: multipart/report; report-type=delivery-status; + + --JAB03225.986577786/zinfandel.lacita.com + + One part + + --JAB03225.986577786/zinfandel.lacita.com + Content-Type: message/delivery-status + + Header: Another part + + --JAB03225.986577786/zinfandel.lacita.com-- + """) + with self._raise_point(errors.NoBoundaryInMultipartDefect): + msg = self._str_msg(source) + if self.raise_expected: return self.assertTrue(isinstance(msg.get_payload(), str)) self.assertEqual(len(self.get_defects(msg)), 2) self.assertTrue(isinstance(self.get_defects(msg)[0], @@ -86,11 +91,6 @@ class TestMessageDefectDetectionBase: self.assertTrue(isinstance(self.get_defects(msg)[1], errors.MultipartInvariantViolationDefect)) - def test_multipart_no_boundary_raise_on_defect(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._str_msg(self.no_boundary_msg, - policy=self.policy.clone(raise_on_defect=True)) - multipart_msg = textwrap.dedent("""\ Date: Wed, 14 Nov 2007 12:56:23 GMT From: foo@bar.invalid @@ -115,43 +115,42 @@ class TestMessageDefectDetectionBase: """) def test_multipart_invalid_cte(self): - msg = self._str_msg( - self.multipart_msg.format("\nContent-Transfer-Encoding: base64")) + with self._raise_point( + errors.InvalidMultipartContentTransferEncodingDefect): + msg = self._str_msg( + self.multipart_msg.format( + "\nContent-Transfer-Encoding: base64")) + if self.raise_expected: return self.assertEqual(len(self.get_defects(msg)), 1) self.assertIsInstance(self.get_defects(msg)[0], errors.InvalidMultipartContentTransferEncodingDefect) - def test_multipart_invalid_cte_raise_on_defect(self): - with self.assertRaises( - errors.InvalidMultipartContentTransferEncodingDefect): - self._str_msg( - self.multipart_msg.format( - "\nContent-Transfer-Encoding: base64"), - policy=self.policy.clone(raise_on_defect=True)) - def test_multipart_no_cte_no_defect(self): + if self.raise_expected: return msg = self._str_msg(self.multipart_msg.format('')) self.assertEqual(len(self.get_defects(msg)), 0) def test_multipart_valid_cte_no_defect(self): + if self.raise_expected: return for cte in ('7bit', '8bit', 'BINary'): msg = self._str_msg( self.multipart_msg.format("\nContent-Transfer-Encoding: "+cte)) self.assertEqual(len(self.get_defects(msg)), 0, "cte="+cte) - lying_multipart_msg = textwrap.dedent("""\ - From: "Allison Dunlap" - To: yyy@example.com - Subject: 64423 - Date: Sun, 11 Jul 2004 16:09:27 -0300 - MIME-Version: 1.0 - Content-Type: multipart/alternative; - - Blah blah blah - """) - def test_lying_multipart(self): - msg = self._str_msg(self.lying_multipart_msg) + source = textwrap.dedent("""\ + From: "Allison Dunlap" + To: yyy@example.com + Subject: 64423 + Date: Sun, 11 Jul 2004 16:09:27 -0300 + MIME-Version: 1.0 + Content-Type: multipart/alternative; + + Blah blah blah + """) + with self._raise_point(errors.NoBoundaryInMultipartDefect): + msg = self._str_msg(source) + if self.raise_expected: return self.assertTrue(hasattr(msg, 'defects')) self.assertEqual(len(self.get_defects(msg)), 2) self.assertTrue(isinstance(self.get_defects(msg)[0], @@ -159,34 +158,28 @@ class TestMessageDefectDetectionBase: self.assertTrue(isinstance(self.get_defects(msg)[1], errors.MultipartInvariantViolationDefect)) - def test_lying_multipart_raise_on_defect(self): - with self.assertRaises(errors.NoBoundaryInMultipartDefect): - self._str_msg(self.lying_multipart_msg, - policy=self.policy.clone(raise_on_defect=True)) - - missing_start_boundary_msg = textwrap.dedent("""\ - Content-Type: multipart/mixed; boundary="AAA" - From: Mail Delivery Subsystem - To: yyy@example.com - - --AAA - - Stuff - - --AAA - Content-Type: message/rfc822 - - From: webmaster@python.org - To: zzz@example.com - Content-Type: multipart/mixed; boundary="BBB" - - --BBB-- - - --AAA-- - - """) - def test_missing_start_boundary(self): + source = textwrap.dedent("""\ + Content-Type: multipart/mixed; boundary="AAA" + From: Mail Delivery Subsystem + To: yyy@example.com + + --AAA + + Stuff + + --AAA + Content-Type: message/rfc822 + + From: webmaster@python.org + To: zzz@example.com + Content-Type: multipart/mixed; boundary="BBB" + + --BBB-- + + --AAA-- + + """) # The message structure is: # # multipart/mixed @@ -195,19 +188,18 @@ class TestMessageDefectDetectionBase: # multipart/mixed [*] # # [*] This message is missing its start boundary - outer = self._str_msg(self.missing_start_boundary_msg) + with self._raise_point(errors.StartBoundaryNotFoundDefect): + outer = self._str_msg(source) + if self.raise_expected: return bad = outer.get_payload(1).get_payload(0) self.assertEqual(len(self.get_defects(bad)), 1) self.assertTrue(isinstance(self.get_defects(bad)[0], errors.StartBoundaryNotFoundDefect)) - def test_missing_start_boundary_raise_on_defect(self): - with self.assertRaises(errors.StartBoundaryNotFoundDefect): - self._str_msg(self.missing_start_boundary_msg, - policy=self.policy.clone(raise_on_defect=True)) - def test_first_line_is_continuation_header(self): - msg = self._str_msg(' Line 1\nSubject: test\n\nbody') + with self._raise_point(errors.FirstHeaderLineIsContinuationDefect): + msg = self._str_msg(' Line 1\nSubject: test\n\nbody') + if self.raise_expected: return self.assertEqual(msg.keys(), ['Subject']) self.assertEqual(msg.get_payload(), 'body') self.assertEqual(len(self.get_defects(msg)), 1) @@ -215,113 +207,92 @@ class TestMessageDefectDetectionBase: [errors.FirstHeaderLineIsContinuationDefect]) self.assertEqual(self.get_defects(msg)[0].line, ' Line 1\n') - def test_first_line_is_continuation_header_raise_on_defect(self): - with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect): - self._str_msg(' Line 1\nSubject: test\n\nbody\n', - policy=self.policy.clone(raise_on_defect=True)) - def test_missing_header_body_separator(self): # Our heuristic if we see a line that doesn't look like a header (no # leading whitespace but no ':') is to assume that the blank line that # separates the header from the body is missing, and to stop parsing # headers and start parsing the body. - msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') + with self._raise_point(errors.MissingHeaderBodySeparatorDefect): + msg = self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n') + if self.raise_expected: return self.assertEqual(msg.keys(), ['Subject']) self.assertEqual(msg.get_payload(), 'not a header\nTo: abc\n\nb\n') self.assertDefectsEqual(self.get_defects(msg), [errors.MissingHeaderBodySeparatorDefect]) - def test_missing_header_body_separator_raise_on_defect(self): - with self.assertRaises(errors.MissingHeaderBodySeparatorDefect): - self._str_msg('Subject: test\nnot a header\nTo: abc\n\nb\n', - policy=self.policy.clone(raise_on_defect=True)) - - badly_padded_base64_payload = textwrap.dedent("""\ - Subject: test - MIME-Version: 1.0 - Content-Type: text/plain; charset="utf-8" - Content-Transfer-Encoding: base64 - - dmk - """) - def test_bad_padding_in_base64_payload(self): - msg = self._str_msg(self.badly_padded_base64_payload) - self.assertEqual(msg.get_payload(decode=True), b'vi') + source = textwrap.dedent("""\ + Subject: test + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + dmk + """) + msg = self._str_msg(source) + with self._raise_point(errors.InvalidBase64PaddingDefect): + payload = msg.get_payload(decode=True) + if self.raise_expected: return + self.assertEqual(payload, b'vi') self.assertDefectsEqual(self.get_defects(msg), [errors.InvalidBase64PaddingDefect]) - def test_bad_padding_in_base64_payload_raise_on_defect(self): - msg = self._str_msg(self.badly_padded_base64_payload, - policy=self.policy.clone(raise_on_defect=True)) - with self.assertRaises(errors.InvalidBase64PaddingDefect): - msg.get_payload(decode=True) - - invalid_chars_in_base64_payload = textwrap.dedent("""\ - Subject: test - MIME-Version: 1.0 - Content-Type: text/plain; charset="utf-8" - Content-Transfer-Encoding: base64 - - dm\x01k=== - """) - def test_invalid_chars_in_base64_payload(self): - msg = self._str_msg(self.invalid_chars_in_base64_payload) - self.assertEqual(msg.get_payload(decode=True), b'vi') + source = textwrap.dedent("""\ + Subject: test + MIME-Version: 1.0 + Content-Type: text/plain; charset="utf-8" + Content-Transfer-Encoding: base64 + + dm\x01k=== + """) + msg = self._str_msg(source) + with self._raise_point(errors.InvalidBase64CharactersDefect): + payload = msg.get_payload(decode=True) + if self.raise_expected: return + self.assertEqual(payload, b'vi') self.assertDefectsEqual(self.get_defects(msg), [errors.InvalidBase64CharactersDefect]) - def test_invalid_chars_in_base64_payload_raise_on_defect(self): - msg = self._str_msg(self.invalid_chars_in_base64_payload, - policy=self.policy.clone(raise_on_defect=True)) - with self.assertRaises(errors.InvalidBase64CharactersDefect): - msg.get_payload(decode=True) - - missing_ending_boundary = textwrap.dedent("""\ - To: 1@harrydomain4.com - Subject: Fwd: 1 - MIME-Version: 1.0 - Content-Type: multipart/alternative; - boundary="------------000101020201080900040301" - - --------------000101020201080900040301 - Content-Type: text/plain; charset=ISO-8859-1 - Content-Transfer-Encoding: 7bit - - Alternative 1 - - --------------000101020201080900040301 - Content-Type: text/html; charset=ISO-8859-1 - Content-Transfer-Encoding: 7bit - - Alternative 2 - - """) - def test_missing_ending_boundary(self): - msg = self._str_msg(self.missing_ending_boundary) + source = textwrap.dedent("""\ + To: 1@harrydomain4.com + Subject: Fwd: 1 + MIME-Version: 1.0 + Content-Type: multipart/alternative; + boundary="------------000101020201080900040301" + + --------------000101020201080900040301 + Content-Type: text/plain; charset=ISO-8859-1 + Content-Transfer-Encoding: 7bit + + Alternative 1 + + --------------000101020201080900040301 + Content-Type: text/html; charset=ISO-8859-1 + Content-Transfer-Encoding: 7bit + + Alternative 2 + + """) + with self._raise_point(errors.CloseBoundaryNotFoundDefect): + msg = self._str_msg(source) + if self.raise_expected: return self.assertEqual(len(msg.get_payload()), 2) self.assertEqual(msg.get_payload(1).get_payload(), 'Alternative 2\n') self.assertDefectsEqual(self.get_defects(msg), [errors.CloseBoundaryNotFoundDefect]) - def test_missing_ending_boundary_raise_on_defect(self): - with self.assertRaises(errors.CloseBoundaryNotFoundDefect): - self._str_msg(self.missing_ending_boundary, - policy=self.policy.clone(raise_on_defect=True)) - -class TestMessageDefectDetection(TestMessageDefectDetectionBase, TestEmailBase): +class TestDefectDetection(TestDefectsBase, TestEmailBase): def get_defects(self, obj): return obj.defects -class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, - TestEmailBase): +class TestDefectCapture(TestDefectsBase, TestEmailBase): - class CapturePolicy(Compat32): + class CapturePolicy(policy.EmailPolicy): captured = None def register_defect(self, obj, defect): self.captured.append(defect) @@ -333,5 +304,17 @@ class TestMessageDefectDetectionCapture(TestMessageDefectDetectionBase, return self.policy.captured +class TestDefectRaising(TestDefectsBase, TestEmailBase): + + policy = TestDefectsBase.policy + policy = policy.clone(raise_on_defect=True) + raise_expected = True + + @contextlib.contextmanager + def _raise_point(self, defect): + with self.assertRaises(defect): + yield + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_email/test_email.py b/Lib/test/test_email/test_email.py index 5131b65f701..3dda9213104 100644 --- a/Lib/test/test_email/test_email.py +++ b/Lib/test/test_email/test_email.py @@ -1973,7 +1973,7 @@ counter to RFC 2822, there's no separating newline here [errors.FirstHeaderLineIsContinuationDefect]) eq(msg.defects[0].line, ' Line 1\n') - # test_parser.TestMessageDefectDetectionBase + # test_defect_handling def test_missing_header_body_separator(self): # Our heuristic if we see a line that doesn't look like a header (no # leading whitespace but no ':') is to assume that the blank line that