mirror of https://github.com/python/cpython
bpo-35805: Add parser for Message-ID email header. (GH-13397)
* bpo-35805: Add parser for Message-ID header. This parser is based on the definition of Identification Fields from RFC 5322 Sec 3.6.4. This should also prevent folding of Message-ID header using RFC 2047 encoded words and hence fix bpo-35805. * Prevent folding of non-ascii message-id headers. * Add fold method to MsgID token to prevent folding.
This commit is contained in:
parent
bc6469f79c
commit
46d88a1131
|
@ -321,19 +321,26 @@ variant, :attr:`~.BaseHeader.max_count` is set to 1.
|
|||
|
||||
The default mappings are:
|
||||
|
||||
:subject: UniqueUnstructuredHeader
|
||||
:date: UniqueDateHeader
|
||||
:resent-date: DateHeader
|
||||
:orig-date: UniqueDateHeader
|
||||
:sender: UniqueSingleAddressHeader
|
||||
:resent-sender: SingleAddressHeader
|
||||
:to: UniqueAddressHeader
|
||||
:resent-to: AddressHeader
|
||||
:cc: UniqueAddressHeader
|
||||
:resent-cc: AddressHeader
|
||||
:from: UniqueAddressHeader
|
||||
:resent-from: AddressHeader
|
||||
:reply-to: UniqueAddressHeader
|
||||
:subject: UniqueUnstructuredHeader
|
||||
:date: UniqueDateHeader
|
||||
:resent-date: DateHeader
|
||||
:orig-date: UniqueDateHeader
|
||||
:sender: UniqueSingleAddressHeader
|
||||
:resent-sender: SingleAddressHeader
|
||||
:to: UniqueAddressHeader
|
||||
:resent-to: AddressHeader
|
||||
:cc: UniqueAddressHeader
|
||||
:resent-cc: AddressHeader
|
||||
:bcc: UniqueAddressHeader
|
||||
:resent-bcc: AddressHeader
|
||||
:from: UniqueAddressHeader
|
||||
:resent-from: AddressHeader
|
||||
:reply-to: UniqueAddressHeader
|
||||
:mime-version: MIMEVersionHeader
|
||||
:content-type: ContentTypeHeader
|
||||
:content-disposition: ContentDispositionHeader
|
||||
:content-transfer-encoding: ContentTransferEncodingHeader
|
||||
:message-id: MessageIDHeader
|
||||
|
||||
``HeaderRegistry`` has the following methods:
|
||||
|
||||
|
|
|
@ -179,37 +179,30 @@ class WhiteSpaceTokenList(TokenList):
|
|||
|
||||
|
||||
class UnstructuredTokenList(TokenList):
|
||||
|
||||
token_type = 'unstructured'
|
||||
|
||||
|
||||
class Phrase(TokenList):
|
||||
|
||||
token_type = 'phrase'
|
||||
|
||||
class Word(TokenList):
|
||||
|
||||
token_type = 'word'
|
||||
|
||||
|
||||
class CFWSList(WhiteSpaceTokenList):
|
||||
|
||||
token_type = 'cfws'
|
||||
|
||||
|
||||
class Atom(TokenList):
|
||||
|
||||
token_type = 'atom'
|
||||
|
||||
|
||||
class Token(TokenList):
|
||||
|
||||
token_type = 'token'
|
||||
encode_as_ew = False
|
||||
|
||||
|
||||
class EncodedWord(TokenList):
|
||||
|
||||
token_type = 'encoded-word'
|
||||
cte = None
|
||||
charset = None
|
||||
|
@ -496,16 +489,19 @@ class Domain(TokenList):
|
|||
|
||||
|
||||
class DotAtom(TokenList):
|
||||
|
||||
token_type = 'dot-atom'
|
||||
|
||||
|
||||
class DotAtomText(TokenList):
|
||||
|
||||
token_type = 'dot-atom-text'
|
||||
as_ew_allowed = True
|
||||
|
||||
|
||||
class NoFoldLiteral(TokenList):
    """Token list for a ``no-fold-literal`` (``"[" *dtext "]"``)."""

    # Flag name suggests this opts the token out of encoded-word output;
    # NOTE(review): confirm against the folding code that reads it.
    as_ew_allowed = False
    token_type = 'no-fold-literal'
