#10686: recode non-ASCII headers to 'unknown-8bit' instead of ?s.

This applies only when generating strings from non-RFC compliant binary
input; it makes the existing recoding behavior more consistent (ie:
now no data is lost when recoding).
This commit is contained in:
R. David Murray 2011-01-07 23:25:30 +00:00
parent 6f0022d84a
commit 9253214fd9
9 changed files with 109 additions and 62 deletions

View File

@ -79,8 +79,8 @@ Here are the public methods of the :class:`Generator` class, imported from the
Messages parsed with a Bytes parser that have a Messages parsed with a Bytes parser that have a
:mailheader:`Content-Transfer-Encoding` of 8bit will be converted to a :mailheader:`Content-Transfer-Encoding` of 8bit will be converted to a
use a 7bit Content-Transfer-Encoding. Any other non-ASCII bytes in the use a 7bit Content-Transfer-Encoding. Non-ASCII bytes in the headers
message structure will be converted to '?' characters. will be :rfc:`2047` encoded with a charset of `unknown-8bit`.
.. versionchanged:: 3.2 .. versionchanged:: 3.2
Added support for re-encoding 8bit message bodies, and the *linesep* Added support for re-encoding 8bit message bodies, and the *linesep*

View File

@ -130,8 +130,14 @@ Here is the :class:`Header` class description:
.. method:: __str__() .. method:: __str__()
A helper for :class:`str`'s :func:`encode` method. Returns the header as Returns an approximation of the :class:`Header` as a string, using an
a Unicode string. unlimited line length. All pieces are converted to unicode using the
specified encoding and joined together appropriately. Any pieces with a
charset of ``unknown-8bit`` are decoded as ASCII using the ``replace``
error handler.
.. versionchanged:: 3.2
Added handling for the ``unknown-8bit`` charset.
.. method:: __eq__(other) .. method:: __eq__(other)

View File

@ -169,9 +169,10 @@ Here are the methods of the :class:`Message` class:
Note that in all cases, any envelope header present in the message is not Note that in all cases, any envelope header present in the message is not
included in the mapping interface. included in the mapping interface.
In a model generated from bytes, any header values that (in contravention In a model generated from bytes, any header values that (in contravention of
of the RFCs) contain non-ASCII bytes will have those bytes transformed the RFCs) contain non-ASCII bytes will, when retrieved through this
into '?' characters when the values are retrieved through this interface. interface, be represented as :class:`~email.header.Header` objects with
a charset of ``unknown-8bit``.
.. method:: __len__() .. method:: __len__()

View File

@ -618,6 +618,8 @@ format.
* Given bytes input to the model, :class:`~email.generator.Generator` will * Given bytes input to the model, :class:`~email.generator.Generator` will
convert message bodies that have a :mailheader:`Content-Transfer-Encoding` of convert message bodies that have a :mailheader:`Content-Transfer-Encoding` of
*8bit* to instead have a *7bit* :mailheader:`Content-Transfer-Encoding`. *8bit* to instead have a *7bit* :mailheader:`Content-Transfer-Encoding`.
XXX: Headers with unencoded non-ASCII bytes will be :rfc:`2047`\ -encoded
using the charset ``unknown-8bit``.
* A new class :class:`~email.generator.BytesGenerator` produces bytes as output, * A new class :class:`~email.generator.BytesGenerator` produces bytes as output,
preserving any unchanged non-ASCII data that was present in the input used to preserving any unchanged non-ASCII data that was present in the input used to

View File

@ -28,6 +28,7 @@ SHORTEST = 3 # the shorter of QP and base64, but only for headers
RFC2047_CHROME_LEN = 7 RFC2047_CHROME_LEN = 7
DEFAULT_CHARSET = 'us-ascii' DEFAULT_CHARSET = 'us-ascii'
UNKNOWN8BIT = 'unknown-8bit'
EMPTYSTRING = '' EMPTYSTRING = ''
@ -152,6 +153,16 @@ def add_codec(charset, codecname):
CODEC_MAP[charset] = codecname CODEC_MAP[charset] = codecname
def _encode(string, codec):
    """Encode *string* to bytes, special-casing the unknown-8bit charset.

    A string tagged as unknown-8bit may carry surrogate-escaped bytes, so
    it is encoded with the ASCII codec and the ``surrogateescape`` error
    handler instead of being passed to a codec named after the charset.
    Every other codec name is handed to ``str.encode`` unchanged.
    """
    if codec != UNKNOWN8BIT:
        return string.encode(codec)
    return string.encode('ascii', 'surrogateescape')
