gh-121650: Encode newlines in headers, and verify headers are sound (GH-122233)

## Encode header parts that contain newlines

Per RFC 2047:

> [...] these encoding schemes allow the
> encoding of arbitrary octet values, mail readers that implement this
> decoding should also ensure that display of the decoded data on the
> recipient's terminal will not cause unwanted side-effects

It seems that the "quoted-word" scheme is a valid way to include
a newline character in a header value, just like we already allow
undecodable bytes or control characters.
They do need to be properly quoted when serialized to text, though.


## Verify that email headers are well-formed

This should fail for custom fold() implementations that aren't careful
about newlines.


Co-authored-by: Bas Bloemsaat <bas@bloemsaat.org>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
Petr Viktorin 2024-07-31 00:19:48 +02:00 committed by GitHub
parent 5912487938
commit 0976339818
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 160 additions and 4 deletions

View File

@ -58,6 +58,13 @@ The following exception classes are defined in the :mod:`email.errors` module:
:class:`~email.mime.nonmultipart.MIMENonMultipart` (e.g. :class:`~email.mime.nonmultipart.MIMENonMultipart` (e.g.
:class:`~email.mime.image.MIMEImage`). :class:`~email.mime.image.MIMEImage`).
.. exception:: HeaderWriteError()
Raised when an error occurs when the :mod:`~email.generator` outputs
headers.
.. exception:: MessageDefect() .. exception:: MessageDefect()
This is the base class for all defects found when parsing email messages. This is the base class for all defects found when parsing email messages.

View File

@ -229,6 +229,24 @@ added matters. To illustrate::
.. versionadded:: 3.6 .. versionadded:: 3.6
.. attribute:: verify_generated_headers
If ``True`` (the default), the generator will raise
:exc:`~email.errors.HeaderWriteError` instead of writing a header
that is improperly folded or delimited, such that it would
be parsed as multiple headers or joined with adjacent data.
Such headers can be generated by custom header classes or bugs
in the ``email`` module.
As it's a security feature, this defaults to ``True`` even in the
:class:`~email.policy.Compat32` policy.
For backwards compatible, but unsafe, behavior, it must be set to
``False`` explicitly.
.. versionadded:: 3.13
The following :class:`Policy` method is intended to be called by code using The following :class:`Policy` method is intended to be called by code using
the email library to create policy instances with custom settings: the email library to create policy instances with custom settings:

View File

@ -736,6 +736,15 @@ doctest
email email
----- -----
* Headers with embedded newlines are now quoted on output.
The :mod:`~email.generator` will now refuse to serialize (write) headers
that are improperly folded or delimited, such that they would be parsed as
multiple headers or joined with adjacent data.
If you need to turn this safety feature off,
set :attr:`~email.policy.Policy.verify_generated_headers`.
(Contributed by Bas Bloemsaat and Petr Viktorin in :gh:`121650`.)
* :func:`email.utils.getaddresses` and :func:`email.utils.parseaddr` now return * :func:`email.utils.getaddresses` and :func:`email.utils.parseaddr` now return
``('', '')`` 2-tuples in more situations where invalid email addresses are ``('', '')`` 2-tuples in more situations where invalid email addresses are
encountered instead of potentially inaccurate values. Add optional *strict* encountered instead of potentially inaccurate values. Add optional *strict*

View File

@ -92,6 +92,8 @@ TOKEN_ENDS = TSPECIALS | WSP
ASPECIALS = TSPECIALS | set("*'%") ASPECIALS = TSPECIALS | set("*'%")
ATTRIBUTE_ENDS = ASPECIALS | WSP ATTRIBUTE_ENDS = ASPECIALS | WSP
EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - set('%') EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - set('%')
NLSET = {'\n', '\r'}
SPECIALSNL = SPECIALS | NLSET
def quote_string(value): def quote_string(value):
return '"'+str(value).replace('\\', '\\\\').replace('"', r'\"')+'"' return '"'+str(value).replace('\\', '\\\\').replace('"', r'\"')+'"'
@ -2802,9 +2804,13 @@ def _refold_parse_tree(parse_tree, *, policy):
wrap_as_ew_blocked -= 1 wrap_as_ew_blocked -= 1
continue continue
tstr = str(part) tstr = str(part)
if part.token_type == 'ptext' and set(tstr) & SPECIALS: if not want_encoding:
# Encode if tstr contains special characters. if part.token_type == 'ptext':
want_encoding = True # Encode if tstr contains special characters.
want_encoding = not SPECIALSNL.isdisjoint(tstr)
else:
# Encode if tstr contains newlines.
want_encoding = not NLSET.isdisjoint(tstr)
try: try:
tstr.encode(encoding) tstr.encode(encoding)
charset = encoding charset = encoding