|
||||
|
||||
|
||||
class AddrSpec(TokenList):
|
||||
|
||||
token_type = 'addr-spec'
|
||||
|
@ -809,7 +805,6 @@ class ParameterizedHeaderValue(TokenList):
|
|||
|
||||
|
||||
class ContentType(ParameterizedHeaderValue):
|
||||
|
||||
token_type = 'content-type'
|
||||
as_ew_allowed = False
|
||||
maintype = 'text'
|
||||
|
@ -817,27 +812,35 @@ class ContentType(ParameterizedHeaderValue):
|
|||
|
||||
|
||||
class ContentDisposition(ParameterizedHeaderValue):
|
||||
|
||||
token_type = 'content-disposition'
|
||||
as_ew_allowed = False
|
||||
content_disposition = None
|
||||
|
||||
|
||||
class ContentTransferEncoding(TokenList):
|
||||
|
||||
token_type = 'content-transfer-encoding'
|
||||
as_ew_allowed = False
|
||||
cte = '7bit'
|
||||
|
||||
|
||||
class HeaderLabel(TokenList):
|
||||
|
||||
token_type = 'header-label'
|
||||
as_ew_allowed = False
|
||||
|
||||
|
||||
class Header(TokenList):
|
||||
class MsgID(TokenList):
    """Token list for a single ``msg-id`` token."""

    as_ew_allowed = False
    token_type = 'msg-id'

    def fold(self, policy):
        """Return the token as one line terminated by ``policy.linesep``.

        The line length limits of *policy* are deliberately not consulted:
        message-id tokens may not be folded.
        """
        rendered = str(self)
        return rendered + policy.linesep
|
||||
|
||||
class MessageID(MsgID):
    """Top-level parse tree for a Message-ID header value.

    Inherits the non-folding ``fold`` of ``MsgID``; only the token type
    differs.
    """

    token_type = 'message-id'
|
||||
|
||||
|
||||
class Header(TokenList):
|
||||
token_type = 'header'
|
||||
|
||||
|
||||
|
@ -1583,7 +1586,7 @@ def get_addr_spec(value):
|
|||
addr_spec.append(token)
|
||||
if not value or value[0] != '@':
|
||||
addr_spec.defects.append(errors.InvalidHeaderDefect(
|
||||
"add-spec local part with no domain"))
|
||||
"addr-spec local part with no domain"))
|
||||
return addr_spec, value
|
||||
addr_spec.append(ValueTerminal('@', 'address-at-symbol'))
|
||||
token, value = get_domain(value[1:])
|
||||
|
@ -1968,6 +1971,110 @@ def get_address_list(value):
|
|||
value = value[1:]
|
||||
return address_list, value
|
||||
|
||||
|
||||
def get_no_fold_literal(value):
    """no-fold-literal = "[" *dtext "]"

    Parse a no-fold-literal from the start of *value* and return a
    ``(NoFoldLiteral, remainder)`` tuple, where *remainder* is whatever
    follows the closing ``]``.

    Raises HeaderParseError if *value* is empty, does not start with
    ``[``, or has no ``]`` directly after the dtext.
    """
    # Reject input that cannot possibly start a no-fold-literal.
    if not value:
        raise errors.HeaderParseError(
            "expected no-fold-literal but found '{}'".format(value))
    if value[0] != '[':
        raise errors.HeaderParseError(
            "expected '[' at the start of no-fold-literal "
            "but found '{}'".format(value))

    literal = NoFoldLiteral()
    literal.append(ValueTerminal('[', 'no-fold-literal-start'))

    # Everything between the brackets is handed to the dtext parser.
    dtext, rest = get_dtext(value[1:])
    literal.append(dtext)

    if not (rest and rest[0] == ']'):
        raise errors.HeaderParseError(
            "expected ']' at the end of no-fold-literal "
            "but found '{}'".format(rest))
    literal.append(ValueTerminal(']', 'no-fold-literal-end'))
    return literal, rest[1:]