class Charset: class Charset:
"""Map character sets to their email properties. """Map character sets to their email properties.
@ -282,8 +293,7 @@ class Charset:
:return: The encoded string, with RFC 2047 chrome. :return: The encoded string, with RFC 2047 chrome.
""" """
codec = self.output_codec or 'us-ascii' codec = self.output_codec or 'us-ascii'
charset = self.get_output_charset() header_bytes = _encode(string, codec)
header_bytes = string.encode(codec)
# 7bit/8bit encodings return the string unchanged (modulo conversions) # 7bit/8bit encodings return the string unchanged (modulo conversions)
encoder_module = self._get_encoder(header_bytes) encoder_module = self._get_encoder(header_bytes)
if encoder_module is None: if encoder_module is None:
@ -309,7 +319,7 @@ class Charset:
""" """
# See which encoding we should use. # See which encoding we should use.
codec = self.output_codec or 'us-ascii' codec = self.output_codec or 'us-ascii'
header_bytes = string.encode(codec) header_bytes = _encode(string, codec)
encoder_module = self._get_encoder(header_bytes) encoder_module = self._get_encoder(header_bytes)
encoder = partial(encoder_module.header_encode, charset=str(self)) encoder = partial(encoder_module.header_encode, charset=str(self))
# Calculate the number of characters that the RFC 2047 chrome will # Calculate the number of characters that the RFC 2047 chrome will
@ -333,7 +343,7 @@ class Charset:
for character in string: for character in string:
current_line.append(character) current_line.append(character)
this_line = EMPTYSTRING.join(current_line) this_line = EMPTYSTRING.join(current_line)
length = encoder_module.header_length(this_line.encode(charset)) length = encoder_module.header_length(_encode(this_line, charset))
if length > maxlen: if length > maxlen:
# This last character doesn't fit so pop it off. # This last character doesn't fit so pop it off.
current_line.pop() current_line.pop()
@ -343,12 +353,12 @@ class Charset:
else: else:
separator = (' ' if lines else '') separator = (' ' if lines else '')
joined_line = EMPTYSTRING.join(current_line) joined_line = EMPTYSTRING.join(current_line)
header_bytes = joined_line.encode(codec) header_bytes = _encode(joined_line, codec)
lines.append(encoder(header_bytes)) lines.append(encoder(header_bytes))
current_line = [character] current_line = [character]
maxlen = next(maxlengths) - extra maxlen = next(maxlengths) - extra
joined_line = EMPTYSTRING.join(current_line) joined_line = EMPTYSTRING.join(current_line)
header_bytes = joined_line.encode(codec) header_bytes = _encode(joined_line, codec)
lines.append(encoder(header_bytes)) lines.append(encoder(header_bytes))
return lines return lines

View File

@ -17,7 +17,8 @@ import email.quoprimime
import email.base64mime import email.base64mime
from email.errors import HeaderParseError from email.errors import HeaderParseError
from email.charset import Charset from email import charset as _charset
Charset = _charset.Charset
NL = '\n' NL = '\n'
SPACE = ' ' SPACE = ' '
@ -210,6 +211,9 @@ class Header:
# from a charset to None/us-ascii, or from None/us-ascii to a # from a charset to None/us-ascii, or from None/us-ascii to a
# charset. Only do this for the second and subsequent chunks. # charset. Only do this for the second and subsequent chunks.
nextcs = charset nextcs = charset
if nextcs == _charset.UNKNOWN8BIT:
original_bytes = string.encode('ascii', 'surrogateescape')
string = original_bytes.decode('ascii', 'replace')
if uchunks: if uchunks:
if lastcs not in (None, 'us-ascii'): if lastcs not in (None, 'us-ascii'):
if nextcs in (None, 'us-ascii'): if nextcs in (None, 'us-ascii'):
@ -263,7 +267,8 @@ class Header:
# Ensure that the bytes we're storing can be decoded to the output # Ensure that the bytes we're storing can be decoded to the output
# character set, otherwise an early error is thrown. # character set, otherwise an early error is thrown.
output_charset = charset.output_codec or 'us-ascii' output_charset = charset.output_codec or 'us-ascii'
s.encode(output_charset, errors) if output_charset != _charset.UNKNOWN8BIT:
s.encode(output_charset, errors)
self._chunks.append((s, charset)) self._chunks.append((s, charset))
def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'): def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):

