#15160: Extend the new email parser to handle MIME headers.

This code passes all the same tests that the existing RFC mime header
parser passes, plus a bunch of additional ones.

There are a couple of commented out tests where there are issues with the
folding.  The folding doesn't normally get invoked for headers parsed from
source, and the cases are marginal anyway (headers with invalid binary data)
so I'm not worried about them, but will fix them after the beta.

There are things that can be done to make this API even more convenient, but I
think this is a solid foundation worth having.  And the parser is a full RFC
parser, so it handles cases that the current parser doesn't.  (There are also
probably cases where it fails when the current parser doesn't, but I haven't
found them yet ;)

Oh, yeah, and there are some really ugly bits in the parser for handling some
'postel' cases that are unfortunately common.

I hope/plan to eventually refactor a lot of the code in the parser which
should reduce the line count...but there is no escaping the fact that the
error recovery is a welter of special cases.
This commit is contained in:
R David Murray 2012-06-24 05:03:27 -04:00
parent 49c15d4a5f
commit 97f43c019f
6 changed files with 1918 additions and 34 deletions

View File

@ -234,11 +234,80 @@ headers.
result in a :exc:`ValueError`.
Each of the above classes also has a ``Unique`` variant (for example,
Many of the above classes also have a ``Unique`` variant (for example,
``UniqueUnstructuredHeader``). The only difference is that in the ``Unique``
variant, :attr:`~.BaseHeader.max_count` is set to 1.
.. class:: MIMEVersionHeader
There is really only one valid value for the :mailheader:`MIME-Version`
header, and that is ``1.0``. For future proofing, this header class
supports other valid version numbers. If a version number has a valid value
per :rfc:`2045`, then the header object will have non-``None`` values for
the following attributes:
.. attribute:: version
The version number as a string, with any whitespace and/or comments
removed.
.. attribute:: major
The major version number as an integer
.. attribute:: minor
The minor version number as an integer
.. class:: ParameterizedMIMEHeader
MIME headers all start with the prefix 'Content-'. Each specific header has
a certain value, described under the class for that header. Some can
also take a list of supplemental parameters, which have a common format.
This class serves as a base for all the MIME headers that take parameters.
.. attribute:: params
A dictionary mapping parameter names to parameter values.
.. class:: ContentTypeHeader
A :class:`ParameterizedMIMEHeader` class that handles the
:mailheader:`Content-Type` header.
.. attribute:: content_type
The content type string, in the form ``maintype/subtype``.
.. attribute:: maintype
.. attribute:: subtype
.. class:: ContentDispositionHeader
A :class:`ParameterizedMIMEHeader` class that handles the
:mailheader:`Content-Disposition` header.
.. attribute:: content_disposition
``inline`` and ``attachment`` are the only valid values in common use.
.. class:: ContentTransferEncodingHeader
Handles the :mailheader:`Content-Transfer-Encoding` header.
.. attribute:: cte
Valid values are ``7bit``, ``8bit``, ``base64``, and
``quoted-printable``. See :rfc:`2045` for more information.
.. class:: HeaderRegistry(base_class=BaseHeader, \
default_class=UnstructuredHeader, \
use_default_map=True)

View File

@ -68,6 +68,8 @@ XXX: provide complete list of token types.
"""
import re
import urllib # For urllib.parse.unquote
from collections import namedtuple, OrderedDict
from email import _encoded_words as _ew
from email import errors
from email import utils
@ -83,6 +85,11 @@ ATOM_ENDS = SPECIALS | WSP
DOT_ATOM_ENDS = ATOM_ENDS - set('.')
# '.', '"', and '(' do not end phrases in order to support obs-phrase
PHRASE_ENDS = SPECIALS - set('."(')
TSPECIALS = (SPECIALS | set('/?=')) - set('.')
TOKEN_ENDS = TSPECIALS | WSP
ASPECIALS = TSPECIALS | set("*'%")
ATTRIBUTE_ENDS = ASPECIALS | WSP
EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - set('%')
def quote_string(value):
    """Return str(value) wrapped in double quotes.

    Backslashes are doubled first and then each double quote is preceded
    by a backslash, so the escaping backslashes are not doubled again.
    """
    escaped = str(value).replace('\\', '\\\\')
    escaped = escaped.replace('"', r'\"')
    return '"{}"'.format(escaped)
@ -356,8 +363,12 @@ class TokenList(list):
self.__class__.__name__,
self.token_type)
for token in self:
for line in token._pp(indent+' '):
yield line
if not hasattr(token, '_pp'):
yield (indent + ' !! invalid element in token '
'list: {!r}'.format(token))
else:
for line in token._pp(indent+' '):
yield line
if self.defects:
extra = ' Defects: {}'.format(self.defects)
else:
@ -567,6 +578,11 @@ class Atom(TokenList):
token_type = 'atom'
class Token(TokenList):
    """TokenList built by get_token(): a ttext run plus any surrounding CFWS."""

    token_type = 'token'
class EncodedWord(TokenList):
token_type = 'encoded-word'
@ -602,13 +618,19 @@ class QuotedString(TokenList):
res.append(x.value)
return ''.join(res)
@property
def stripped_value(self):
    """Return the value of the first 'bare-quoted-string' child.

    The result is None when no child has that token type.
    """
    bare_values = (
        child.value for child in self
        if child.token_type == 'bare-quoted-string')
    return next(bare_values, None)
class BareQuotedString(QuotedString):
token_type = 'bare-quoted-string'
def __str__(self):
    """Return the content wrapped in double quotes via quote_string()."""
    # Convert each child with str() before joining; the children may be
    # token objects rather than plain strings, and the stale
    # ''.join(self) return that used to precede this line made the
    # corrected form unreachable.
    return quote_string(''.join(str(x) for x in self))
@property
def value(self):
@ -987,6 +1009,180 @@ class DomainLiteral(TokenList):
return x.value
class MIMEVersion(TokenList):
    """Parse tree for a MIME-Version value, built by parse_mime_version()."""

    token_type = 'mime-version'
    # Integer version components.  Each remains None unless the parser
    # finds a run of digits for it.
    major = None
    minor = None
class Parameter(TokenList):
    """A single MIME parameter (``attribute=value``) parse tree.

    The class attributes are defaults; get_parameter() overrides
    ``sectioned``, ``extended`` and ``charset`` on the instance when the
    source uses the corresponding RFC 2231 syntax.
    """

    token_type = 'parameter'
    sectioned = False
    extended = False
    charset = 'us-ascii'

    @property
    def section_number(self):
        """Return the section number, or 0 if the parameter is unsectioned."""
        # The attribute (name) is the first token and eats any CFWS, so
        # when there is a section it is always the second token.
        if not self.sectioned:
            return 0
        return self[1].number

    @property
    def param_value(self):
        """Return the stripped value text of this parameter, or ''."""
        # This is part of the "handle quoted extended parameters" hack:
        # the value may sit directly in the parameter or be nested inside
        # a quoted-string / bare-quoted-string pair.
        for child in self:
            kind = child.token_type
            if kind == 'value':
                return child.stripped_value
            if kind != 'quoted-string':
                continue
            for quoted in child:
                if quoted.token_type != 'bare-quoted-string':
                    continue
                for inner in quoted:
                    if inner.token_type == 'value':
                        return inner.stripped_value
        return ''
class InvalidParameter(Parameter):
    """Parameter-like token list built by get_invalid_parameter()."""

    token_type = 'invalid-parameter'
class Attribute(TokenList):
    """Token list for a parameter attribute (name) or extended attribute."""

    token_type = 'attribute'

    @property
    def stripped_value(self):
        """Return the value of the first child whose type ends in 'attrtext'.

        This matches both 'attrtext' and 'extended-attrtext' terminals.
        The result is None when there is no such child.
        """
        candidates = (
            child.value for child in self
            if child.token_type.endswith('attrtext'))
        return next(candidates, None)
class Section(TokenList):
    """Token list for a '*' digits section marker, built by get_section()."""

    token_type = 'section'
    # Integer section number; get_section() sets it after reading the digits.
    number = None
class Value(TokenList):
    """Token list holding a parameter value."""

    token_type = 'value'

    @property
    def stripped_value(self):
        """Return the value without quoting or surrounding CFWS.

        The first child is examined, or the second if the first is CFWS.
        If that child is a quoted-string, attribute or extended-attribute
        its own stripped_value is returned; otherwise self.value is.
        """
        head = self[0]
        payload = self[1] if head.token_type == 'cfws' else head
        strippable = ('quoted-string', 'attribute', 'extended-attribute')
        if not payload.token_type.endswith(strippable):
            return self.value
        return payload.stripped_value
class MimeParameters(TokenList):
    """Token list holding the parameters that follow a MIME header value."""

    token_type = 'mime-parameters'

    @property
    def params(self):
        """Yield (name, value) pairs, joining RFC 2231 sections per name.

        Extended (percent-encoded) sections are unquoted and decoded with
        the charset of the first section.  Defects are recorded on the
        individual parameter tokens for inconsistent section numbering
        and for bytes that cannot be decoded.
        """
        # The RFC specifically states that the ordering of parameters is not
        # guaranteed and may be reordered by the transport layer.  So we have
        # to assume the RFC 2231 pieces can come in any order.  However, we
        # output them in the order that we first see a given name, which gives
        # us a stable __str__.
        params = OrderedDict()
        for token in self:
            if not token.token_type.endswith('parameter'):
                continue
            if token[0].token_type != 'attribute':
                continue
            name = token[0].value.strip()
            params.setdefault(name, []).append((token.section_number, token))
        for name, parts in params.items():
            # Sort on the section number only.  Sorting the bare tuples
            # would fall back to comparing the token objects themselves
            # whenever two parts share a section number (for example a
            # duplicated parameter name).  sorted() is stable, so such
            # parts keep their source order.
            parts = sorted(parts, key=lambda part: part[0])
            # XXX: there might be more recovery we could do here if, for
            # example, this is really a case of a duplicate attribute name.
            value_parts = []
            charset = parts[0][1].charset
            for i, (section_number, param) in enumerate(parts):
                if section_number != i:
                    param.defects.append(errors.InvalidHeaderDefect(
                        "inconsistent multipart parameter numbering"))
                value = param.param_value
                if param.extended:
                    try:
                        value = urllib.parse.unquote_to_bytes(value)
                    except UnicodeEncodeError:
                        # source had surrogate escaped bytes.  What we do now
                        # is a bit of an open question.  I'm not sure this is
                        # the best choice, but it is what the old algorithm did
                        value = urllib.parse.unquote(value, encoding='latin-1')
                    else:
                        try:
                            value = value.decode(charset, 'surrogateescape')
                        except LookupError:
                            # XXX: there should really be a custom defect for
                            # unknown character set to make it easy to find,
                            # because otherwise unknown charset is a silent
                            # failure.
                            value = value.decode('us-ascii', 'surrogateescape')
                        if utils._has_surrogates(value):
                            param.defects.append(errors.UndecodableBytesDefect())
                value_parts.append(value)
            value = ''.join(value_parts)
            yield name, value

    def __str__(self):
        """Return the parameters as '; '-joined name="value" pairs.

        A parameter with an empty value is rendered as the bare name.  The
        result starts with a space unless there are no parameters, in
        which case it is the empty string.
        """
        params = []
        for name, value in self.params:
            if value:
                params.append('{}={}'.format(name, quote_string(value)))
            else:
                params.append(name)
        params = '; '.join(params)
        return ' ' + params if params else ''
class ParameterizedHeaderValue(TokenList):
    """Base for header value parse trees that may end in MIME parameters."""

    @property
    def params(self):
        """Return the params of the last 'mime-parameters' child, or {}."""
        for child in self[::-1]:
            if child.token_type == 'mime-parameters':
                return child.params
        return {}

    @property
    def parts(self):
        """Return the parts, with a trailing parameter list unwrapped."""
        if not self or self[-1].token_type != 'mime-parameters':
            return TokenList(self).parts
        # We don't want to start a new line if all of the params don't fit
        # after the value, so unwrap the parameter list.
        leading, trailing_params = self[:-1], self[-1]
        return TokenList(leading + trailing_params)
class ContentType(ParameterizedHeaderValue):
    """Parse tree for a Content-Type value (parse_content_type_header())."""

    token_type = 'content-type'
    # Class-level defaults.  The parser sets instance attributes when it
    # finds a maintype/subtype and deletes them again if the header turns
    # out to be invalid, which falls back to these values.
    maintype = 'text'
    subtype = 'plain'
class ContentDisposition(ParameterizedHeaderValue):
    """Parse tree for a Content-Disposition value."""

    token_type = 'content-disposition'
    # Stays None unless parse_content_disposition_header() finds a token.
    content_disposition = None
class ContentTransferEncoding(TokenList):
    """Parse tree for a Content-Transfer-Encoding value."""

    token_type = 'content-transfer-encoding'
    # Default used when the parser does not find a mechanism token.
    cte = '7bit'
class HeaderLabel(TokenList):
token_type = 'header-label'
@ -1145,6 +1341,13 @@ _wsp_splitter = re.compile(r'([{}]+)'.format(''.join(WSP))).split
# Each matcher consumes the longest leading run of characters that are NOT
# in the corresponding *_ENDS set.  re.escape() makes every member of the
# set a literal inside the character class; this replaces the hand-written
# escaping of '\' and ']', which relied on the invalid string escape '\]'.
_non_atom_end_matcher = re.compile(r"[^{}]+".format(
    re.escape(''.join(ATOM_ENDS)))).match
_non_printable_finder = re.compile(r"[\x00-\x20\x7F]").findall
_non_token_end_matcher = re.compile(r"[^{}]+".format(
    re.escape(''.join(TOKEN_ENDS)))).match
_non_attribute_end_matcher = re.compile(r"[^{}]+".format(
    re.escape(''.join(ATTRIBUTE_ENDS)))).match
_non_extended_attribute_end_matcher = re.compile(r"[^{}]+".format(
    re.escape(''.join(EXTENDED_ATTRIBUTE_ENDS)))).match
def _validate_xtext(xtext):
"""If input token contains ASCII non-printables, register a defect."""
@ -2153,3 +2356,598 @@ def get_address_list(value):
address_list.append(ValueTerminal(',', 'list-separator'))
value = value[1:]
return address_list, value
#
# XXX: As I begin to add additional header parsers, I'm realizing we probably
# have two levels of parser routines: the get_XXX methods that get a token in
# the grammar, and parse_XXX methods that parse an entire field value. So
# get_address_list above should really be a parse_ method, as probably should
# be get_unstructured.
#
def parse_mime_version(value):
    """ mime-version = [CFWS] 1*digit [CFWS] "." [CFWS] 1*digit [CFWS]

    Parse the whole field value and return a MIMEVersion token list.
    Problems are recorded in the token list's defects rather than raised.
    ``major`` and ``minor`` are set only when the corresponding run of
    characters is all digits.

    """
    # The [CFWS] is implicit in the RFC 2045 BNF.
    # XXX: This routine is a bit verbose, should factor out a get_int method.
    mime_version = MIMEVersion()
    if not value:
        mime_version.defects.append(errors.HeaderMissingRequiredValue(
            "Missing MIME version number (eg: 1.0)"))
        return mime_version
    if value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
        if not value:
            # No early return here: parsing continues with an empty value.
            mime_version.defects.append(errors.HeaderMissingRequiredValue(
                "Expected MIME version number but found only CFWS"))
    # Major number: everything up to the '.' or the next CFWS.
    digits = ''
    while value and value[0] != '.' and value[0] not in CFWS_LEADER:
        digits += value[0]
        value = value[1:]
    if not digits.isdigit():
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME major version number but found {!r}".format(digits)))
        mime_version.append(ValueTerminal(digits, 'xtext'))
    else:
        mime_version.major = int(digits)
        mime_version.append(ValueTerminal(digits, 'digits'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
    if not value or value[0] != '.':
        # No separator: keep whatever is left as xtext and stop.
        if mime_version.major is not None:
            mime_version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        if value:
            mime_version.append(ValueTerminal(value, 'xtext'))
        return mime_version
    mime_version.append(ValueTerminal('.', 'version-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
    if not value:
        if mime_version.major is not None:
            mime_version.defects.append(errors.InvalidHeaderDefect(
                "Incomplete MIME version; found only major number"))
        return mime_version
    # Minor number: everything up to the next CFWS.
    digits = ''
    while value and value[0] not in CFWS_LEADER:
        digits += value[0]
        value = value[1:]
    if not digits.isdigit():
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Expected MIME minor version number but found {!r}".format(digits)))
        mime_version.append(ValueTerminal(digits, 'xtext'))
    else:
        mime_version.minor = int(digits)
        mime_version.append(ValueTerminal(digits, 'digits'))
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        mime_version.append(token)
    if value:
        mime_version.defects.append(errors.InvalidHeaderDefect(
            "Excess non-CFWS text after MIME version"))
        mime_version.append(ValueTerminal(value, 'xtext'))
    return mime_version
def get_invalid_parameter(value):
    """ Read everything up to the next ';'.

    This is outside the formal grammar.  The text is collected into an
    InvalidParameter token list: characters in PHRASE_ENDS become
    'misplaced-special' terminals and everything else is read with
    get_phrase().  Returns (invalid_parameter, remaining_value).

    """
    collected = InvalidParameter()
    while value:
        head = value[0]
        if head == ';':
            break
        if head not in PHRASE_ENDS:
            phrase, value = get_phrase(value)
            collected.append(phrase)
            continue
        collected.append(ValueTerminal(head, 'misplaced-special'))
        value = value[1:]
    return collected, value
def get_ttext(value):
    """ttext = <matches _non_token_end_matcher>

    Any leading run of non-TOKEN_ENDS characters is accepted as ttext and
    returned as a 'ttext' ValueTerminal, after being passed through
    _validate_xtext().  Raises HeaderParseError if value does not start
    with at least one such character.  Returns (ttext, remaining_value).

    """
    match = _non_token_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected ttext but found '{}'".format(value))
    text = match.group()
    terminal = ValueTerminal(text, 'ttext')
    _validate_xtext(terminal)
    return terminal, value[len(text):]
def get_token(value):
    """token = [CFWS] 1*ttext [CFWS]

    Leading and trailing CFWS are absorbed into the returned Token.  The
    CFWS is implied by the RFC but not explicit in its BNF.  Raises
    HeaderParseError if no ttext is found where it is expected.
    Returns (token, remaining_value).

    """
    result = Token()
    if value:
        if value[0] in CFWS_LEADER:
            cfws, value = get_cfws(value)
            result.append(cfws)
    if value:
        if value[0] in TOKEN_ENDS:
            raise errors.HeaderParseError(
                "expected token but found '{}'".format(value))
    text, value = get_ttext(value)
    result.append(text)
    if value:
        if value[0] in CFWS_LEADER:
            cfws, value = get_cfws(value)
            result.append(cfws)
    return result, value
def get_attrtext(value):
    """attrtext = 1*(any non-ATTRIBUTE_ENDS character)

    The leading run of non-ATTRIBUTE_ENDS characters is returned as an
    'attrtext' ValueTerminal, after being passed through
    _validate_xtext().  Raises HeaderParseError if value does not start
    with at least one such character.  Returns (attrtext, remaining_value).

    """
    match = _non_attribute_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected attrtext but found {!r}".format(value))
    text = match.group()
    terminal = ValueTerminal(text, 'attrtext')
    _validate_xtext(terminal)
    return terminal, value[len(text):]
def get_attribute(value):
    """ [CFWS] 1*attrtext [CFWS]

    The CFWS is made explicit here, and the run of characters itself is a
    single value terminal read by get_attrtext().  Raises HeaderParseError
    if no attrtext is found where it is expected.
    Returns (attribute, remaining_value).

    """
    result = Attribute()
    if value:
        if value[0] in CFWS_LEADER:
            cfws, value = get_cfws(value)
            result.append(cfws)
    if value:
        if value[0] in ATTRIBUTE_ENDS:
            raise errors.HeaderParseError(
                "expected token but found '{}'".format(value))
    text, value = get_attrtext(value)
    result.append(text)
    if value:
        if value[0] in CFWS_LEADER:
            cfws, value = get_cfws(value)
            result.append(cfws)
    return result, value
def get_extended_attrtext(value):
    """attrtext = 1*(any non-ATTRIBUTE_ENDS character plus '%')

    Like get_attrtext(), but '%' is allowed so that a value containing
    % escapes is picked up as one string (it is decoded as a single
    string later).  Returns an 'extended-attrtext' ValueTerminal and the
    remaining value; raises HeaderParseError if nothing matches.

    """
    match = _non_extended_attribute_end_matcher(value)
    if match is None:
        raise errors.HeaderParseError(
            "expected extended attrtext but found {!r}".format(value))
    text = match.group()
    terminal = ValueTerminal(text, 'extended-attrtext')
    _validate_xtext(terminal)
    return terminal, value[len(text):]
def get_extended_attribute(value):
    """ [CFWS] 1*extended_attrtext [CFWS]

    Same shape as get_attribute(), except that % characters are allowed
    so an encoded value can be read as a single string.  The result is a
    plain Attribute token list.  Returns (attribute, remaining_value).

    """
    # XXX: should we have an ExtendedAttribute TokenList?
    result = Attribute()
    if value:
        if value[0] in CFWS_LEADER:
            cfws, value = get_cfws(value)
            result.append(cfws)
    if value:
        if value[0] in EXTENDED_ATTRIBUTE_ENDS:
            raise errors.HeaderParseError(
                "expected token but found '{}'".format(value))
    text, value = get_extended_attrtext(value)
    result.append(text)
    if value:
        if value[0] in CFWS_LEADER:
            cfws, value = get_cfws(value)
            result.append(cfws)
    return result, value
def get_section(value):
    """ '*' digits

    The formal BNF is more complicated because leading 0s are not allowed.  We
    check for that and add a defect.  We also assume no CFWS is allowed between
    the '*' and the digits, though the RFC is not crystal clear on that.
    The caller should already have dealt with leading CFWS.

    Returns (section, remaining_value), with section.number set to the
    integer section number.  Raises HeaderParseError if value does not
    start with '*' followed by at least one digit.

    """
    section = Section()
    if not value or value[0] != '*':
        raise errors.HeaderParseError("Expected section but found {}".format(
                                        value))
    section.append(ValueTerminal('*', 'section-marker'))
    value = value[1:]
    if not value or not value[0].isdigit():
        raise errors.HeaderParseError("Expected section number but "
                                      "found {}".format(value))
    digits = ''
    while value and value[0].isdigit():
        digits += value[0]
        value = value[1:]
    if digits[0] == '0' and digits != '0':
        # Record a defect (not an error): email.errors has no
        # InvalidHeaderError, so the previous spelling raised
        # AttributeError on input such as '*01'.
        section.defects.append(errors.InvalidHeaderDefect(
            "section number has an invalid leading 0"))
    section.number = int(digits)
    section.append(ValueTerminal(digits, 'digits'))
    return section, value
def get_value(value):
    """ quoted-string / attribute

    A value that starts with '"' is read with get_quoted_string();
    anything else is read with get_extended_attribute().  Leading CFWS,
    if any, is inserted at the front of the token that was read.  Raises
    HeaderParseError when value is empty or contains only CFWS.
    Returns (value_token_list, remaining_value).

    """
    result = Value()
    if not value:
        raise errors.HeaderParseError("Expected value but found end of string")
    leader = None
    if value[0] in CFWS_LEADER:
        leader, value = get_cfws(value)
    if not value:
        raise errors.HeaderParseError("Expected value but found "
                                      "only {}".format(leader))
    reader = get_quoted_string if value[0] == '"' else get_extended_attribute
    token, value = reader(value)
    if leader is not None:
        token[:0] = [leader]
    result.append(token)
    return result, value
def get_parameter(value):
    """ attribute [section] ["*"] [CFWS] "=" value

    The CFWS is implied by the RFC but not made explicit in the BNF.  This
    simplified form of the BNF from the RFC is made to conform with the RFC BNF
    through some extra checks.  We do it this way because it makes both error
    recovery and working with the resulting parse tree easier.

    Returns (param, remaining_value).  Recoverable problems are recorded
    as defects on the returned Parameter; HeaderParseError is raised when
    the text cannot be read as a parameter at all.

    """
    # It is possible CFWS would also be implicitly allowed between the section
    # and the 'extended-attribute' marker (the '*') , but we've never seen that
    # in the wild and we will therefore ignore the possibility.
    param = Parameter()
    token, value = get_attribute(value)
    param.append(token)
    if not value or value[0] == ';':
        param.defects.append(errors.InvalidHeaderDefect("Parameter contains "
            "name ({}) but no value".format(token)))
        return param, value
    if value[0] == '*':
        try:
            token, value = get_section(value)
            param.sectioned = True
            param.append(token)
        except errors.HeaderParseError:
            # Not a section; the '*' may be the extended-parameter marker.
            pass
        if not value:
            raise errors.HeaderParseError("Incomplete parameter")
        if value[0] == '*':
            param.append(ValueTerminal('*', 'extended-parameter-marker'))
            value = value[1:]
            param.extended = True
    if value[0] != '=':
        raise errors.HeaderParseError("Parameter not followed by '='")
    param.append(ValueTerminal('=', 'parameter-separator'))
    value = value[1:]
    if value and value[0] in CFWS_LEADER:
        token, value = get_cfws(value)
        param.append(token)
    # remainder is set only while we are parsing the inside of a quoted
    # extended value; it holds the text that follows the closing quote.
    remainder = None
    appendto = param
    if param.extended and value and value[0] == '"':
        # Now for some serious hackery to handle the common invalid case of
        # double quotes around an extended value.  We also accept (with defect)
        # a value marked as encoded that isn't really.
        qstring, remainder = get_quoted_string(value)
        inner_value = qstring.stripped_value
        semi_valid = False
        if param.section_number == 0:
            if inner_value and inner_value[0] == "'":
                semi_valid = True
            else:
                token, rest = get_attrtext(inner_value)
                if rest and rest[0] == "'":
                    semi_valid = True
        else:
            try:
                token, rest = get_extended_attrtext(inner_value)
            except errors.HeaderParseError:
                # Not extended attrtext, so not semi-valid.
                pass
            else:
                if not rest:
                    semi_valid = True
        if semi_valid:
            param.defects.append(errors.InvalidHeaderDefect(
                "Quoted string value for extended parameter is invalid"))
            param.append(qstring)
            # Empty the bare quoted string and re-parse its content into it.
            for t in qstring:
                if t.token_type == 'bare-quoted-string':
                    t[:] = []
                    appendto = t
                    break
            value = inner_value
        else:
            remainder = None
            param.defects.append(errors.InvalidHeaderDefect(
                "Parameter marked as extended but appears to have a "
                "quoted string value that is non-encoded"))
    if value and value[0] == "'":
        token = None
    else:
        token, value = get_value(value)
    if not param.extended or param.section_number > 0:
        if not value or value[0] != "'":
            appendto.append(token)
            if remainder is not None:
                assert not value, value
                value = remainder
            return param, value
        param.defects.append(errors.InvalidHeaderDefect(
            "Apparent initial-extended-value but attribute "
            "was not marked as extended or was not initial section"))
    if not value:
        # Assume the charset/lang is missing and the token is the value.
        param.defects.append(errors.InvalidHeaderDefect(
            "Missing required charset/lang delimiters"))
        appendto.append(token)
        if remainder is None:
            return param, value
    else:
        if token is not None:
            for t in token:
                if t.token_type == 'extended-attrtext':
                    break
            # Relabel the terminal that carries the charset.  This was
            # written as a comparison (==), which had no effect.
            t.token_type = 'attrtext'
            appendto.append(t)
            param.charset = t.value
        if value[0] != "'":
            raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                          "delimiter, but found {!r}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
        value = value[1:]
        if value and value[0] != "'":
            token, value = get_attrtext(value)
            appendto.append(token)
            param.lang = token.value
            if not value or value[0] != "'":
                raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
                                  "delimiter, but found {}".format(value))
        appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
        value = value[1:]
    if remainder is not None:
        # Treat the rest of value as bare quoted string content.
        v = Value()
        while value:
            if value[0] in WSP:
                token, value = get_fws(value)
            else:
                token, value = get_qcontent(value)
            v.append(token)
        token = v
    else:
        token, value = get_value(value)
    appendto.append(token)
    if remainder is not None:
        assert not value, value
        value = remainder
    return param, value
def parse_mime_parameters(value):
    """ parameter *( ";" parameter )

    That BNF is meant to indicate this routine should only be called after
    finding and handling the leading ';'.  There is no corresponding rule in
    the formal RFC grammar, but it is more convenient for us for the set of
    parameters to be treated as its own TokenList.

    This is a 'parse' routine because it consumes the remaining value, but
    it would never be called to parse a full header.  Instead it is called
    to parse everything after the non-parameter value of a specific MIME
    header.

    """
    parameters = MimeParameters()
    while value:
        try:
            parsed, value = get_parameter(value)
        except errors.HeaderParseError:
            # get_parameter() failed, so value is unchanged.  Recover by
            # reading up to the next ';' as an invalid parameter.
            leader = None
            if value[0] in CFWS_LEADER:
                leader, value = get_cfws(value)
            if not value:
                parameters.append(leader)
                return parameters
            if value[0] == ';':
                if leader is not None:
                    parameters.append(leader)
                parameters.defects.append(errors.InvalidHeaderDefect(
                    "parameter entry with no content"))
            else:
                invalid, value = get_invalid_parameter(value)
                if leader:
                    invalid[:0] = [leader]
                parameters.append(invalid)
                parameters.defects.append(errors.InvalidHeaderDefect(
                    "invalid parameter {!r}".format(invalid)))
        else:
            parameters.append(parsed)
        if value and value[0] != ';':
            # Junk after the otherwise valid parameter.  Mark it as
            # invalid, but it will have a value.
            last = parameters[-1]
            last.token_type = 'invalid-parameter'
            junk, value = get_invalid_parameter(value)
            last.extend(junk)
            parameters.defects.append(errors.InvalidHeaderDefect(
                "parameter with invalid trailing text {!r}".format(junk)))
        if value:
            # Must be a ';' at this point.
            parameters.append(ValueTerminal(';', 'parameter-separator'))
            value = value[1:]
    return parameters
def _find_mime_parameters(tokenlist, value):
    """Do our best to find the parameters in an invalid MIME header

    Everything before the first ';' is appended to tokenlist as phrases
    and 'misplaced-special' terminals.  If a ';' is found, it and the
    parsed parameters after it are appended as well.  Returns None.

    """
    while value:
        head = value[0]
        if head == ';':
            break
        if head not in PHRASE_ENDS:
            phrase, value = get_phrase(value)
            tokenlist.append(phrase)
            continue
        tokenlist.append(ValueTerminal(head, 'misplaced-special'))
        value = value[1:]
    if value:
        tokenlist.append(ValueTerminal(';', 'parameter-separator'))
        tokenlist.append(parse_mime_parameters(value[1:]))
def parse_content_type_header(value):
    """ maintype "/" subtype *( ";" parameter )

    The maintype and subtype are tokens.  Theoretically they could
    be checked against the official IANA list + x-token, but we
    don't do that.

    Returns a ContentType token list.  Problems are recorded as defects;
    when the type cannot be parsed, the rest of the value is still
    searched for parameters.

    """
    ctype = ContentType()
    if not value:
        ctype.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content type specification"))
        return ctype
    try:
        main_token, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content maintype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(main_token)
    # XXX: If we really want to follow the formal grammar we should make
    # maintype and subtype specialized TokenLists here.  Probably not worth it.
    if not value or value[0] != '/':
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Invalid content type"))
        if value:
            _find_mime_parameters(ctype, value)
        return ctype
    ctype.maintype = main_token.value.strip().lower()
    ctype.append(ValueTerminal('/', 'content-type-separator'))
    value = value[1:]
    try:
        sub_token, value = get_token(value)
    except errors.HeaderParseError:
        ctype.defects.append(errors.InvalidHeaderDefect(
            "Expected content subtype but found {!r}".format(value)))
        _find_mime_parameters(ctype, value)
        return ctype
    ctype.append(sub_token)
    ctype.subtype = sub_token.value.strip().lower()
    if not value:
        return ctype
    if value[0] == ';':
        ctype.append(ValueTerminal(';', 'parameter-separator'))
        ctype.append(parse_mime_parameters(value[1:]))
        return ctype
    ctype.defects.append(errors.InvalidHeaderDefect(
        "Only parameters are valid after content type, but "
        "found {!r}".format(value)))
    # The RFC requires that a syntactically invalid content-type be treated
    # as text/plain.  Perhaps we should postel this, but we should probably
    # only do that if we were checking the subtype value against IANA.
    # Deleting the instance attributes falls back to the class defaults.
    del ctype.maintype
    del ctype.subtype
    _find_mime_parameters(ctype, value)
    return ctype
def parse_content_disposition_header(value):
    """ disposition-type *( ";" parameter )

    Returns a ContentDisposition token list.  content_disposition is set
    to the stripped, lowercased token when one is found.  Problems are
    recorded as defects; on invalid input the rest of the value is still
    searched for parameters.

    """
    disp_header = ContentDisposition()
    if not value:
        disp_header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content disposition"))
        return disp_header
    try:
        token, value = get_token(value)
    except errors.HeaderParseError:
        # The defect belongs on disp_header; this used to reference an
        # undefined name (ctype) and raised NameError instead.
        disp_header.defects.append(errors.InvalidHeaderDefect(
            "Expected content disposition but found {!r}".format(value)))
        _find_mime_parameters(disp_header, value)
        return disp_header
    disp_header.append(token)
    disp_header.content_disposition = token.value.strip().lower()
    if not value:
        return disp_header
    if value[0] != ';':
        disp_header.defects.append(errors.InvalidHeaderDefect(
            "Only parameters are valid after content disposition, but "
            "found {!r}".format(value)))
        _find_mime_parameters(disp_header, value)
        return disp_header
    disp_header.append(ValueTerminal(';', 'parameter-separator'))
    disp_header.append(parse_mime_parameters(value[1:]))
    return disp_header
def parse_content_transfer_encoding_header(value):
    """ mechanism

    Returns a ContentTransferEncoding token list.  cte is set to the
    stripped, lowercased token when one is found.  Any text after the
    mechanism is kept in the token list and a defect is recorded for
    each piece of it.

    """
    # We should probably validate the values, since the list is fixed.
    cte_header = ContentTransferEncoding()
    if not value:
        cte_header.defects.append(errors.HeaderMissingRequiredValue(
            "Missing content transfer encoding"))
        return cte_header
    try:
        token, value = get_token(value)
    except errors.HeaderParseError:
        # The defect belongs on cte_header; this used to reference an
        # undefined name (ctype) and raised NameError instead.
        cte_header.defects.append(errors.InvalidHeaderDefect(
            "Expected content transfer encoding but found {!r}".format(value)))
    else:
        cte_header.append(token)
        cte_header.cte = token.value.strip().lower()
    if not value:
        return cte_header
    while value:
        cte_header.defects.append(errors.InvalidHeaderDefect(
            "Extra text after content transfer encoding"))
        if value[0] in PHRASE_ENDS:
            cte_header.append(ValueTerminal(value[0], 'misplaced-special'))
            value = value[1:]
        else:
            token, value = get_phrase(value)
            cte_header.append(token)
    return cte_header

View File

@ -391,24 +391,151 @@ class UniqueSingleAddressHeader(SingleAddressHeader):
max_count = 1
class MIMEVersionHeader:
    """Header mixin for MIME-Version; exposes version, major and minor."""

    max_count = 1

    value_parser = staticmethod(parser.parse_mime_version)

    @classmethod
    def parse(cls, value, kwds):
        """Parse value and store the results in kwds.

        major and version are reported only when the parse tree has a
        minor number; otherwise all three version fields are None.
        """
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)
        minor = tree.minor
        if minor is None:
            major = None
            version = None
        else:
            major = tree.major
            version = '{}.{}'.format(major, minor)
        kwds['major'] = major
        kwds['minor'] = minor
        kwds['version'] = version

    def init(self, *args, **kw):
        """Take version, major and minor out of kw, then defer to super."""
        self._version = kw.pop('version')
        self._major = kw.pop('major')
        self._minor = kw.pop('minor')
        super().init(*args, **kw)

    @property
    def major(self):
        """The major version number stored by init()."""
        return self._major

    @property
    def minor(self):
        """The minor version number stored by init()."""
        return self._minor

    @property
    def version(self):
        """The 'major.minor' version string stored by init()."""
        return self._version
class ParameterizedMIMEHeader:
    """Mixin that handles the params dict.

    Must be subclassed, and the subclass must provide a value_parser
    for the specific header.
    """

    max_count = 1

    @classmethod
    def parse(cls, value, kwds):
        """Parse value and store parse_tree, decoded, defects and params."""
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)
        raw_params = tree.params
        if raw_params is None:
            kwds['params'] = {}
            return
        # The MIME RFCs specify that parameter ordering is arbitrary.
        cleaned = {}
        for param_name, param_value in raw_params:
            cleaned[utils._sanitize(param_name).lower()] = (
                utils._sanitize(param_value))
        kwds['params'] = cleaned

    def init(self, *args, **kw):
        """Take params out of kw, then defer to super."""
        self._params = kw.pop('params')
        super().init(*args, **kw)

    @property
    def params(self):
        """A copy of the parameter dict, so callers cannot mutate ours."""
        return self._params.copy()
class ContentTypeHeader(ParameterizedMIMEHeader):
    """Content-Type header: maintype, subtype and parameters."""

    value_parser = staticmethod(parser.parse_content_type_header)

    def init(self, *args, **kw):
        """Run the base init, then copy the sanitized types off the tree."""
        super().init(*args, **kw)
        tree = self._parse_tree
        self._maintype = utils._sanitize(tree.maintype)
        self._subtype = utils._sanitize(tree.subtype)

    @property
    def maintype(self):
        """The sanitized main type taken from the parse tree."""
        return self._maintype

    @property
    def subtype(self):
        """The sanitized subtype taken from the parse tree."""
        return self._subtype

    @property
    def content_type(self):
        """The content type string, in the form maintype/subtype."""
        return '/'.join((self.maintype, self.subtype))
class ContentDispositionHeader(ParameterizedMIMEHeader):
    """Content-Disposition header: disposition value and parameters."""

    value_parser = staticmethod(parser.parse_content_disposition_header)

    def init(self, *args, **kw):
        """Run the base init, then store the disposition from the tree."""
        super().init(*args, **kw)
        disposition = self._parse_tree.content_disposition
        # None means the parser found no disposition; keep it as None
        # rather than sanitizing it.
        if disposition is not None:
            disposition = utils._sanitize(disposition)
        self._content_disposition = disposition

    @property
    def content_disposition(self):
        """The sanitized disposition value, or None."""
        return self._content_disposition
class ContentTransferEncodingHeader:
    """Header mixin for Content-Transfer-Encoding; exposes cte."""

    max_count = 1

    value_parser = staticmethod(parser.parse_content_transfer_encoding_header)

    @classmethod
    def parse(cls, value, kwds):
        """Parse value and store parse_tree, decoded and defects in kwds."""
        tree = cls.value_parser(value)
        kwds['parse_tree'] = tree
        kwds['decoded'] = str(tree)
        kwds['defects'].extend(tree.all_defects)

    def init(self, *args, **kw):
        """Run the base init, then store the sanitized cte from the tree."""
        super().init(*args, **kw)
        raw_cte = self._parse_tree.cte
        self._cte = utils._sanitize(raw_cte)

    @property
    def cte(self):
        """The sanitized content transfer encoding."""
        return self._cte
# The header factory #
# Maps lower-cased header field names to the specialized header class used
# for that field.  Each key appears exactly once: repeating a key in a dict
# literal is legal but silently keeps only the last value, which hides
# copy/paste mistakes.
# NOTE(review): presumably consumed by HeaderRegistry below -- confirm.
_default_header_map = {
    'subject':                      UniqueUnstructuredHeader,
    'date':                         UniqueDateHeader,
    'resent-date':                  DateHeader,
    'orig-date':                    UniqueDateHeader,
    'sender':                       UniqueSingleAddressHeader,
    'resent-sender':                SingleAddressHeader,
    'to':                           UniqueAddressHeader,
    'resent-to':                    AddressHeader,
    'cc':                           UniqueAddressHeader,
    'resent-cc':                    AddressHeader,
    'bcc':                          UniqueAddressHeader,
    'resent-bcc':                   AddressHeader,
    'from':                         UniqueAddressHeader,
    'resent-from':                  AddressHeader,
    'reply-to':                     UniqueAddressHeader,
    'mime-version':                 MIMEVersionHeader,
    'content-type':                 ContentTypeHeader,
    'content-disposition':          ContentDispositionHeader,
    'content-transfer-encoding':    ContentTransferEncodingHeader,
    }
class HeaderRegistry:

View File

@ -3,7 +3,7 @@ import unittest
from email import _header_value_parser as parser
from email import errors
from email import policy
from test.test_email import TestEmailBase
from test.test_email import TestEmailBase, parameterize
class TestTokens(TestEmailBase):
@ -28,7 +28,32 @@ class TestTokens(TestEmailBase):
self.assertDefectsEqual(parts[2].all_defects, [errors.UndecodableBytesDefect])
class TestParser(TestEmailBase):
class TestParserMixin:
    """Shared assertion helpers for header value parser tests.

    Must be mixed into a class that provides ``assertEqual`` and
    ``assertDefectsEqual``.
    """

    def _assert_results(self, tl, rest, string, value, defects, remainder,
                        comments=None):
        """Check one parse result against its expected values.

        *tl* is the parsed token and *rest* the unparsed remainder.  *string*
        is compared with ``str(tl)``, *value* with ``tl.value``, *defects*
        with ``tl.all_defects`` and *remainder* with *rest*.  ``tl.comments``
        is only checked when *comments* is not None.
        """
        self.assertEqual(str(tl), string)
        self.assertEqual(tl.value, value)
        self.assertDefectsEqual(tl.all_defects, defects)
        self.assertEqual(rest, remainder)
        if comments is not None:
            self.assertEqual(tl.comments, comments)

    def _test_get_x(self, method, source, string, value, defects,
                          remainder, comments=None):
        """Run a ``get_*`` style parser and check its result.

        *method* must return a ``(token, rest)`` pair for *source*.  Returns
        the token so callers can make further assertions on it.
        """
        tl, rest = method(source)
        # Forward the caller's expectation.  Passing a literal None here
        # would silently skip the comments check for every caller.
        self._assert_results(tl, rest, string, value, defects, remainder,
                             comments=comments)
        return tl

    def _test_parse_x(self, method, input, string, value, defects,
                             comments=None):
        """Run a ``parse_*`` style parser and check its result.

        *method* returns only the token; the remainder is taken to be the
        empty string.  Returns the token.
        """
        tl = method(input)
        self._assert_results(tl, '', string, value, defects, '', comments)
        return tl
class TestParser(TestParserMixin, TestEmailBase):
# _wsp_splitter
@ -49,19 +74,6 @@ class TestParser(TestEmailBase):
['foo', ' \t ', 'def jik'])
# test harness
def _test_get_x(self, method, input, string, value, defects,
                      remainder, comments=None):
    """Run a ``get_*`` style parser on *input* and check the result.

    *method* must return a ``(token, rest)`` pair.  The token's string form,
    value and defects, and the remainder, are compared with the expected
    values; comments are compared only when *comments* is given.  Returns
    the token.
    """
    parsed, leftover = method(input)
    self.assertEqual(str(parsed), string)
    self.assertEqual(parsed.value, value)
    self.assertDefectsEqual(parsed.all_defects, defects)
    self.assertEqual(leftover, remainder)
    if comments is None:
        return parsed
    self.assertEqual(parsed.comments, comments)
    return parsed
# get_fws
def test_get_fws_only(self):
@ -2390,6 +2402,67 @@ class TestParser(TestEmailBase):
str(address_list.mailboxes[2]))
@parameterize
class Test_parse_mime_version(TestParserMixin, TestEmailBase):
def mime_version_as_value(self, value, tl_str, tl_value, major, minor,
                          defects):
    """Parse *value* as a MIME-Version and check the resulting token.

    *tl_str*, *tl_value* and *defects* are checked by ``_test_parse_x``;
    *major* and *minor* are then compared with the token's attributes.
    """
    token = self._test_parse_x(
        parser.parse_mime_version, value, tl_str, tl_value, defects)
    self.assertEqual(token.major, major)
    self.assertEqual(token.minor, minor)
# Test cases for mime_version_as_value.  Each tuple lines up with that
# method's parameters:
#   (value, tl_str, tl_value, major, minor, defects)
# NOTE(review): the keys presumably become test-name suffixes via the
# @parameterize decorator -- confirm against its definition.
mime_version_params = {

    'rfc_2045_1': (
        '1.0',
        '1.0',
        '1.0',
        1,
        0,
        []),

    'RFC_2045_2': (
        '1.0 (produced by MetaSend Vx.x)',
        '1.0 (produced by MetaSend Vx.x)',
        '1.0 ',
        1,
        0,
        []),

    'RFC_2045_3': (
        '(produced by MetaSend Vx.x) 1.0',
        '(produced by MetaSend Vx.x) 1.0',
        ' 1.0',
        1,
        0,
        []),

    'RFC_2045_4': (
        '1.(produced by MetaSend Vx.x)0',
        '1.(produced by MetaSend Vx.x)0',
        '1. 0',
        1,
        0,
        []),

    # An empty value yields no version numbers and a missing-value defect.
    'empty': (
        '',
        '',
        '',
        None,
        None,
        [errors.HeaderMissingRequiredValue]),

    }
class TestFolding(TestEmailBase):
policy = policy.default

View File

@ -259,6 +259,7 @@ class TestMessageAPI(TestEmailBase):
self.assertTrue(lines[0].startswith('From '))
eq(text, NL.join(lines[1:]))
# test_headerregistry.TestContentTypeHeader.bad_params
def test_bad_param(self):
msg = email.message_from_string("Content-Type: blarg; baz; boo\n")
self.assertEqual(msg.get_param('baz'), '')
@ -292,6 +293,7 @@ class TestMessageAPI(TestEmailBase):
eq(msg.get_params(header='x-header'),
[('foo', ''), ('bar', 'one'), ('baz', 'two')])
# test_headerregistry.TestContentTypeHeader.spaces_around_param_equals
def test_get_param_liberal(self):
msg = Message()
msg['Content-Type'] = 'Content-Type: Multipart/mixed; boundary = "CPIMSSMTPC06p5f3tG"'
@ -314,10 +316,12 @@ class TestMessageAPI(TestEmailBase):
# msg.get_param("weird")
# yet.
# test_headerregistry.TestContentTypeHeader.spaces_around_semis
def test_get_param_funky_continuation_lines(self):
msg = self._msgobj('msg_22.txt')
self.assertEqual(msg.get_payload(1).get_param('name'), 'wibble.JPG')
# test_headerregistry.TestContentTypeHeader.semis_inside_quotes
def test_get_param_with_semis_in_quotes(self):
msg = email.message_from_string(
'Content-Type: image/pjpeg; name="Jim&amp;&amp;Jill"\n')
@ -325,6 +329,7 @@ class TestMessageAPI(TestEmailBase):
self.assertEqual(msg.get_param('name', unquote=False),
'"Jim&amp;&amp;Jill"')
# test_headerregistry.TestContentTypeHeader.quotes_inside_rfc2231_value
def test_get_param_with_quotes(self):
msg = email.message_from_string(
'Content-Type: foo; bar*0="baz\\"foobar"; bar*1="\\"baz"')
@ -1885,6 +1890,7 @@ class TestNonConformant(TestEmailBase):
"\nContent-Transfer-Encoding: {}".format(cte)))
self.assertEqual(len(msg.defects), 0)
# test_headerregistry.TestContentTypeHeader invalid_1 and invalid_2.
def test_invalid_content_type(self):
eq = self.assertEqual
neq = self.ndiffAssertEqual
@ -3437,6 +3443,7 @@ class Test8BitBytesHandling(unittest.TestCase):
self.assertEqual(msg.get_content_maintype(), "text")
self.assertEqual(msg.get_content_subtype(), "pl\uFFFDin")
# test_headerregistry.TestContentTypeHeader.non_ascii_in_params
def test_get_params_with_8bit(self):
msg = email.message_from_bytes(
'X-Header: foo=\xa7ne; b\xa7r=two; baz=three\n'.encode('latin-1'))
@ -3446,6 +3453,7 @@ class Test8BitBytesHandling(unittest.TestCase):
# XXX: someday you might be able to get 'b\xa7r', for now you can't.
self.assertEqual(msg.get_param('b\xa7r', header='x-header'), None)
# test_headerregistry.TestContentTypeHeader.non_ascii_in_rfc2231_value
def test_get_rfc2231_params_with_8bit(self):
msg = email.message_from_bytes(textwrap.dedent("""\
Content-Type: text/plain; charset=us-ascii;
@ -4491,6 +4499,9 @@ A very long line that must get split to something other than at the
# Test RFC 2231 header parameters (en/de)coding
class TestRFC2231(TestEmailBase):
# test_headerregistry.TestContentTypeHeader.rfc2231_encoded_with_double_quotes
# test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_inside_double_quotes
def test_get_param(self):
eq = self.assertEqual
msg = self._msgobj('msg_29.txt')
@ -4576,11 +4587,15 @@ Do you like this message?
-Me
""")
# test_headerregistry.TestContentTypeHeader.rfc2231_encoded_charset
# I changed the charset name, though, because the one in the file isn't
# a legal charset name. Should add a test for an illegal charset.
def test_rfc2231_get_content_charset(self):
eq = self.assertEqual
msg = self._msgobj('msg_32.txt')
eq(msg.get_content_charset(), 'us-ascii')
# test_headerregistry.TestContentTypeHeader.rfc2231_encoded_no_double_quotes
def test_rfc2231_parse_rfc_quoting(self):
m = textwrap.dedent('''\
Content-Disposition: inline;
@ -4594,6 +4609,7 @@ Do you like this message?
'This is even more ***fun*** is it not.pdf')
self.assertEqual(m, msg.as_string())
# test_headerregistry.TestContentTypeHeader.rfc2231_encoded_with_double_quotes
def test_rfc2231_parse_extra_quoting(self):
m = textwrap.dedent('''\
Content-Disposition: inline;
@ -4607,6 +4623,9 @@ Do you like this message?
'This is even more ***fun*** is it not.pdf')
self.assertEqual(m, msg.as_string())
# test_headerregistry.TestContentTypeHeader.rfc2231_no_language_or_charset
# but new test uses *0* because otherwise lang/charset is not valid.
# test_headerregistry.TestContentTypeHeader.rfc2231_segmented_normal_values
def test_rfc2231_no_language_or_charset(self):
m = '''\
Content-Transfer-Encoding: 8bit
@ -4621,6 +4640,7 @@ Content-Type: text/html; NAME*0=file____C__DOCUMENTS_20AND_20SETTINGS_FABIEN_LOC
param,
'file____C__DOCUMENTS_20AND_20SETTINGS_FABIEN_LOCAL_20SETTINGS_TEMP_nsmail.htm')
# test_headerregistry.TestContentTypeHeader.rfc2231_encoded_no_charset
def test_rfc2231_no_language_or_charset_in_filename(self):
m = '''\
Content-Disposition: inline;
@ -4633,6 +4653,7 @@ Content-Disposition: inline;
self.assertEqual(msg.get_filename(),
'This is even more ***fun*** is it not.pdf')
# Duplicate of previous test?
def test_rfc2231_no_language_or_charset_in_filename_encoded(self):
m = '''\
Content-Disposition: inline;
@ -4645,6 +4666,8 @@ Content-Disposition: inline;
self.assertEqual(msg.get_filename(),
'This is even more ***fun*** is it not.pdf')
# test_headerregistry.TestContentTypeHeader.rfc2231_partly_encoded,
# but the test below is wrong (the first part should be decoded).
def test_rfc2231_partly_encoded(self):
m = '''\
Content-Disposition: inline;
@ -4696,6 +4719,7 @@ Content-Type: text/plain;
self.assertEqual(msg.get_content_charset(),
'this is even more ***fun*** is it not.pdf')
# test_headerregistry.TestContentTypeHeader.rfc2231_unknown_charset_treated_as_ascii
def test_rfc2231_bad_encoding_in_filename(self):
m = '''\
Content-Disposition: inline;
@ -4762,6 +4786,7 @@ Content-Type: application/x-foo;
eq(language, None)
eq(s, "Frank's Document")
# test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_inside_double_quotes
def test_rfc2231_single_tick_in_filename(self):
m = """\
Content-Type: application/x-foo; name*0=\"Frank's\"; name*1=\" Document\"
@ -4772,6 +4797,7 @@ Content-Type: application/x-foo; name*0=\"Frank's\"; name*1=\" Document\"
self.assertFalse(isinstance(param, tuple))
self.assertEqual(param, "Frank's Document")
# test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_in_value_with_charset_and_lang
def test_rfc2231_tick_attack_extended(self):
eq = self.assertEqual
m = """\
@ -4785,6 +4811,7 @@ Content-Type: application/x-foo;
eq(language, 'en-us')
eq(s, "Frank's Document")
# test_headerregistry.TestContentTypeHeader.rfc2231_single_quote_in_non_encoded_value
def test_rfc2231_tick_attack(self):
m = """\
Content-Type: application/x-foo;
@ -4796,6 +4823,7 @@ Content-Type: application/x-foo;
self.assertFalse(isinstance(param, tuple))
self.assertEqual(param, "us-ascii'en-us'Frank's Document")
# test_headerregistry.TestContentTypeHeader.rfc2231_single_quotes_inside_quotes
def test_rfc2231_no_extended_values(self):
eq = self.assertEqual
m = """\
@ -4805,6 +4833,7 @@ Content-Type: application/x-foo; name=\"Frank's Document\"
msg = email.message_from_string(m)
eq(msg.get_param('name'), "Frank's Document")
# test_headerregistry.TestContentTypeHeader.rfc2231_encoded_then_unencoded_segments
def test_rfc2231_encoded_then_unencoded_segments(self):
eq = self.assertEqual
m = """\
@ -4820,6 +4849,8 @@ Content-Type: application/x-foo;
eq(language, 'en-us')
eq(s, 'My Document For You')
# test_headerregistry.TestContentTypeHeader.rfc2231_unencoded_then_encoded_segments
# test_headerregistry.TestContentTypeHeader.rfc2231_quoted_unencoded_then_encoded_segments
def test_rfc2231_unencoded_then_encoded_segments(self):
eq = self.assertEqual
m = """\

View File

@ -9,6 +9,9 @@ from email import headerregistry
from email.headerregistry import Address, Group
DITTO = object()
class TestHeaderRegistry(TestEmailBase):
def test_arbitrary_name_unstructured(self):
@ -175,6 +178,789 @@ class TestDateHeader(TestHeaderBase):
self.assertEqual(m['Date'].datetime, self.dt)
@parameterize
class TestContentTypeHeader(TestHeaderBase):
def content_type_as_value(self,
source,
content_type,
maintype,
subtype,
*args):
l = len(args)
parmdict = args[0] if l>0 else {}
defects = args[1] if l>1 else []
decoded = args[2] if l>2 and args[2] is not DITTO else source
header = 'Content-Type:' + ' ' if source else ''
folded = args[3] if l>3 else header + source + '\n'
h = self.make_header('Content-Type', source)
self.assertEqual(h.content_type, content_type)
self.assertEqual(h.maintype, maintype)
self.assertEqual(h.subtype, subtype)
self.assertEqual(h.params, parmdict)
self.assertDefectsEqual(h.defects, defects)
self.assertEqual(h, decoded)
self.assertEqual(h.fold(policy=policy.default), folded)
# Test cases for content_type_as_value.  Each tuple lines up with that
# method's parameters:
#   (source, content_type, maintype, subtype
#    [, params [, defects [, decoded [, folded]]]])
# The trailing items are optional; see content_type_as_value for defaults.
content_type_params = {

    # Examples from RFC 2045.

    'RFC_2045_1': (
        'text/plain; charset=us-ascii (Plain text)',
        'text/plain',
        'text',
        'plain',
        {'charset': 'us-ascii'},
        [],
        'text/plain; charset="us-ascii"'),

    'RFC_2045_2': (
        'text/plain; charset=us-ascii',
        'text/plain',
        'text',
        'plain',
        {'charset': 'us-ascii'},
        [],
        'text/plain; charset="us-ascii"'),

    'RFC_2045_3': (
        'text/plain; charset="us-ascii"',
        'text/plain',
        'text',
        'plain',
        {'charset': 'us-ascii'}),

    # RFC 2045 5.2 says syntactically invalid values are to be treated as
    # text/plain.

    'no_subtype_in_content_type': (
        'text/',
        'text/plain',
        'text',
        'plain',
        {},
        [errors.InvalidHeaderDefect]),

    'no_slash_in_content_type': (
        'foo',
        'text/plain',
        'text',
        'plain',
        {},
        [errors.InvalidHeaderDefect]),

    'junk_text_in_content_type': (
        '<crazy "stuff">',
        'text/plain',
        'text',
        'plain',
        {},
        [errors.InvalidHeaderDefect]),

    'too_many_slashes_in_content_type': (
        'image/jpeg/foo',
        'text/plain',
        'text',
        'plain',
        {},
        [errors.InvalidHeaderDefect]),

    # But unknown names are OK.  We could make non-IANA names a defect, but
    # by not doing so we make ourselves future proof.  The fact that they
    # are unknown will be detectable by the fact that they don't appear in
    # the mime_registry...and the application is free to extend that list
    # to handle them even if the core library doesn't.

    'unknown_content_type': (
        'bad/names',
        'bad/names',
        'bad',
        'names'),

    # The content type is case insensitive, and CFWS is ignored.

    'mixed_case_content_type': (
        'ImAge/JPeg',
        'image/jpeg',
        'image',
        'jpeg'),

    'spaces_in_content_type': (
        ' text / plain ',
        'text/plain',
        'text',
        'plain'),

    'cfws_in_content_type': (
        '(foo) text (bar)/(baz)plain(stuff)',
        'text/plain',
        'text',
        'plain'),

    # test some parameters (more tests could be added for parameters
    # associated with other content types, but since parameter parsing is
    # generic they would be redundant for the current implementation).

    'charset_param': (
        'text/plain; charset="utf-8"',
        'text/plain',
        'text',
        'plain',
        {'charset': 'utf-8'}),

    'capitalized_charset': (
        'text/plain; charset="US-ASCII"',
        'text/plain',
        'text',
        'plain',
        {'charset': 'US-ASCII'}),

    'unknown_charset': (
        'text/plain; charset="fOo"',
        'text/plain',
        'text',
        'plain',
        {'charset': 'fOo'}),

    'capitalized_charset_param_name_and_comment': (
        'text/plain; (interjection) Charset="utf-8"',
        'text/plain',
        'text',
        'plain',
        {'charset': 'utf-8'},
        [],
        # Should the parameter name be lowercased here?
        'text/plain; Charset="utf-8"'),

    # Since this is pretty much the ur-mimeheader, we'll put all the tests
    # that exercise the parameter parsing and formatting here.
    #
    # XXX: question: is minimal quoting preferred?

    'unquoted_param_value': (
        'text/plain; title=foo',
        'text/plain',
        'text',
        'plain',
        {'title': 'foo'},
        [],
        'text/plain; title="foo"'),

    'param_value_with_tspecials': (
        'text/plain; title="(bar)foo blue"',
        'text/plain',
        'text',
        'plain',
        {'title': '(bar)foo blue'}),

    'param_with_extra_quoted_whitespace': (
        'text/plain; title=" a loong way \t home "',
        'text/plain',
        'text',
        'plain',
        {'title': ' a loong way \t home '}),

    'bad_params': (
        'blarg; baz; boo',
        'text/plain',
        'text',
        'plain',
        {'baz': '', 'boo': ''},
        [errors.InvalidHeaderDefect]*3),

    'spaces_around_param_equals': (
        'Multipart/mixed; boundary = "CPIMSSMTPC06p5f3tG"',
        'multipart/mixed',
        'multipart',
        'mixed',
        {'boundary': 'CPIMSSMTPC06p5f3tG'},
        [],
        'Multipart/mixed; boundary="CPIMSSMTPC06p5f3tG"'),

    'spaces_around_semis': (
        ('image/jpeg; name="wibble.JPG" ; x-mac-type="4A504547" ; '
            'x-mac-creator="474B4F4E"'),
        'image/jpeg',
        'image',
        'jpeg',
        {'name': 'wibble.JPG',
         'x-mac-type': '4A504547',
         'x-mac-creator': '474B4F4E'},
        [],
        ('image/jpeg; name="wibble.JPG"; x-mac-type="4A504547"; '
            'x-mac-creator="474B4F4E"'),
        # XXX: it could be that we will eventually prefer to fold starting
        # from the decoded value, in which case these spaces and similar
        # spaces in other tests will be wrong.
        ('Content-Type: image/jpeg; name="wibble.JPG" ; '
            'x-mac-type="4A504547" ;\n'
         ' x-mac-creator="474B4F4E"\n'),
        ),

    'semis_inside_quotes': (
        'image/jpeg; name="Jim&amp;&amp;Jill"',
        'image/jpeg',
        'image',
        'jpeg',
        {'name': 'Jim&amp;&amp;Jill'}),

    'single_quotes_inside_quotes': (
        'image/jpeg; name="Jim \'Bob\' Jill"',
        'image/jpeg',
        'image',
        'jpeg',
        {'name': "Jim 'Bob' Jill"}),

    'double_quotes_inside_quotes': (
        r'image/jpeg; name="Jim \"Bob\" Jill"',
        'image/jpeg',
        'image',
        'jpeg',
        {'name': 'Jim "Bob" Jill'},
        [],
        r'image/jpeg; name="Jim \"Bob\" Jill"'),

    # XXX: This test works except for the refolding of the header.  I'll
    # deal with that bug when I deal with the other folding bugs.
    #'non_ascii_in_params': (
    #    ('foo\xa7/bar; b\xa7r=two; '
    #        'baz=thr\xa7e'.encode('latin-1').decode('us-ascii',
    #                                                'surrogateescape')),
    #    'foo\uFFFD/bar',
    #    'foo\uFFFD',
    #    'bar',
    #    {'b\uFFFDr': 'two', 'baz': 'thr\uFFFDe'},
    #    [errors.UndecodableBytesDefect]*3,
    #    'foo\uFFFD/bar; b\uFFFDr="two"; baz="thr\uFFFDe"',
    #    ),

    # RFC 2231 parameter tests.

    'rfc2231_segmented_normal_values': (
        'image/jpeg; name*0="abc"; name*1=".html"',
        'image/jpeg',
        'image',
        'jpeg',
        {'name': "abc.html"},
        [],
        'image/jpeg; name="abc.html"'),

    'quotes_inside_rfc2231_value': (
        r'image/jpeg; bar*0="baz\"foobar"; bar*1="\"baz"',
        'image/jpeg',
        'image',
        'jpeg',
        {'bar': 'baz"foobar"baz'},
        [],
        r'image/jpeg; bar="baz\"foobar\"baz"'),

    # XXX: This test works except for the refolding of the header.  I'll
    # deal with that bug when I deal with the other folding bugs.
    #'non_ascii_rfc2231_value': (
    #    ('text/plain; charset=us-ascii; '
    #         "title*=us-ascii'en'This%20is%20"
    #         'not%20f\xa7n').encode('latin-1').decode('us-ascii',
    #                                                 'surrogateescape'),
    #    'text/plain',
    #    'text',
    #    'plain',
    #    {'charset': 'us-ascii', 'title': 'This is not f\uFFFDn'},
    #     [errors.UndecodableBytesDefect],
    #     'text/plain; charset="us-ascii"; title="This is not f\uFFFDn"'),

    'rfc2231_encoded_charset': (
        'text/plain; charset*=ansi-x3.4-1968\'\'us-ascii',
        'text/plain',
        'text',
        'plain',
        {'charset': 'us-ascii'},
        [],
        'text/plain; charset="us-ascii"'),

    # This follows the RFC: no double quotes around encoded values.
    'rfc2231_encoded_no_double_quotes': (
        ("text/plain;"
            "\tname*0*=''This%20is%20;"
            "\tname*1*=%2A%2A%2Afun%2A%2A%2A%20;"
            '\tname*2="is it not.pdf"'),
        'text/plain',
        'text',
        'plain',
        {'name': 'This is ***fun*** is it not.pdf'},
        [],
        'text/plain; name="This is ***fun*** is it not.pdf"',
        ('Content-Type: text/plain;\tname*0*=\'\'This%20is%20;\n'
         '\tname*1*=%2A%2A%2Afun%2A%2A%2A%20;\tname*2="is it not.pdf"\n'),
        ),

    # Make sure we also handle it if there are spurious double quotes.
    'rfc2231_encoded_with_double_quotes': (
        ("text/plain;"
            '\tname*0*="us-ascii\'\'This%20is%20even%20more%20";'
            '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";'
            '\tname*2="is it not.pdf"'),
        'text/plain',
        'text',
        'plain',
        {'name': 'This is even more ***fun*** is it not.pdf'},
        [errors.InvalidHeaderDefect]*2,
        'text/plain; name="This is even more ***fun*** is it not.pdf"',
        ('Content-Type: text/plain;\t'
            'name*0*="us-ascii\'\'This%20is%20even%20more%20";\n'
         '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it not.pdf"\n'),
        ),

    'rfc2231_single_quote_inside_double_quotes': (
        ('text/plain; charset=us-ascii;'
           '\ttitle*0*="us-ascii\'en\'This%20is%20really%20";'
           '\ttitle*1*="%2A%2A%2Afun%2A%2A%2A%20";'
           '\ttitle*2="isn\'t it!"'),
        'text/plain',
        'text',
        'plain',
        {'charset': 'us-ascii', 'title': "This is really ***fun*** isn't it!"},
        [errors.InvalidHeaderDefect]*2,
        ('text/plain; charset="us-ascii"; '
           'title="This is really ***fun*** isn\'t it!"'),
        ('Content-Type: text/plain; charset=us-ascii;\n'
         '\ttitle*0*="us-ascii\'en\'This%20is%20really%20";\n'
         '\ttitle*1*="%2A%2A%2Afun%2A%2A%2A%20";\ttitle*2="isn\'t it!"\n'),
        ),

    'rfc2231_single_quote_in_value_with_charset_and_lang': (
        ('application/x-foo;'
            "\tname*0*=\"us-ascii'en-us'Frank's\"; name*1*=\" Document\""),
        'application/x-foo',
        'application',
        'x-foo',
        {'name': "Frank's Document"},
        [errors.InvalidHeaderDefect]*2,
        'application/x-foo; name="Frank\'s Document"',
        ('Content-Type: application/x-foo;\t'
            'name*0*="us-ascii\'en-us\'Frank\'s";\n'
         ' name*1*=" Document"\n'),
        ),

    'rfc2231_single_quote_in_non_encoded_value': (
        ('application/x-foo;'
            "\tname*0=\"us-ascii'en-us'Frank's\"; name*1=\" Document\""),
        'application/x-foo',
        'application',
        'x-foo',
        {'name': "us-ascii'en-us'Frank's Document"},
        [],
        'application/x-foo; name="us-ascii\'en-us\'Frank\'s Document"',
        ('Content-Type: application/x-foo;\t'
            'name*0="us-ascii\'en-us\'Frank\'s";\n'
         ' name*1=" Document"\n'),
         ),

    'rfc2231_no_language_or_charset': (
        'text/plain; NAME*0*=english_is_the_default.html',
        'text/plain',
        'text',
        'plain',
        {'name': 'english_is_the_default.html'},
        [errors.InvalidHeaderDefect],
        'text/plain; NAME="english_is_the_default.html"'),

    'rfc2231_encoded_no_charset': (
        ("text/plain;"
            '\tname*0*="\'\'This%20is%20even%20more%20";'
            '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";'
            '\tname*2="is it.pdf"'),
        'text/plain',
        'text',
        'plain',
        {'name': 'This is even more ***fun*** is it.pdf'},
        [errors.InvalidHeaderDefect]*2,
        'text/plain; name="This is even more ***fun*** is it.pdf"',
        ('Content-Type: text/plain;\t'
            'name*0*="\'\'This%20is%20even%20more%20";\n'
         '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it.pdf"\n'),
        ),

    # XXX: see below...the first name line here should be *0 not *0*.
    'rfc2231_partly_encoded': (
        ("text/plain;"
            '\tname*0*="\'\'This%20is%20even%20more%20";'
            '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";'
            '\tname*2="is it.pdf"'),
        'text/plain',
        'text',
        'plain',
        {'name': 'This is even more ***fun*** is it.pdf'},
        [errors.InvalidHeaderDefect]*2,
        'text/plain; name="This is even more ***fun*** is it.pdf"',
        ('Content-Type: text/plain;\t'
            'name*0*="\'\'This%20is%20even%20more%20";\n'
         '\tname*1*="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it.pdf"\n'),
        ),

    'rfc2231_partly_encoded_2': (
        ("text/plain;"
            '\tname*0*="\'\'This%20is%20even%20more%20";'
            '\tname*1="%2A%2A%2Afun%2A%2A%2A%20";'
            '\tname*2="is it.pdf"'),
        'text/plain',
        'text',
        'plain',
        {'name': 'This is even more %2A%2A%2Afun%2A%2A%2A%20is it.pdf'},
        [errors.InvalidHeaderDefect],
        'text/plain; name="This is even more %2A%2A%2Afun%2A%2A%2A%20is it.pdf"',
        ('Content-Type: text/plain;\t'
            'name*0*="\'\'This%20is%20even%20more%20";\n'
         '\tname*1="%2A%2A%2Afun%2A%2A%2A%20";\tname*2="is it.pdf"\n'),
        ),

    'rfc2231_unknown_charset_treated_as_ascii': (
        "text/plain; name*0*=bogus'xx'ascii_is_the_default",
        'text/plain',
        'text',
        'plain',
        {'name': 'ascii_is_the_default'},
        [],
        'text/plain; name="ascii_is_the_default"'),

    'rfc2231_bad_character_in_charset_parameter_value': (
        "text/plain; charset*=ascii''utf-8%E2%80%9D",
        'text/plain',
        'text',
        'plain',
        {'charset': 'utf-8\uFFFD\uFFFD\uFFFD'},
        [errors.UndecodableBytesDefect],
        'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'),

    'rfc2231_encoded_then_unencoded_segments': (
        ('application/x-foo;'
            '\tname*0*="us-ascii\'en-us\'My";'
            '\tname*1=" Document";'
            '\tname*2=" For You"'),
        'application/x-foo',
        'application',
        'x-foo',
        {'name': 'My Document For You'},
        [errors.InvalidHeaderDefect],
        'application/x-foo; name="My Document For You"',
        ('Content-Type: application/x-foo;\t'
            'name*0*="us-ascii\'en-us\'My";\n'
         '\tname*1=" Document";\tname*2=" For You"\n'),
        ),

    # My reading of the RFC is that this is an invalid header.  The RFC
    # says that if charset and language information is given, the first
    # segment *must* be encoded.
    'rfc2231_unencoded_then_encoded_segments': (
        ('application/x-foo;'
            '\tname*0=us-ascii\'en-us\'My;'
            '\tname*1*=" Document";'
            '\tname*2*=" For You"'),
        'application/x-foo',
        'application',
        'x-foo',
        {'name': 'My Document For You'},
        [errors.InvalidHeaderDefect]*3,
        'application/x-foo; name="My Document For You"',
        ("Content-Type: application/x-foo;\tname*0=us-ascii'en-us'My;\t"
         # XXX: the newline is in the wrong place, come back and fix
         # this when the rest of tests pass.
            'name*1*=" Document"\n;'
         '\tname*2*=" For You"\n'),
        ),

    # XXX: I would say this one should default to ascii/en for the
    # "encoded" segment, since the first segment is not encoded and is
    # in double quotes, making the value a valid non-encoded string.  The
    # old parser decodes this just like the previous case, which may be the
    # better Postel rule, but could equally result in borking headers that
    # intentionally have quoted quotes in them.  We could get this 98% right
    # if we treat it as a quoted string *unless* it matches the
    # charset'lang'value pattern exactly *and* there is at least one
    # encoded segment.  Implementing that algorithm will require some
    # refactoring, so I haven't done it (yet).
    'rfc2231_quoted_unencoded_then_encoded_segments': (
        ('application/x-foo;'
            '\tname*0="us-ascii\'en-us\'My";'
            '\tname*1*=" Document";'
            '\tname*2*=" For You"'),
        'application/x-foo',
        'application',
        'x-foo',
        {'name': "us-ascii'en-us'My Document For You"},
        [errors.InvalidHeaderDefect]*2,
        'application/x-foo; name="us-ascii\'en-us\'My Document For You"',
        ('Content-Type: application/x-foo;\t'
            'name*0="us-ascii\'en-us\'My";\n'
         '\tname*1*=" Document";\tname*2*=" For You"\n'),
        ),

}
@parameterize
class TestContentTransferEncoding(TestHeaderBase):
def cte_as_value(self,
source,
cte,
*args):
l = len(args)
defects = args[0] if l>0 else []
decoded = args[1] if l>1 and args[1] is not DITTO else source
header = 'Content-Transfer-Encoding:' + ' ' if source else ''
folded = args[2] if l>2 else header + source + '\n'
h = self.make_header('Content-Transfer-Encoding', source)
self.assertEqual(h.cte, cte)
self.assertDefectsEqual(h.defects, defects)
self.assertEqual(h, decoded)
self.assertEqual(h.fold(policy=policy.default), folded)
# Test cases for cte_as_value.  Each tuple lines up with that method's
# parameters:
#   (source, cte [, defects [, decoded [, folded]]])
# The trailing items are optional; see cte_as_value for their defaults.
cte_params = {

    'RFC_2183_1': (
        'base64',
        'base64',),

    # An empty value is expected to report '7bit' and a missing-value defect.
    'no_value': (
        '',
        '7bit',
        [errors.HeaderMissingRequiredValue],
        '',
        'Content-Transfer-Encoding:\n',
        ),

    # Text after the encoding token is flagged as a defect.
    'junk_after_cte': (
        '7bit and a bunch more',
        '7bit',
        [errors.InvalidHeaderDefect]),

}
@parameterize
class TestContentDisposition(TestHeaderBase):
def content_disp_as_value(self,
source,
content_disposition,
*args):
l = len(args)
parmdict = args[0] if l>0 else {}
defects = args[1] if l>1 else []
decoded = args[2] if l>2 and args[2] is not DITTO else source
header = 'Content-Disposition:' + ' ' if source else ''
folded = args[3] if l>3 else header + source + '\n'
h = self.make_header('Content-Disposition', source)
self.assertEqual(h.content_disposition, content_disposition)
self.assertEqual(h.params, parmdict)
self.assertDefectsEqual(h.defects, defects)
self.assertEqual(h, decoded)
self.assertEqual(h.fold(policy=policy.default), folded)
# Test cases for content_disp_as_value.  Each tuple lines up with that
# method's parameters:
#   (source, content_disposition [, params [, defects [, decoded [, folded]]]])
# The trailing items are optional; see content_disp_as_value for defaults.
content_disp_params = {

    # Examples from RFC 2183.

    'RFC_2183_1': (
        'inline',
        'inline',),

    'RFC_2183_2': (
        ('attachment; filename=genome.jpeg;'
         ' modification-date="Wed, 12 Feb 1997 16:29:51 -0500";'),
        'attachment',
        {'filename': 'genome.jpeg',
         'modification-date': 'Wed, 12 Feb 1997 16:29:51 -0500'},
        [],
        ('attachment; filename="genome.jpeg"; '
             'modification-date="Wed, 12 Feb 1997 16:29:51 -0500"'),
        ('Content-Disposition: attachment; filename=genome.jpeg;\n'
         ' modification-date="Wed, 12 Feb 1997 16:29:51 -0500";\n'),
        ),

    # An empty value is expected to give a None disposition and a defect.
    'no_value': (
        '',
        None,
        {},
        [errors.HeaderMissingRequiredValue],
        '',
        'Content-Disposition:\n'),

    # The expected disposition is the text before the invalid '/'.
    'invalid_value': (
        'ab./k',
        'ab.',
        {},
        [errors.InvalidHeaderDefect]),

    'invalid_value_with_params': (
        'ab./k; filename="foo"',
        'ab.',
        {'filename': 'foo'},
        [errors.InvalidHeaderDefect]),

}
@parameterize
class TestMIMEVersionHeader(TestHeaderBase):
def version_string_as_MIME_Version(self, source, decoded, version, major,
                                   minor, defects):
    """Build a MIME-Version header from *source* and check its attributes.

    *decoded* is compared with the header itself; *version*, *major*,
    *minor* and *defects* with the matching header attributes.  The folded
    form must be the header name, a space if *source* is non-empty,
    *source*, and a newline.
    """
    h = self.make_header('MIME-Version', source)
    self.assertEqual(h, decoded)
    for attr, expected in (('version', version),
                           ('major', major),
                           ('minor', minor)):
        self.assertEqual(getattr(h, attr), expected)
    self.assertDefectsEqual(h.defects, defects)
    # An empty value folds to just the header name and a newline.
    separator = ' ' if source else ''
    expected_fold = 'MIME-Version:' + separator + source + '\n'
    self.assertEqual(h.fold(policy=policy.default), expected_fold)
# Test cases for version_string_as_MIME_Version.  Each tuple lines up with
# that method's parameters:
#   (source, decoded, version, major, minor, defects)
version_string_params = {

    # Examples from the RFC.

    'RFC_2045_1': (
        '1.0',
        '1.0',
        '1.0',
        1,
        0,
        []),

    'RFC_2045_2': (
        '1.0 (produced by MetaSend Vx.x)',
        '1.0 (produced by MetaSend Vx.x)',
        '1.0',
        1,
        0,
        []),

    'RFC_2045_3': (
        '(produced by MetaSend Vx.x) 1.0',
        '(produced by MetaSend Vx.x) 1.0',
        '1.0',
        1,
        0,
        []),

    'RFC_2045_4': (
        '1.(produced by MetaSend Vx.x)0',
        '1.(produced by MetaSend Vx.x)0',
        '1.0',
        1,
        0,
        []),

    # Other valid values.

    '1_1': (
        '1.1',
        '1.1',
        '1.1',
        1,
        1,
        []),

    '2_1': (
        '2.1',
        '2.1',
        '2.1',
        2,
        1,
        []),

    'whitespace': (
        '1 .0',
        '1 .0',
        '1.0',
        1,
        0,
        []),

    'leading_trailing_whitespace_ignored': (
        ' 1.0 ',
        ' 1.0 ',
        '1.0',
        1,
        0,
        []),

    # Recoverable invalid values.  We can recover here only because we
    # already have a valid value by the time we encounter the garbage.
    # Anywhere else, and we don't know where the garbage ends.

    'non_comment_garbage_after': (
        '1.0 <abc>',
        '1.0 <abc>',
        '1.0',
        1,
        0,
        [errors.InvalidHeaderDefect]),

    # Unrecoverable invalid values.  We *could* apply more heuristics to
    # get something out of the first two, but doing so is not worth the
    # effort.

    'non_comment_garbage_before': (
        '<abc> 1.0',
        '<abc> 1.0',
        None,
        None,
        None,
        [errors.InvalidHeaderDefect]),

    'non_comment_garbage_inside': (
        '1.<abc>0',
        '1.<abc>0',
        None,
        None,
        None,
        [errors.InvalidHeaderDefect]),

    'two_periods': (
        '1..0',
        '1..0',
        None,
        None,
        None,
        [errors.InvalidHeaderDefect]),

    '2_x': (
        '2.x',
        '2.x',
        None,  # This could be 2, but it seems safer to make it None.
        None,
        None,
        [errors.InvalidHeaderDefect]),

    'foo': (
        'foo',
        'foo',
        None,
        None,
        None,
        [errors.InvalidHeaderDefect]),

    'missing': (
        '',
        '',
        None,
        None,
        None,
        [errors.HeaderMissingRequiredValue]),

    }
@parameterize
class TestAddressHeader(TestHeaderBase):