|
||||
|
||||
def get_msg_id(value):
    """msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS]
       id-left = dot-atom-text / obs-id-left
       id-right = dot-atom-text / no-fold-literal / obs-id-right
       no-fold-literal = "[" *dtext "]"

    Parse a msg-id from the start of *value* and return a
    ``(MsgID, remainder)`` tuple.

    Recoverable problems are recorded as defects on the returned token:
    an obsolete id-left or id-right (ObsoleteHeaderDefect), a missing
    id-right, and a missing trailing ``>`` (InvalidHeaderDefect).

    Raises HeaderParseError if *value* is empty, has no opening ``<``,
    or if id-left / id-right cannot be parsed in any supported form.
    """
    msg_id = MsgID()
    # Check for emptiness before peeking at the first character so that an
    # empty value is reported as HeaderParseError below, not IndexError.
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        msg_id.append(token)
    if not value or value[0] != '<':
        raise errors.HeaderParseError(
            "expected msg-id but found '{}'".format(value))
    msg_id.append(ValueTerminal('<', 'msg-id-start'))
    value = value[1:]
    # Parse id-left.
    try:
        token, value = get_dot_atom_text(value)
    except errors.HeaderParseError:
        try:
            # obs-id-left is same as local-part of addr-spec.
            token, value = get_obs_local_part(value)
            msg_id.defects.append(errors.ObsoleteHeaderDefect(
                "obsolete id-left in msg-id"))
        except errors.HeaderParseError:
            raise errors.HeaderParseError(
                "expected dot-atom-text or obs-id-left"
                " but found '{}'".format(value))
    msg_id.append(token)
    if not value or value[0] != '@':
        msg_id.defects.append(errors.InvalidHeaderDefect(
            "msg-id with no id-right"))
        # Even though there is no id-right, if the local part
        # ends with `>` let's just parse it too and return
        # along with the defect.
        if value and value[0] == '>':
            msg_id.append(ValueTerminal('>', 'msg-id-end'))
            value = value[1:]
        return msg_id, value
    msg_id.append(ValueTerminal('@', 'address-at-symbol'))
    value = value[1:]
    # Parse id-right, trying the strictest form first and falling back to
    # the obsolete domain syntax last.
    try:
        token, value = get_dot_atom_text(value)
    except errors.HeaderParseError:
        try:
            token, value = get_no_fold_literal(value)
        except errors.HeaderParseError:
            try:
                token, value = get_domain(value)
                msg_id.defects.append(errors.ObsoleteHeaderDefect(
                    "obsolete id-right in msg-id"))
            except errors.HeaderParseError:
                raise errors.HeaderParseError(
                    "expected dot-atom-text, no-fold-literal or obs-id-right"
                    " but found '{}'".format(value))
    msg_id.append(token)
    if value and value[0] == '>':
        value = value[1:]
    else:
        msg_id.defects.append(errors.InvalidHeaderDefect(
            "missing trailing '>' on msg-id"))
    # The closing terminal is appended even when it was missing from the
    # input, so str(msg_id) always ends the id with '>'.
    msg_id.append(ValueTerminal('>', 'msg-id-end'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        msg_id.append(token)
    return msg_id, value
|
||||
|
||||
|
||||
def parse_message_id(value):
    """message-id = "Message-ID:" msg-id CRLF

    Parse *value* as the body of a Message-ID header and return a
    MessageID token list.  This function does not raise on malformed
    input; problems are recorded as defects on the returned tree:

    * If no msg-id can be parsed, an InvalidHeaderDefect is recorded and
      the raw text (when non-empty) is kept in the tree as a single
      terminal so that ``str()`` of the result still reproduces it.
    * If text remains after a valid msg-id, an InvalidHeaderDefect is
      recorded for the unexpected remainder.
    """
    message_id = MessageID()
    try:
        token, value = get_msg_id(value)
    except errors.HeaderParseError:
        # get_msg_id raised before the tuple assignment completed, so
        # `value` is still the original input and `token` is unbound.
        message_id.defects.append(errors.InvalidHeaderDefect(
            "Expected msg-id but found {!r}".format(value)))
        if value:
            message_id.append(ValueTerminal(value, 'invalid-msg-id'))
    else:
        message_id.append(token)
        # Nothing may follow the msg-id in a Message-ID header.
        if value:
            message_id.defects.append(errors.InvalidHeaderDefect(
                "Unexpected {!r}".format(value)))
    return message_id
|
||||
|
||||
#
|
||||
# XXX: As I begin to add additional header parsers, I'm realizing we probably
|
||||
# have two level of parser routines: the get_XXX methods that get a token in
|
||||
|
|
|
@ -520,6 +520,18 @@ class ContentTransferEncodingHeader:
|
|||
return self._cte
|
||||
|
||||
|
||||
class MessageIDHeader:
    """Header class for Message-ID, parsed with ``parse_message_id``."""

    # At most one Message-ID header is allowed.
    max_count = 1
    value_parser = staticmethod(parser.parse_message_id)

    @classmethod
    def parse(cls, value, kwds):
        """Parse *value* and store the results in *kwds*.

        Sets ``kwds['parse_tree']`` and ``kwds['decoded']`` and extends
        the existing ``kwds['defects']`` list with every defect found in
        the parse tree.
        """
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)
|
||||
|
||||
|
||||
# The header factory #
|
||||
|
||||
_default_header_map = {
|
||||
|
@ -542,6 +554,7 @@ _default_header_map = {
|
|||
'content-type': ContentTypeHeader,
|
||||
'content-disposition': ContentDispositionHeader,
|
||||
'content-transfer-encoding': ContentTransferEncodingHeader,
|
||||
'message-id': MessageIDHeader,
|
||||
}
|
||||
|
||||
class HeaderRegistry:
|
||||
|
|
|
@ -2494,6 +2494,78 @@ class TestParser(TestParserMixin, TestEmailBase):
|
|||
";foo", ";foo", ";foo", [errors.InvalidHeaderDefect]*3
|
||||
)
|
||||
|
||||
# get_msg_id
|
||||
|
||||
def test_get_msg_id_valid(self):
    # A well-formed msg-id is expected back unchanged, with no defects
    # and nothing left over.
    source = "<simeple.local@example.something.com>"
    msg_id = self._test_get_x(
        parser.get_msg_id, source, source, source, [], '')
    self.assertEqual(msg_id.token_type, 'msg-id')
|
||||
|
||||
def test_get_msg_id_obsolete_local(self):
    # A quoted id-left is only valid as obs-id-left, so one
    # ObsoleteHeaderDefect is expected.
    quoted = '<"simeple.local"@example.com>'
    msg_id = self._test_get_x(
        parser.get_msg_id,
        quoted,
        quoted,
        '<simeple.local@example.com>',
        [errors.ObsoleteHeaderDefect],
        '',
    )
    self.assertEqual(msg_id.token_type, 'msg-id')
|
||||
|
||||
def test_get_msg_id_non_folding_literal_domain(self):
    # An id-right written as a no-fold-literal ("[...]") is expected to
    # parse without defects.
    source = "<simple.local@[someexamplecom.domain]>"
    msg_id = self._test_get_x(
        parser.get_msg_id, source, source, source, [], "")
    self.assertEqual(msg_id.token_type, 'msg-id')
|
||||
|
||||
|
||||
def test_get_msg_id_obsolete_domain_part(self):
    # A comment inside id-right is only accepted through the obsolete
    # domain syntax, so one ObsoleteHeaderDefect is expected.
    msg_id = self._test_get_x(
        parser.get_msg_id,
        "<simplelocal@(old)example.com>",
        "<simplelocal@(old)example.com>",
        "<simplelocal@ example.com>",
        [errors.ObsoleteHeaderDefect],
        ""
    )
    # Check the token type as the sibling get_msg_id tests do; previously
    # the returned token was bound but never inspected.
    self.assertEqual(msg_id.token_type, 'msg-id')
|
||||
|
||||
def test_get_msg_id_no_id_right_part(self):
    # With no '@' and id-right, the msg-id is still returned but carries
    # one InvalidHeaderDefect.
    source = "<simplelocal>"
    msg_id = self._test_get_x(
        parser.get_msg_id,
        source,
        source,
        source,
        [errors.InvalidHeaderDefect],
        "",
    )
    self.assertEqual(msg_id.token_type, 'msg-id')
|
||||
|
||||
def test_get_msg_id_no_angle_start(self):
    # Without the opening '<' there is no msg-id to recover, so the
    # parser must raise instead of recording a defect.
    self.assertRaises(
        errors.HeaderParseError, parser.get_msg_id, "msgwithnoankle")
|
||||
|
||||
def test_get_msg_id_no_angle_end(self):
    # A missing closing '>' is reported as one InvalidHeaderDefect, and
    # the expected string form has the '>' added.
    closed = "<simplelocal@domain>"
    msg_id = self._test_get_x(
        parser.get_msg_id,
        "<simplelocal@domain",
        closed,
        closed,
        [errors.InvalidHeaderDefect],
        "",
    )
    self.assertEqual(msg_id.token_type, 'msg-id')
|
||||
|
||||
|
||||
@parameterize
|
||||
class Test_parse_mime_parameters(TestParserMixin, TestEmailBase):
|
||||
|
|
|
@ -1648,6 +1648,34 @@ class TestFolding(TestHeaderBase):
|
|||
'xxxxxxxxxxxxxxxxxxxx=3D=3D-xxx-xx-xx?=\n'
|
||||
' =?utf-8?q?=3E?=\n')
|
||||
|
||||
def test_message_id_header_is_not_folded(self):
    # Policies whose line limits are shorter than the ids used below.
    narrow = policy.default.clone(max_line_length=20)
    medium = policy.default.clone(max_line_length=30)

    # Plain dot-atom id longer than the limit stays on one line.
    h = self.make_header(
        'Message-ID', '<somemessageidlongerthan@maxlinelength.com>')
    folded = h.fold(policy=narrow)
    self.assertEqual(
        folded,
        'Message-ID: <somemessageidlongerthan@maxlinelength.com>\n')

    # Test message-id isn't folded when id-right is no-fold-literal.
    h = self.make_header(
        'Message-ID', '<somemessageidlongerthan@[127.0.0.0.0.0.0.0.0.1]>')
    folded = h.fold(policy=narrow)
    self.assertEqual(
        folded,
        'Message-ID: <somemessageidlongerthan@[127.0.0.0.0.0.0.0.0.1]>\n')

    # Test message-id isn't folded when id-right is non-ascii characters.
    h = self.make_header('Message-ID', '<ईमेल@wők.com>')
    folded = h.fold(policy=medium)
    self.assertEqual(folded, 'Message-ID: <ईमेल@wők.com>\n')

    # Test message-id is folded without breaking the msg-id token into
    # encoded words, *even* if they don't fit into max_line_length.
    h = self.make_header('Message-ID', '<ईमेलfromMessage@wők.com>')
    folded = h.fold(policy=narrow)
    self.assertEqual(folded, 'Message-ID:\n <ईमेलfromMessage@wők.com>\n')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
Add parser for Message-ID header and add it to default HeaderRegistry. This
|
||||
should prevent folding of Message-ID using RFC 2047 encoded words.
|
Loading…
Reference in New Issue