View File

@ -157,6 +157,13 @@ class Policy(_PolicyBase, metaclass=abc.ABCMeta):
message_factory -- the class to use to create new message objects. message_factory -- the class to use to create new message objects.
If the value is None, the default is Message. If the value is None, the default is Message.
verify_generated_headers
-- if true, the generator verifies that each header
they are properly folded, so that a parser won't
treat it as multiple headers, start-of-body, or
part of another header.
This is a check against custom Header & fold()
implementations.
""" """
raise_on_defect = False raise_on_defect = False
@ -165,6 +172,7 @@ class Policy(_PolicyBase, metaclass=abc.ABCMeta):
max_line_length = 78 max_line_length = 78
mangle_from_ = False mangle_from_ = False
message_factory = None message_factory = None
verify_generated_headers = True
def handle_defect(self, obj, defect): def handle_defect(self, obj, defect):
"""Based on policy, either raise defect or call register_defect. """Based on policy, either raise defect or call register_defect.

View File

@ -29,6 +29,10 @@ class CharsetError(MessageError):
"""An illegal charset was given.""" """An illegal charset was given."""
class HeaderWriteError(MessageError):
"""Error while writing headers."""
# These are parsing defects which the parser was able to work around. # These are parsing defects which the parser was able to work around.
class MessageDefect(ValueError): class MessageDefect(ValueError):
"""Base class for a message defect.""" """Base class for a message defect."""

View File

@ -14,12 +14,14 @@ import random
from copy import deepcopy from copy import deepcopy
from io import StringIO, BytesIO from io import StringIO, BytesIO
from email.utils import _has_surrogates from email.utils import _has_surrogates
from email.errors import HeaderWriteError
UNDERSCORE = '_' UNDERSCORE = '_'
NL = '\n' # XXX: no longer used by the code below. NL = '\n' # XXX: no longer used by the code below.
NLCRE = re.compile(r'\r\n|\r|\n') NLCRE = re.compile(r'\r\n|\r|\n')
fcre = re.compile(r'^From ', re.MULTILINE) fcre = re.compile(r'^From ', re.MULTILINE)
NEWLINE_WITHOUT_FWSP = re.compile(r'\r\n[^ \t]|\r[^ \n\t]|\n[^ \t]')
class Generator: class Generator:
@ -222,7 +224,16 @@ class Generator:
def _write_headers(self, msg): def _write_headers(self, msg):
for h, v in msg.raw_items(): for h, v in msg.raw_items():
self.write(self.policy.fold(h, v)) folded = self.policy.fold(h, v)
if self.policy.verify_generated_headers:
linesep = self.policy.linesep
if not folded.endswith(self.policy.linesep):
raise HeaderWriteError(
f'folded header does not end with {linesep!r}: {folded!r}')
if NEWLINE_WITHOUT_FWSP.search(folded.removesuffix(linesep)):
raise HeaderWriteError(
f'folded header contains newline: {folded!r}')
self.write(folded)
# A blank line always separates headers from body # A blank line always separates headers from body
self.write(self._NL) self.write(self._NL)

View File