View File

@ -16,7 +16,9 @@ from io import BytesIO, StringIO
# Intrapackage imports # Intrapackage imports
from email import utils from email import utils
from email import errors from email import errors
from email.charset import Charset from email import header
from email import charset as _charset
Charset = _charset.Charset
SEMISPACE = '; ' SEMISPACE = '; '
@ -31,16 +33,15 @@ _has_surrogates = re.compile(
# Helper functions # Helper functions
def _sanitize_surrogates(value): def _sanitize_header(name, value):
# If the value contains surrogates, re-decode and replace the original # If the header value contains surrogates, return a Header using
# non-ascii bytes with '?'s. Used to sanitize header values before letting # the unknown-8bit charset to encode the bytes as encoded words.
# them escape as strings.
if not isinstance(value, str): if not isinstance(value, str):
# Header object # Assume it is already a header object
return value return value
if _has_surrogates(value): if _has_surrogates(value):
original_bytes = value.encode('ascii', 'surrogateescape') return header.Header(value, charset=_charset.UNKNOWN8BIT,
return original_bytes.decode('ascii', 'replace').replace('\ufffd', '?') header_name=name)
else: else:
return value return value
@ -398,7 +399,7 @@ class Message:
Any fields deleted and re-inserted are always appended to the header Any fields deleted and re-inserted are always appended to the header
list. list.
""" """
return [_sanitize_surrogates(v) for k, v in self._headers] return [_sanitize_header(k, v) for k, v in self._headers]
def items(self): def items(self):
"""Get all the message's header fields and values. """Get all the message's header fields and values.
@ -408,7 +409,7 @@ class Message:
Any fields deleted and re-inserted are always appended to the header Any fields deleted and re-inserted are always appended to the header
list. list.
""" """
return [(k, _sanitize_surrogates(v)) for k, v in self._headers] return [(k, _sanitize_header(k, v)) for k, v in self._headers]
def get(self, name, failobj=None): def get(self, name, failobj=None):
"""Get a header value. """Get a header value.
@ -419,7 +420,7 @@ class Message:
name = name.lower() name = name.lower()
for k, v in self._headers: for k, v in self._headers:
if k.lower() == name: if k.lower() == name:
return _sanitize_surrogates(v) return _sanitize_header(k, v)
return failobj return failobj
# #
@ -439,7 +440,7 @@ class Message:
name = name.lower() name = name.lower()
for k, v in self._headers: for k, v in self._headers:
if k.lower() == name: if k.lower() == name:
values.append(_sanitize_surrogates(v)) values.append(_sanitize_header(k, v))
if not values: if not values:
return failobj return failobj
return values return values

View File

@ -2841,7 +2841,7 @@ class Test8BitBytesHandling(unittest.TestCase):
cte='8bit', cte='8bit',
bodyline='pöstal').encode('utf-8') bodyline='pöstal').encode('utf-8')
msg = email.message_from_bytes(m) msg = email.message_from_bytes(m)
self.assertEqual(msg.get_payload(), "p<EFBFBD><EFBFBD>stal\n") self.assertEqual(msg.get_payload(), "p\uFFFD\uFFFDstal\n")
self.assertEqual(msg.get_payload(decode=True), self.assertEqual(msg.get_payload(decode=True),
"pöstal\n".encode('utf-8')) "pöstal\n".encode('utf-8'))
@ -2874,7 +2874,7 @@ class Test8BitBytesHandling(unittest.TestCase):
cte='quoted-printable', cte='quoted-printable',
bodyline='p=C3=B6stál').encode('utf-8') bodyline='p=C3=B6stál').encode('utf-8')
msg = email.message_from_bytes(m) msg = email.message_from_bytes(m)
self.assertEqual(msg.get_payload(), 'p=C3=B6st<EFBFBD><EFBFBD>l\n') self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
self.assertEqual(msg.get_payload(decode=True), self.assertEqual(msg.get_payload(decode=True),
'pöstál\n'.encode('utf-8')) 'pöstál\n'.encode('utf-8'))
@ -2899,52 +2899,65 @@ class Test8BitBytesHandling(unittest.TestCase):
'<,.V<W1A; á \n'.encode('utf-8')) '<,.V<W1A; á \n'.encode('utf-8'))
headertest_msg = textwrap.dedent("""\ headertest_headers = (
From: foo@bar.com ('From: foo@bar.com', ('From', 'foo@bar.com')),
To: báz ('To: báz', ('To', '=?unknown-8bit?q?b=C3=A1z?=')),
Subject: Maintenant je vous présente mon collègue, le pouf célèbre ('Subject: Maintenant je vous présente mon collègue, le pouf célèbre\n'
\tJean de Baddie '\tJean de Baddie',
From: göst ('Subject', '=?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_'
'coll=C3=A8gue=2C_le_pouf_c=C3=A9l=C3=A8bre?=\n'
Yes, they are flying. ' =?unknown-8bit?q?_Jean_de_Baddie?=')),
""").encode('utf-8') ('From: göst', ('From', '=?unknown-8bit?b?Z8O2c3Q=?=')),
)
headertest_msg = ('\n'.join([src for (src, _) in headertest_headers]) +
'\nYes, they are flying.\n').encode('utf-8')
def test_get_8bit_header(self): def test_get_8bit_header(self):
msg = email.message_from_bytes(self.headertest_msg) msg = email.message_from_bytes(self.headertest_msg)
self.assertEqual(msg.get('to'), 'b??z') self.assertEqual(str(msg.get('to')), 'b\uFFFD\uFFFDz')
self.assertEqual(msg['to'], 'b??z') self.assertEqual(str(msg['to']), 'b\uFFFD\uFFFDz')
def test_print_8bit_headers(self): def test_print_8bit_headers(self):
msg = email.message_from_bytes(self.headertest_msg) msg = email.message_from_bytes(self.headertest_msg)
self.assertEqual(str(msg), self.assertEqual(str(msg),
self.headertest_msg.decode( textwrap.dedent("""\
'ascii', 'replace').replace('<EFBFBD>', '?')) From: {}
To: {}
Subject: {}
From: {}
Yes, they are flying.
""").format(*[expected[1] for (_, expected) in
self.headertest_headers]))
def test_values_with_8bit_headers(self): def test_values_with_8bit_headers(self):
msg = email.message_from_bytes(self.headertest_msg) msg = email.message_from_bytes(self.headertest_msg)
self.assertListEqual(msg.values(), self.assertListEqual([str(x) for x in msg.values()],
['foo@bar.com', ['foo@bar.com',
'b??z', 'b\uFFFD\uFFFDz',
'Maintenant je vous pr??sente mon ' 'Maintenant je vous pr\uFFFD\uFFFDsente mon '
'coll??gue, le pouf c??l??bre\n' 'coll\uFFFD\uFFFDgue, le pouf '
'c\uFFFD\uFFFDl\uFFFD\uFFFDbre\n'
'\tJean de Baddie', '\tJean de Baddie',
"g??st"]) "g\uFFFD\uFFFDst"])
def test_items_with_8bit_headers(self): def test_items_with_8bit_headers(self):
msg = email.message_from_bytes(self.headertest_msg) msg = email.message_from_bytes(self.headertest_msg)
self.assertListEqual(msg.items(), self.assertListEqual([(str(x), str(y)) for (x, y) in msg.items()],
[('From', 'foo@bar.com'), [('From', 'foo@bar.com'),
('To', 'b??z'), ('To', 'b\uFFFD\uFFFDz'),
('Subject', 'Maintenant je vous pr??sente mon ' ('Subject', 'Maintenant je vous '
'coll??gue, le pouf c??l??bre\n' 'pr\uFFFD\uFFFDsente '
'\tJean de Baddie'), 'mon coll\uFFFD\uFFFDgue, le pouf '
('From', 'g??st')]) 'c\uFFFD\uFFFDl\uFFFD\uFFFDbre\n'
'\tJean de Baddie'),
('From', 'g\uFFFD\uFFFDst')])
def test_get_all_with_8bit_headers(self): def test_get_all_with_8bit_headers(self):
msg = email.message_from_bytes(self.headertest_msg) msg = email.message_from_bytes(self.headertest_msg)
self.assertListEqual(msg.get_all('from'), self.assertListEqual([str(x) for x in msg.get_all('from')],
['foo@bar.com', ['foo@bar.com',
'g??st']) 'g\uFFFD\uFFFDst'])
non_latin_bin_msg = textwrap.dedent("""\ non_latin_bin_msg = textwrap.dedent("""\
From: foo@bar.com From: foo@bar.com
@ -2964,13 +2977,12 @@ class Test8BitBytesHandling(unittest.TestCase):
email.generator.BytesGenerator(out).flatten(msg) email.generator.BytesGenerator(out).flatten(msg)
self.assertEqual(out.getvalue(), self.non_latin_bin_msg) self.assertEqual(out.getvalue(), self.non_latin_bin_msg)
# XXX: ultimately the '?' should turn into CTE encoded bytes non_latin_bin_msg_as7bit_wrapped = textwrap.dedent("""\
# using 'unknown-8bit' charset.
non_latin_bin_msg_as7bit = textwrap.dedent("""\
From: foo@bar.com From: foo@bar.com
To: b??z To: =?unknown-8bit?q?b=C3=A1z?=
Subject: Maintenant je vous pr??sente mon coll??gue, le pouf c??l??bre Subject: =?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_coll=C3=A8gue?=
\tJean de Baddie =?unknown-8bit?q?=2C_le_pouf_c=C3=A9l=C3=A8bre?=
=?unknown-8bit?q?_Jean_de_Baddie?=
Mime-Version: 1.0 Mime-Version: 1.0
Content-Type: text/plain; charset="utf-8" Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: base64 Content-Transfer-Encoding: base64
@ -2982,7 +2994,7 @@ class Test8BitBytesHandling(unittest.TestCase):
msg = email.message_from_bytes(self.non_latin_bin_msg) msg = email.message_from_bytes(self.non_latin_bin_msg)
out = StringIO() out = StringIO()
email.generator.Generator(out).flatten(msg) email.generator.Generator(out).flatten(msg)
self.assertEqual(out.getvalue(), self.non_latin_bin_msg_as7bit) self.assertEqual(out.getvalue(), self.non_latin_bin_msg_as7bit_wrapped)
def test_bytes_generator_with_unix_from(self): def test_bytes_generator_with_unix_from(self):
# The unixfrom contains a current date, so we can't check it # The unixfrom contains a current date, so we can't check it
@ -2995,6 +3007,12 @@ class Test8BitBytesHandling(unittest.TestCase):
self.assertEqual(lines[0].split()[0], b'From') self.assertEqual(lines[0].split()[0], b'From')
self.assertEqual(b'\n'.join(lines[1:]), self.non_latin_bin_msg) self.assertEqual(b'\n'.join(lines[1:]), self.non_latin_bin_msg)
non_latin_bin_msg_as7bit = non_latin_bin_msg_as7bit_wrapped.split('\n')
non_latin_bin_msg_as7bit[2:4] = [
'Subject: =?unknown-8bit?q?Maintenant_je_vous_pr=C3=A9sente_mon_'
'coll=C3=A8gue=2C_le_pouf_c=C3=A9l=C3=A8bre?=']
non_latin_bin_msg_as7bit = '\n'.join(non_latin_bin_msg_as7bit)
def test_message_from_binary_file(self): def test_message_from_binary_file(self):
fn = 'test.msg' fn = 'test.msg'
self.addCleanup(unlink, fn) self.addCleanup(unlink, fn)

View File

@ -40,6 +40,10 @@ Core and Builtins
Library Library
------- -------
- Issue #10686: the email package now :rfc:`2047`\ -encodes headers with
non-ASCII bytes (parsed by a Bytes Parser) when doing conversion to
7bit-clean presentation, instead of replacing them with ?s.
- email.header.Header was incorrectly encoding folding white space when - email.header.Header was incorrectly encoding folding white space when
rfc2047-encoding header values with embedded newlines, leaving them rfc2047-encoding header values with embedded newlines, leaving them
without folding whitespace. It now uses the continuation_ws, as it without folding whitespace. It now uses the continuation_ws, as it