@ -6,6 +6,7 @@ from email.message import EmailMessage
from email.generator import Generator, BytesGenerator from email.generator import Generator, BytesGenerator
from email.headerregistry import Address from email.headerregistry import Address
from email import policy from email import policy
import email.errors
from test.test_email import TestEmailBase, parameterize from test.test_email import TestEmailBase, parameterize
@ -249,6 +250,44 @@ class TestGeneratorBase:
g.flatten(msg) g.flatten(msg)
self.assertEqual(s.getvalue(), self.typ(expected)) self.assertEqual(s.getvalue(), self.typ(expected))
def test_keep_encoded_newlines(self):
msg = self.msgmaker(self.typ(textwrap.dedent("""\
To: nobody
Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
None
""")))
expected = textwrap.dedent("""\
To: nobody
Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
None
""")
s = self.ioclass()
g = self.genclass(s, policy=self.policy.clone(max_line_length=80))
g.flatten(msg)
self.assertEqual(s.getvalue(), self.typ(expected))
def test_keep_long_encoded_newlines(self):
msg = self.msgmaker(self.typ(textwrap.dedent("""\
To: nobody
Subject: Bad subject=?UTF-8?Q?=0A?=Bcc: injection@example.com
None
""")))
expected = textwrap.dedent("""\
To: nobody
Subject: Bad subject
=?utf-8?q?=0A?=Bcc:
injection@example.com
None
""")
s = self.ioclass()
g = self.genclass(s, policy=self.policy.clone(max_line_length=30))
g.flatten(msg)
self.assertEqual(s.getvalue(), self.typ(expected))
class TestGenerator(TestGeneratorBase, TestEmailBase): class TestGenerator(TestGeneratorBase, TestEmailBase):
@ -273,6 +312,29 @@ class TestGenerator(TestGeneratorBase, TestEmailBase):
g.flatten(msg) g.flatten(msg)
self.assertEqual(s.getvalue(), self.typ(expected)) self.assertEqual(s.getvalue(), self.typ(expected))
def test_verify_generated_headers(self):
"""gh-121650: by default the generator prevents header injection"""
class LiteralHeader(str):
name = 'Header'
def fold(self, **kwargs):
return self
for text in (
'Value\r\nBad Injection\r\n',
'NoNewLine'
):
with self.subTest(text=text):
message = message_from_string(
"Header: Value\r\n\r\nBody",
policy=self.policy,
)
del message['Header']
message['Header'] = LiteralHeader(text)
with self.assertRaises(email.errors.HeaderWriteError):
message.as_string()
class TestBytesGenerator(TestGeneratorBase, TestEmailBase): class TestBytesGenerator(TestGeneratorBase, TestEmailBase):

View File

@ -26,6 +26,7 @@ class PolicyAPITests(unittest.TestCase):
'raise_on_defect': False, 'raise_on_defect': False,
'mangle_from_': True, 'mangle_from_': True,
'message_factory': None, 'message_factory': None,
'verify_generated_headers': True,
} }
# These default values are the ones set on email.policy.default. # These default values are the ones set on email.policy.default.
# If any of these defaults change, the docs must be updated. # If any of these defaults change, the docs must be updated.
@ -294,6 +295,31 @@ class PolicyAPITests(unittest.TestCase):
with self.assertRaises(email.errors.HeaderParseError): with self.assertRaises(email.errors.HeaderParseError):
policy.fold("Subject", subject) policy.fold("Subject", subject)
def test_verify_generated_headers(self):
"""Turning protection off allows header injection"""
policy = email.policy.default.clone(verify_generated_headers=False)
for text in (
'Header: Value\r\nBad: Injection\r\n',
'Header: NoNewLine'
):
with self.subTest(text=text):
message = email.message_from_string(
"Header: Value\r\n\r\nBody",
policy=policy,
)
class LiteralHeader(str):
name = 'Header'
def fold(self, **kwargs):
return self
del message['Header']
message['Header'] = LiteralHeader(text)
self.assertEqual(
message.as_string(),
f"{text}\nBody",
)
# XXX: Need subclassing tests. # XXX: Need subclassing tests.
# For adding subclassed objects, make sure the usual rules apply (subclass # For adding subclassed objects, make sure the usual rules apply (subclass
# wins), but that the order still works (right overrides left). # wins), but that the order still works (right overrides left).

View File

@ -0,0 +1,5 @@
:mod:`email` headers with embedded newlines are now quoted on output. The
:mod:`~email.generator` will now refuse to serialize (write) headers that
are unsafely folded or delimited; see
:attr:`~email.policy.Policy.verify_generated_headers`. (Contributed by Bas
Bloemsaat and Petr Viktorin in :gh:`121650`.)