#18891: Complete new provisional email API.

This adds EmailMessage and MIMEPart subclasses of Message
with new API methods, and a ContentManager class used by
the new methods.  Also a new policy setting, content_manager.

Patch was reviewed by Stephen J. Turnbull and Serhiy Storchaka,
and reflects their feedback.

I will ideally add some examples of using the new API to the
documentation before the final release.
R David Murray 2013-10-16 22:48:40 -04:00
parent 1a16288197
commit 3da240fd01
15 changed files with 2539 additions and 26 deletions


@@ -0,0 +1,427 @@
:mod:`email.contentmanager`: Managing MIME Content
--------------------------------------------------

.. module:: email.contentmanager
   :synopsis: Storing and Retrieving Content from MIME Parts

.. moduleauthor:: R. David Murray <rdmurray@bitdance.com>
.. sectionauthor:: R. David Murray <rdmurray@bitdance.com>


.. note::

   The contentmanager module has been included in the standard library on a
   :term:`provisional basis <provisional package>`. Backwards incompatible
   changes (up to and including removal of the module) may occur if deemed
   necessary by the core developers.

.. versionadded:: 3.4
   as a :term:`provisional module <provisional package>`.

The :mod:`~email.message` module provides a class that can represent an
arbitrary email message. That basic message model has a useful and flexible
API, but it provides only a lower-level API for interacting with the generic
parts of a message (the headers, generic header parameters, and the payload,
which may be a list of sub-parts). This module provides classes and tools
that provide an enhanced and extensible API for dealing with various specific
types of content, including the ability to retrieve the content of the message
as a specialized object type rather than as a simple bytes object. The module
automatically takes care of the RFC-specified MIME details (required headers
and parameters, etc.) for certain common content types, and support for
additional types can be added by an application using the extension
mechanisms.

This module defines the eponymous "Content Manager" classes. The base
:class:`.ContentManager` class defines an API for registering content
management functions which extract data from ``Message`` objects or insert data
and headers into ``Message`` objects, thus providing a way of converting
between ``Message`` objects containing data and other representations of that
data (Python data types, specialized Python objects, external files, etc.). The
module also defines one concrete content manager: :data:`raw_data_manager`
converts between MIME content types and ``str`` or ``bytes`` data. It also
provides a convenient API for managing the MIME parameters when inserting
content into ``Message``\ s, and it handles inserting and extracting
``Message`` objects when dealing with the ``message/rfc822`` content type.

Another part of the enhanced interface consists of subclasses of
:class:`~email.message.Message` that provide new convenience API functions,
including convenience methods for calling the Content Managers derived from
this module.

.. note::

   Although :class:`.EmailMessage` and :class:`.MIMEPart` are currently
   documented in this module because of the provisional nature of the code, the
   implementation lives in the :mod:`email.message` module.

.. class:: EmailMessage(policy=default)

   If *policy* is specified (it must be an instance of a :mod:`~email.policy`
   class), use the rules it specifies to update and serialize the representation
   of the message. If *policy* is not set, use the
   :class:`~email.policy.default` policy, which follows the rules of the email
   RFCs except for line endings (instead of the RFC mandated ``\r\n``, it uses
   the Python standard ``\n`` line endings). For more information see the
   :mod:`~email.policy` documentation.

   This class is a subclass of :class:`~email.message.Message`. It adds
   the following methods:

   .. attribute:: is_attachment

      Set to ``True`` if there is a :mailheader:`Content-Disposition` header
      and its (case insensitive) value is ``attachment``, ``False`` otherwise.

   .. method:: get_body(preferencelist=('related', 'html', 'plain'))

      Return the MIME part that is the best candidate to be the "body" of the
      message.

      *preferencelist* must be a sequence of strings from the set ``related``,
      ``html``, and ``plain``, and indicates the order of preference for the
      content type of the part returned.

      Start looking for candidate matches with the object on which the
      ``get_body`` method is called.

      If ``related`` is not included in *preferencelist*, consider the root
      part (or subpart of the root part) of any ``multipart/related``
      encountered as a candidate if the (sub-)part matches a preference.

      When encountering a ``multipart/related``, check the ``start`` parameter
      and if a part with a matching :mailheader:`Content-ID` is found, consider
      only it when looking for candidate matches. Otherwise consider only the
      first (default root) part of the ``multipart/related``.

      If a part has a :mailheader:`Content-Disposition` header, only consider
      the part a candidate match if the value of the header is ``inline``.

      If none of the candidates matches any of the preferences in
      *preferencelist*, return ``None``.

      Notes: (1) For most applications the only *preferencelist* combinations
      that really make sense are ``('plain',)``, ``('html', 'plain')``, and the
      default, ``('related', 'html', 'plain')``. (2) Because matching starts
      with the object on which ``get_body`` is called, calling ``get_body`` on
      a ``multipart/related`` will return the object itself unless
      *preferencelist* has a non-default value. (3) Messages (or message parts)
      that do not specify a :mailheader:`Content-Type` or whose
      :mailheader:`Content-Type` header is invalid will be treated as if they
      are of type ``text/plain``, which may occasionally cause ``get_body`` to
      return unexpected results.

   .. method:: iter_attachments()

      Return an iterator over all of the parts of the message that are not
      candidate "body" parts. That is, skip the first occurrence of each of
      ``text/plain``, ``text/html``, ``multipart/related``, or
      ``multipart/alternative`` (unless they are explicitly marked as
      attachments via :mailheader:`Content-Disposition: attachment`), and
      return all remaining parts. When applied directly to a
      ``multipart/related``, return an iterator over all the related parts
      except the root part (i.e., the part pointed to by the ``start``
      parameter, or the first part if there is no ``start`` parameter or the
      ``start`` parameter doesn't match the :mailheader:`Content-ID` of any of
      the parts). When applied directly to a ``multipart/alternative`` or a
      non-``multipart``, return an empty iterator.

   .. method:: iter_parts()

      Return an iterator over all of the immediate sub-parts of the message,
      which will be empty for a non-``multipart``. (See also
      :meth:`~email.message.Message.walk`.)

   .. method:: get_content(*args, content_manager=None, **kw)

      Call the ``get_content`` method of the *content_manager*, passing self
      as the message object, and passing along any other arguments or keywords
      as additional arguments. If *content_manager* is not specified, use
      the ``content_manager`` specified by the current :mod:`~email.policy`.

   .. method:: set_content(*args, content_manager=None, **kw)

      Call the ``set_content`` method of the *content_manager*, passing self
      as the message object, and passing along any other arguments or keywords
      as additional arguments. If *content_manager* is not specified, use
      the ``content_manager`` specified by the current :mod:`~email.policy`.

   .. method:: make_related(boundary=None)

      Convert a non-``multipart`` message into a ``multipart/related`` message,
      moving any existing :mailheader:`Content-` headers and payload into a
      (new) first part of the ``multipart``. If *boundary* is specified, use
      it as the boundary string in the multipart, otherwise leave the boundary
      to be automatically created when it is needed (for example, when the
      message is serialized).

   .. method:: make_alternative(boundary=None)

      Convert a non-``multipart`` or a ``multipart/related`` into a
      ``multipart/alternative``, moving any existing :mailheader:`Content-`
      headers and payload into a (new) first part of the ``multipart``. If
      *boundary* is specified, use it as the boundary string in the multipart,
      otherwise leave the boundary to be automatically created when it is
      needed (for example, when the message is serialized).

   .. method:: make_mixed(boundary=None)

      Convert a non-``multipart``, a ``multipart/related``, or a
      ``multipart/alternative`` into a ``multipart/mixed``, moving any existing
      :mailheader:`Content-` headers and payload into a (new) first part of the
      ``multipart``. If *boundary* is specified, use it as the boundary string
      in the multipart, otherwise leave the boundary to be automatically
      created when it is needed (for example, when the message is serialized).

   .. method:: add_related(*args, content_manager=None, **kw)

      If the message is a ``multipart/related``, create a new message
      object, pass all of the arguments to its :meth:`set_content` method,
      and :meth:`~email.message.Message.attach` it to the ``multipart``. If
      the message is a non-``multipart``, call :meth:`make_related` and then
      proceed as above. If the message is any other type of ``multipart``,
      raise a :exc:`TypeError`. If *content_manager* is not specified, use
      the ``content_manager`` specified by the current :mod:`~email.policy`.
      If the added part has no :mailheader:`Content-Disposition` header,
      add one with the value ``inline``.

   .. method:: add_alternative(*args, content_manager=None, **kw)

      If the message is a ``multipart/alternative``, create a new message
      object, pass all of the arguments to its :meth:`set_content` method, and
      :meth:`~email.message.Message.attach` it to the ``multipart``. If the
      message is a non-``multipart`` or ``multipart/related``, call
      :meth:`make_alternative` and then proceed as above. If the message is
      any other type of ``multipart``, raise a :exc:`TypeError`. If
      *content_manager* is not specified, use the ``content_manager`` specified
      by the current :mod:`~email.policy`.

   .. method:: add_attachment(*args, content_manager=None, **kw)

      If the message is a ``multipart/mixed``, create a new message object,
      pass all of the arguments to its :meth:`set_content` method, and
      :meth:`~email.message.Message.attach` it to the ``multipart``. If the
      message is a non-``multipart``, ``multipart/related``, or
      ``multipart/alternative``, call :meth:`make_mixed` and then proceed as
      above. If *content_manager* is not specified, use the ``content_manager``
      specified by the current :mod:`~email.policy`. If the added part
      has no :mailheader:`Content-Disposition` header, add one with the value
      ``attachment``. This method can be used both for explicit attachments
      (:mailheader:`Content-Disposition: attachment`) and ``inline`` attachments
      (:mailheader:`Content-Disposition: inline`), by passing appropriate
      options to the ``content_manager``.

   .. method:: clear()

      Remove the payload and all of the headers.

   .. method:: clear_content()

      Remove the payload and all of the :mailheader:`Content-` headers, leaving
      all other headers intact and in their original order.

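
A minimal sketch of how ``set_content``, the ``add_*`` methods, ``get_body`` and ``iter_attachments`` fit together. It assumes a current Python 3 release, where this API is no longer provisional, so some details may differ from the 3.4 text above. The subject, the body strings and the ``data.bin`` attachment are placeholders.

```python
from email import policy
from email.message import EmailMessage

msg = EmailMessage(policy=policy.default)
msg['Subject'] = 'Quarterly report'              # placeholder subject
msg.set_content('Plain-text body\n')             # message is now text/plain
msg.add_alternative('<p>HTML body</p>\n', subtype='html')
# add_alternative converted the message into a multipart/alternative
msg.add_attachment(b'\x00\x01\x02', maintype='application',
                   subtype='octet-stream', filename='data.bin')
# add_attachment converted it again, this time into a multipart/mixed

print(msg.get_content_type())                    # multipart/mixed
print([p.get_content_type() for p in msg.iter_parts()])
# ['multipart/alternative', 'application/octet-stream']

# Default preference order is ('related', 'html', 'plain'), so HTML wins.
print(msg.get_body().get_content_type())         # text/html
plain = msg.get_body(preferencelist=('plain',))
print(plain.get_content(), end='')               # Plain-text body

# The alternative part counts as the body, so only the attachment is left.
for part in msg.iter_attachments():
    print(part.get_filename(), part.get_content())   # data.bin b'\x00\x01\x02'
```

Each ``add_*`` call only converts the message when its current type is not already the target ``multipart``. Adding a second attachment would therefore append to the existing ``multipart/mixed`` without nesting it again.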
.. class:: ContentManager()

   Base class for content managers. Provides the standard registry mechanisms
   to register converters between MIME content and other representations, as
   well as the ``get_content`` and ``set_content`` dispatch methods.

   .. method:: get_content(msg, *args, **kw)

      Look up a handler function based on the ``mimetype`` of *msg* (see next
      paragraph), call it, passing through all arguments, and return the result
      of the call. The expectation is that the handler will extract the
      payload from *msg* and return an object that encodes information about
      the extracted data.

      To find the handler, look for the following keys in the registry,
      stopping with the first one found:

      * the string representing the full MIME type (``maintype/subtype``)
      * the string representing the ``maintype``
      * the empty string

      If none of these keys produce a handler, raise a :exc:`KeyError` for the
      full MIME type.

   .. method:: set_content(msg, obj, *args, **kw)

      If the ``maintype`` is ``multipart``, raise a :exc:`TypeError`; otherwise
      look up a handler function based on the type of *obj* (see next
      paragraph), call :meth:`~email.message.EmailMessage.clear_content` on the
      *msg*, and call the handler function, passing through all arguments. The
      expectation is that the handler will transform and store *obj* into
      *msg*, possibly making other changes to *msg* as well, such as adding
      various MIME headers to encode information needed to interpret the stored
      data.

      To find the handler, obtain the type of *obj* (``typ = type(obj)``), and
      look for the following keys in the registry, stopping with the first one
      found:

      * the type itself (``typ``)
      * the type's fully qualified name (``typ.__module__ + '.' +
        typ.__qualname__``)
      * the type's qualname (``typ.__qualname__``)
      * the type's name (``typ.__name__``)

      If none of the above match, repeat all of the checks above for each of
      the types in the :term:`MRO` (``typ.__mro__``). Finally, if no other key
      yields a handler, check for a handler for the key ``None``. If there is
      no handler for ``None``, raise a :exc:`KeyError` for the fully
      qualified name of the type.

      Also add a :mailheader:`MIME-Version` header if one is not present (see
      also :class:`.MIMEPart`).

   .. method:: add_get_handler(key, handler)

      Record the function *handler* as the handler for *key*. For the possible
      values of *key*, see :meth:`get_content`.

   .. method:: add_set_handler(typekey, handler)

      Record *handler* as the function to call when an object of a type
      matching *typekey* is passed to :meth:`set_content`. For the possible
      values of *typekey*, see :meth:`set_content`.

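
The registry lookup rules above can be exercised with a minimal sketch, assuming a current Python 3 release. ``get_shouted_text`` and ``set_int_as_text`` are hypothetical handlers invented for this illustration. The get handler is registered under the maintype key ``'text'`` and the set handler under the type ``int``. Setting a ``bool`` is therefore resolved through the MRO, and an unregistered ``float`` raises ``KeyError`` with the fully qualified type name.

```python
from email.contentmanager import ContentManager
from email.message import EmailMessage

cm = ContentManager()


def get_shouted_text(msg):
    # Hypothetical get handler: decode the text payload and upper-case it.
    charset = msg.get_content_charset('ascii')
    return msg.get_payload(decode=True).decode(charset).upper()


def set_int_as_text(msg, value):
    # Hypothetical set handler: store an int as its decimal representation.
    msg['Content-Type'] = 'text/plain'
    msg.set_payload(str(value))


cm.add_get_handler('text', get_shouted_text)   # found via the maintype key
cm.add_set_handler(int, set_int_as_text)       # found via the type itself

msg = EmailMessage()
msg.set_content(True, content_manager=cm)      # bool is unregistered; int is in its MRO
print(msg.get_content(content_manager=cm))     # TRUE

raised = None
try:
    msg.set_content(2.5, content_manager=cm)   # float, object and None are unregistered
except KeyError as exc:
    raised = exc.args[0]
print(raised)                                  # builtins.float
```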
.. class:: MIMEPart(policy=default)

   This class represents a subpart of a MIME message. It is identical to
   :class:`EmailMessage`, except that no :mailheader:`MIME-Version` headers are
   added when :meth:`~EmailMessage.set_content` is called, since sub-parts do
   not need their own :mailheader:`MIME-Version` headers.

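
The only difference between the two classes can be checked directly. This is a small sketch assuming a current Python 3 release, where both classes are importable from ``email.message``.

```python
from email.message import EmailMessage, MIMEPart

whole = EmailMessage()
whole.set_content('top-level text\n')
part = MIMEPart()
part.set_content('sub-part text\n')

print('MIME-Version' in whole)   # True: EmailMessage adds the header
print('MIME-Version' in part)    # False: MIMEPart leaves it out
```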
Content Manager Instances
~~~~~~~~~~~~~~~~~~~~~~~~~

Currently the email package provides only one concrete content manager,
:data:`raw_data_manager`, although more may be added in the future.
:data:`raw_data_manager` is the
:attr:`~email.policy.EmailPolicy.content_manager` provided by
:class:`~email.policy.EmailPolicy` and its derivatives.

.. data:: raw_data_manager

   This content manager provides only a minimal interface beyond that provided
   by :class:`~email.message.Message` itself: it deals only with text, raw
   byte strings, and :class:`~email.message.Message` objects. Nevertheless, it
   provides significant advantages compared to the base API: ``get_content`` on
   a text part will return a unicode string without the application needing to
   manually decode it, ``set_content`` provides a rich set of options for
   controlling the headers added to a part and controlling the content transfer
   encoding, and it enables the use of the various ``add_`` methods, thereby
   simplifying the creation of multipart messages.

   .. method:: get_content(msg, errors='replace')

      Return the payload of the part as either a string (for ``text`` parts), a
      :class:`~email.message.EmailMessage` object (for ``message/rfc822``
      parts), or a ``bytes`` object (for all other non-multipart types). Raise
      a :exc:`KeyError` if called on a ``multipart``. If the part is a
      ``text`` part and *errors* is specified, use it as the error handler when
      decoding the payload to unicode. The default error handler is
      ``replace``.

   .. method:: set_content(msg, <'str'>, subtype="plain", charset='utf-8', \
                           cte=None, \
                           disposition=None, filename=None, cid=None, \
                           params=None, headers=None)
               set_content(msg, <'bytes'>, maintype, subtype, cte="base64", \
                           disposition=None, filename=None, cid=None, \
                           params=None, headers=None)
               set_content(msg, <'Message'>, subtype='rfc822', cte=None, \
                           disposition=None, filename=None, cid=None, \
                           params=None, headers=None)
               set_content(msg, <'list'>, subtype='mixed', \
                           disposition=None, filename=None, cid=None, \
                           params=None, headers=None)

      Add headers and payload to *msg*:

      Add a :mailheader:`Content-Type` header with a ``maintype/subtype``
      value.

      * For ``str``, set the MIME ``maintype`` to ``text``, and set the
        subtype to *subtype* if it is specified, or ``plain`` if it is not.
      * For ``bytes``, use the specified *maintype* and *subtype*, or
        raise a :exc:`TypeError` if they are not specified.
      * For :class:`~email.message.Message` objects, set the maintype to
        ``message``, and set the subtype to *subtype* if it is specified
        or ``rfc822`` if it is not. If *subtype* is ``partial``, raise a
        :exc:`ValueError` (``bytes`` objects must be used to construct
        ``message/partial`` parts).
      * For *<'list'>*, which should be a list of
        :class:`~email.message.Message` objects, set the ``maintype`` to
        ``multipart``, and the ``subtype`` to *subtype* if it is
        specified, and ``mixed`` if it is not. If the message parts in
        the *<'list'>* have :mailheader:`MIME-Version` headers, remove
        them.

      If *charset* is provided (which is valid only for ``str``), encode the
      string to bytes using the specified character set. The default is
      ``utf-8``. If the specified *charset* is a known alias for a standard
      MIME charset name, use the standard charset instead.

      If *cte* is set, encode the payload using the specified content transfer
      encoding, and set the :mailheader:`Content-Transfer-Encoding` header to
      that value. For ``str`` objects, if it is not set use heuristics to
      determine the most compact encoding. Possible values for *cte* are
      ``quoted-printable``, ``base64``, ``7bit``, ``8bit``, and ``binary``.
      If the input cannot be encoded in the specified encoding (e.g. ``7bit``),
      raise a :exc:`ValueError`. For :class:`~email.message.Message`, per
      :rfc:`2046`, raise a :exc:`ValueError` if a *cte* of ``quoted-printable``
      or ``base64`` is requested for *subtype* ``rfc822``, and for any *cte*
      other than ``7bit`` for *subtype* ``external-body``. For
      ``message/rfc822``, use ``8bit`` if *cte* is not specified. For all
      other values of *subtype*, use ``7bit``.

      .. note:: A *cte* of ``binary`` does not actually work correctly yet.
         The ``Message`` object as modified by ``set_content`` is correct, but
         :class:`~email.generator.BytesGenerator` does not serialize it
         correctly.

      If *disposition* is set, use it as the value of the
      :mailheader:`Content-Disposition` header. If not specified, and
      *filename* is specified, add the header with the value ``attachment``.
      If it is not specified and *filename* is also not specified, do not add
      the header. The only valid values for *disposition* are ``attachment``
      and ``inline``.

      If *filename* is specified, use it as the value of the ``filename``
      parameter of the :mailheader:`Content-Disposition` header. There is no
      default.

      If *cid* is specified, add a :mailheader:`Content-ID` header with
      *cid* as its value.

      If *params* is specified, iterate its ``items`` method and use the
      resulting ``(key, value)`` pairs to set additional parameters on the
      :mailheader:`Content-Type` header.

      If *headers* is specified and is a list of strings of the form
      ``headername: headervalue`` or a list of ``header`` objects
      (distinguished from strings by having a ``name`` attribute), add the
      headers to *msg*.
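
The ``str`` form of ``set_content`` can be tried with explicit options, the *cte* heuristic, and a rejected encoding. This is a minimal sketch assuming a current Python 3 release and ``email.policy.default`` (the ``EmailMessage`` default). The sample text and the ``format`` parameter are arbitrary placeholders.

```python
from email.message import EmailMessage

# Explicit options: charset, transfer encoding, disposition, extra parameter.
msg = EmailMessage()
msg.set_content('Grüße aus Köln\n', subtype='plain', charset='utf-8',
                cte='quoted-printable', disposition='inline',
                params={'format': 'flowed'})
print(msg.get_content_charset())          # utf-8
print(msg.get_param('format'))            # flowed
print(msg['Content-Transfer-Encoding'])   # quoted-printable
print(msg['Content-Disposition'])         # inline
print(msg.get_content(), end='')          # Grüße aus Köln

# No cte given: the heuristics choose one for short non-ASCII lines.
auto = EmailMessage()
auto.set_content('Grüße aus Köln\n')
print(auto['Content-Transfer-Encoding'])  # 8bit (policy.default allows 8bit)

# Requesting 7bit for non-ASCII input is rejected.
rejected = None
try:
    EmailMessage().set_content('Grüße\n', cte='7bit')
except ValueError as exc:                 # UnicodeDecodeError is a ValueError subclass
    rejected = exc
print(rejected is not None)               # True
```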


@@ -33,10 +33,11 @@ Here are the methods of the :class:`Message` class:
 .. class:: Message(policy=compat32)

-   The *policy* argument determiens the :mod:`~email.policy` that will be used
-   to update the message model. The default value, :class:`compat32
-   <email.policy.Compat32>` maintains backward compatibility with the
-   Python 3.2 version of the email package. For more information see the
+   If *policy* is specified (it must be an instance of a :mod:`~email.policy`
+   class), use the rules it specifies to update and serialize the representation
+   of the message. If *policy* is not set, use the :class:`compat32
+   <email.policy.Compat32>` policy, which maintains backward compatibility with
+   the Python 3.2 version of the email package. For more information see the
    :mod:`~email.policy` documentation.

    .. versionchanged:: 3.3 The *policy* keyword argument was added.
@@ -465,7 +466,8 @@ Here are the methods of the :class:`Message` class:
       to ``False``.

-   .. method:: set_param(param, value, header='Content-Type', requote=True, charset=None, language='')
+   .. method:: set_param(param, value, header='Content-Type', requote=True, \
+                         charset=None, language='', replace=False)

       Set a parameter in the :mailheader:`Content-Type` header. If the
       parameter already exists in the header, its value will be replaced with
@@ -482,6 +484,12 @@ Here are the methods of the :class:`Message` class:
       language, defaulting to the empty string. Both *charset* and *language*
       should be strings.

+      If *replace* is ``False`` (the default), the header is moved to the
+      end of the list of headers. If *replace* is ``True``, the header
+      will be updated in place.
+
+      .. versionchanged:: 3.4 ``replace`` keyword was added.
+

    .. method:: del_param(param, header='content-type', requote=True)
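
The effect of the new *replace* keyword on header order can be seen with a plain ``compat32`` ``Message``. This is a minimal sketch assuming Python 3.4 or later. ``set_charset_param`` is a hypothetical helper written only for this comparison.

```python
from email.message import Message


def set_charset_param(replace):
    # Hypothetical helper: fresh compat32 Message with Content-Type before Subject.
    msg = Message()
    msg['Content-Type'] = 'text/plain'
    msg['Subject'] = 'hello'
    msg.set_param('charset', 'utf-8', replace=replace)
    return msg


moved = set_charset_param(replace=False)
in_place = set_charset_param(replace=True)
print(moved.keys())              # ['Subject', 'Content-Type']
print(in_place.keys())           # ['Content-Type', 'Subject']
print(in_place['Content-Type'])  # text/plain; charset="utf-8"
```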


@@ -371,7 +371,7 @@ added matters. To illustrate::
    to) :rfc:`5322`, :rfc:`2047`, and the current MIME RFCs.

    This policy adds new header parsing and folding algorithms. Instead of
-   simple strings, headers are custom objects with custom attributes depending
+   simple strings, headers are ``str`` subclasses with attributes that depend
    on the type of the field. The parsing and folding algorithm fully implement
    :rfc:`2047` and :rfc:`5322`.

@@ -408,6 +408,20 @@ added matters. To illustrate::
       fields are treated as unstructured. This list will be completed before
       the extension is marked stable.)

+   .. attribute:: content_manager
+
+      An object with at least two methods: ``get_content`` and
+      ``set_content``. When the
+      :meth:`~email.message.EmailMessage.get_content` or
+      :meth:`~email.message.EmailMessage.set_content` method of an
+      :class:`~email.message.EmailMessage` object is called, it calls the
+      corresponding method of this object, passing it the message object as its
+      first argument, and any arguments or keywords that were passed to it as
+      additional arguments. By default ``content_manager`` is set to
+      :data:`~email.contentmanager.raw_data_manager`.
+
+      .. versionadded:: 3.4
+

    The class provides the following concrete implementations of the abstract
    methods of :class:`Policy`:
@@ -427,7 +441,7 @@ added matters. To illustrate::
       The name is returned unchanged. If the input value has a ``name``
       attribute and it matches *name* ignoring case, the value is returned
       unchanged. Otherwise the *name* and *value* are passed to
-      ``header_factory``, and the resulting custom header object is returned as
+      ``header_factory``, and the resulting header object is returned as
       the value. In this case a ``ValueError`` is raised if the input value
       contains CR or LF characters.

@@ -435,7 +449,7 @@ added matters. To illustrate::
       If the value has a ``name`` attribute, it is returned unmodified.
       Otherwise the *name*, and the *value* with any CR or LF characters
-      removed, are passed to the ``header_factory``, and the resulting custom
+      removed, are passed to the ``header_factory``, and the resulting
       header object is returned. Any surrogateescaped bytes get turned into
       the unicode unknown-character glyph.

@@ -445,9 +459,9 @@ added matters. To illustrate::
       A value is considered to be a 'source value' if and only if it does not
       have a ``name`` attribute (having a ``name`` attribute means it is a
       header object of some sort). If a source value needs to be refolded
-      according to the policy, it is converted into a custom header object by
+      according to the policy, it is converted into a header object by
       passing the *name* and the *value* with any CR and LF characters removed
-      to the ``header_factory``. Folding of a custom header object is done by
+      to the ``header_factory``. Folding of a header object is done by
       calling its ``fold`` method with the current policy.

       Source values are split into lines using :meth:`~str.splitlines`. If
@@ -502,23 +516,23 @@ With all of these :class:`EmailPolicies <.EmailPolicy>`, the effective API of
 the email package is changed from the Python 3.2 API in the following ways:

 * Setting a header on a :class:`~email.message.Message` results in that
-  header being parsed and a custom header object created.
+  header being parsed and a header object created.

 * Fetching a header value from a :class:`~email.message.Message` results
-  in that header being parsed and a custom header object created and
+  in that header being parsed and a header object created and
   returned.

-* Any custom header object, or any header that is refolded due to the
+* Any header object, or any header that is refolded due to the
   policy settings, is folded using an algorithm that fully implements the
   RFC folding algorithms, including knowing where encoded words are required
   and allowed.

 From the application view, this means that any header obtained through the
-:class:`~email.message.Message` is a custom header object with custom
+:class:`~email.message.Message` is a header object with extra
 attributes, whose string value is the fully decoded unicode value of the
 header. Likewise, a header may be assigned a new value, or a new header
 created, using a unicode string, and the policy will take care of converting
 the unicode string into the correct RFC encoded form.

-The custom header objects and their attributes are described in
+The header objects and their attributes are described in
 :mod:`~email.headerregistry`.
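
Both documentation changes in this file can be checked from Python. ``EmailPolicy`` instances default to ``raw_data_manager``, and header values are ``str`` subclasses with field-specific attributes. This is a minimal sketch assuming a current Python 3 release. The address is a placeholder. The policy clone with an empty ``ContentManager`` serves only to show that ``set_content`` dispatches through the policy's ``content_manager``.

```python
from email import policy
from email.contentmanager import ContentManager, raw_data_manager
from email.message import EmailMessage

# EmailPolicy instances use raw_data_manager unless told otherwise.
print(policy.default.content_manager is raw_data_manager)   # True

# Header values are str subclasses with extra, field-specific attributes.
msg = EmailMessage(policy=policy.default)
msg['To'] = 'Alice Example <alice@example.com>'   # placeholder address
to = msg['To']
print(isinstance(to, str))                        # True
addr = to.addresses[0]
print(addr.display_name, addr.username, addr.domain)  # Alice Example alice example.com

# A policy clone with an empty registry: set_content finds no handler for str.
empty = policy.default.clone(content_manager=ContentManager())
missing = None
try:
    EmailMessage(policy=empty).set_content('hi')
except KeyError as exc:
    missing = exc.args[0]
print(missing)                                    # builtins.str
```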


@@ -53,6 +53,7 @@ Contents of the :mod:`email` package documentation:
    email.generator.rst
    email.policy.rst
    email.headerregistry.rst
+   email.contentmanager.rst
    email.mime.rst
    email.header.rst
    email.charset.rst


@@ -280,6 +280,21 @@ result: a bytes object containing the fully formatted message.
 (Contributed by R. David Murray in :issue:`18600`.)

+A pair of new subclasses of :class:`~email.message.Message` has been added,
+along with a new sub-module, :mod:`~email.contentmanager`. All documentation
+is currently in the new module, which is being added as part of the new
+:term:`provisional <provisional package>` email API. These classes provide a
+number of new methods that make extracting content from and inserting content
+into email messages much easier. See the :mod:`~email.contentmanager`
+documentation for details.
+
+These API additions complete the bulk of the work that was planned as part of
+the email6 project. The currently provisional API is scheduled to become final
+in Python 3.5 (possibly with a few minor additions in the area of error
+handling).
+
+(Contributed by R. David Murray in :issue:`18891`.)
+

 functools
 ---------

Lib/email/contentmanager.py

@@ -0,0 +1,249 @@
import binascii
import email.charset
import email.message
import email.errors
from email import quoprimime


class ContentManager:

    def __init__(self):
        self.get_handlers = {}
        self.set_handlers = {}

    def add_get_handler(self, key, handler):
        self.get_handlers[key] = handler

    def get_content(self, msg, *args, **kw):
        content_type = msg.get_content_type()
        if content_type in self.get_handlers:
            return self.get_handlers[content_type](msg, *args, **kw)
        maintype = msg.get_content_maintype()
        if maintype in self.get_handlers:
            return self.get_handlers[maintype](msg, *args, **kw)
        if '' in self.get_handlers:
            return self.get_handlers[''](msg, *args, **kw)
        raise KeyError(content_type)

    def add_set_handler(self, typekey, handler):
        self.set_handlers[typekey] = handler

    def set_content(self, msg, obj, *args, **kw):
        if msg.get_content_maintype() == 'multipart':
            # XXX: is this error a good idea or not? We can remove it later,
            # but we can't add it later, so do it for now.
            raise TypeError("set_content not valid on multipart")
        handler = self._find_set_handler(msg, obj)
        msg.clear_content()
        handler(msg, obj, *args, **kw)

    def _find_set_handler(self, msg, obj):
        full_path_for_error = None
        for typ in type(obj).__mro__:
            if typ in self.set_handlers:
                return self.set_handlers[typ]
            qname = typ.__qualname__
            modname = getattr(typ, '__module__', '')
            full_path = '.'.join((modname, qname)) if modname else qname
            if full_path_for_error is None:
                full_path_for_error = full_path
            if full_path in self.set_handlers:
                return self.set_handlers[full_path]
            if qname in self.set_handlers:
                return self.set_handlers[qname]
            name = typ.__name__
            if name in self.set_handlers:
                return self.set_handlers[name]
        if None in self.set_handlers:
            return self.set_handlers[None]
        raise KeyError(full_path_for_error)


raw_data_manager = ContentManager()


def get_text_content(msg, errors='replace'):
    content = msg.get_payload(decode=True)
    charset = msg.get_param('charset', 'ASCII')
    return content.decode(charset, errors=errors)
raw_data_manager.add_get_handler('text', get_text_content)


def get_non_text_content(msg):
    return msg.get_payload(decode=True)
for maintype in 'audio image video application'.split():
    raw_data_manager.add_get_handler(maintype, get_non_text_content)


def get_message_content(msg):
    return msg.get_payload(0)
for subtype in 'rfc822 external-body'.split():
    raw_data_manager.add_get_handler('message/'+subtype, get_message_content)


def get_and_fixup_unknown_message_content(msg):
    # If we don't understand a message subtype, we are supposed to treat it as
    # if it were application/octet-stream, per
    # tools.ietf.org/html/rfc2046#section-5.2.4. Feedparser doesn't do that,
    # so do our best to fix things up. Note that it is *not* appropriate to
    # model message/partial content as Message objects, so they are handled
    # here as well. (How to reassemble them is out of scope for this comment :)
    return bytes(msg.get_payload(0))
raw_data_manager.add_get_handler('message',
                                 get_and_fixup_unknown_message_content)


def _prepare_set(msg, maintype, subtype, headers):
    msg['Content-Type'] = '/'.join((maintype, subtype))
    if headers:
        if not hasattr(headers[0], 'name'):
            mp = msg.policy
            headers = [mp.header_factory(*mp.header_source_parse([header]))
                       for header in headers]
        try:
            for header in headers:
                if header.defects:
                    raise header.defects[0]
                msg[header.name] = header
        except email.errors.HeaderDefect as exc:
            raise ValueError("Invalid header: {}".format(
                                header.fold(policy=msg.policy))) from exc


def _finalize_set(msg, disposition, filename, cid, params):
    if disposition is None and filename is not None:
        disposition = 'attachment'
    if disposition is not None:
        msg['Content-Disposition'] = disposition
    if filename is not None:
        msg.set_param('filename',
                      filename,
                      header='Content-Disposition',
                      replace=True)
    if cid is not None:
        msg['Content-ID'] = cid
    if params is not None:
        for key, value in params.items():
            msg.set_param(key, value)


# XXX: This is a cleaned-up version of base64mime.body_encode. It would
# be nice to drop both this and quoprimime.body_encode in favor of
# enhanced binascii routines that accepted a max_line_length parameter.
def _encode_base64(data, max_line_length):
    encoded_lines = []
    unencoded_bytes_per_line = max_line_length * 3 // 4
    for i in range(0, len(data), unencoded_bytes_per_line):
        thisline = data[i:i+unencoded_bytes_per_line]
        encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
    return ''.join(encoded_lines)
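
The following sketch shows what the per-line budget in ``_encode_base64`` produces. It reproduces the ``max_line_length * 3 // 4`` formula outside the module and measures the longest encoded line; the helper names are made up for this illustration. Base64 turns every started group of 3 input bytes into 4 characters, so a budget only fits the limit when it is a whole number of groups. For 76 the formula gives 57 bytes and 76-character lines. For 78, the ``max_line_length`` of ``email.policy.default``, it gives 58 bytes and 80-character lines. Rounding down to whole groups with ``max_line_length // 4 * 3`` is shown as an alternative, and it stays within the limit in both cases.

```python
import binascii


def max_encoded_line(data, bytes_per_line):
    """Base64-encode data in chunks of bytes_per_line input bytes and return
    the longest encoded line, not counting the trailing newline."""
    longest = 0
    for i in range(0, len(data), bytes_per_line):
        line = binascii.b2a_base64(data[i:i + bytes_per_line])  # ends with b'\n'
        longest = max(longest, len(line) - 1)
    return longest


def budget_as_committed(max_line_length):
    return max_line_length * 3 // 4      # formula used by _encode_base64


def budget_whole_groups(max_line_length):
    return max_line_length // 4 * 3      # alternative: whole 4-character groups only


data = bytes(range(256)) * 4             # 1024 arbitrary input bytes

for limit in (76, 78):
    print(limit,
          max_encoded_line(data, budget_as_committed(limit)),
          max_encoded_line(data, budget_whole_groups(limit)))
# 76 76 76
# 78 80 76
```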


def _encode_text(string, charset, cte, policy):
    lines = string.encode(charset).splitlines()
    linesep = policy.linesep.encode('ascii')
    def embeded_body(lines): return linesep.join(lines) + linesep
    def normal_body(lines): return b'\n'.join(lines) + b'\n'
    if cte==None:
        # Use heuristics to decide on the "best" encoding.
        try:
            return '7bit', normal_body(lines).decode('ascii')
        except UnicodeDecodeError:
            pass
        if (policy.cte_type == '8bit' and
                max(len(x) for x in lines) <= policy.max_line_length):
            return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
        sniff = embeded_body(lines[:10])
        sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
                                          policy.max_line_length)
        sniff_base64 = binascii.b2a_base64(sniff)
        # This is a little unfair to qp; it includes lineseps, base64 doesn't.
        if len(sniff_qp) > len(sniff_base64):
            cte = 'base64'
        else:
            cte = 'quoted-printable'
            if len(lines) <= 10:
                return cte, sniff_qp
    if cte == '7bit':
        data = normal_body(lines).decode('ascii')
    elif cte == '8bit':
        data = normal_body(lines).decode('ascii', 'surrogateescape')
    elif cte == 'quoted-printable':
        data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
                                      policy.max_line_length)
    elif cte == 'base64':
        data = _encode_base64(embeded_body(lines), policy.max_line_length)
    else:
        raise ValueError("Unknown content transfer encoding {}".format(cte))
    return cte, data
def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
disposition=None, filename=None, cid=None,
params=None, headers=None):
_prepare_set(msg, 'text', subtype, headers)
cte, payload = _encode_text(string, charset, cte, msg.policy)
msg.set_payload(payload)
msg.set_param('charset',
email.charset.ALIASES.get(charset, charset),
replace=True)
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
raw_data_manager.add_set_handler(str, set_text_content)
def set_message_content(msg, message, subtype="rfc822", cte=None,
disposition=None, filename=None, cid=None,
params=None, headers=None):
if subtype == 'partial':
raise ValueError("message/partial is not supported for Message objects")
if subtype == 'rfc822':
if cte not in (None, '7bit', '8bit', 'binary'):
# http://tools.ietf.org/html/rfc2046#section-5.2.1 mandates this.
raise ValueError(
"message/rfc822 parts do not support cte={}".format(cte))
# 8bit will get coerced on serialization if policy.cte_type='7bit'. We
# may end up claiming 8bit when it isn't needed, but the only negative
# result of that should be a gateway that needs to coerce to 7bit
# having to look through the whole embedded message to discover whether
# or not it actually has to do anything.
cte = '8bit' if cte is None else cte
elif subtype == 'external-body':
if cte not in (None, '7bit'):
# http://tools.ietf.org/html/rfc2046#section-5.2.3 mandates this.
raise ValueError(
"message/external-body parts do not support cte={}".format(cte))
cte = '7bit'
elif cte is None:
# http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
# subtypes should be restricted to 7bit, so assume that.
cte = '7bit'
_prepare_set(msg, 'message', subtype, headers)
msg.set_payload([message])
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
raw_data_manager.add_set_handler(email.message.Message, set_message_content)
def set_bytes_content(msg, data, maintype, subtype, cte='base64',
disposition=None, filename=None, cid=None,
params=None, headers=None):
_prepare_set(msg, maintype, subtype, headers)
if cte == 'base64':
data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
elif cte == 'quoted-printable':
# XXX: quoprimime.body_encode won't encode newline characters in data,
# so we can't use it. This means max_line_length is ignored. Another
# bug to fix later. (Note: encoders.quopri is broken on line ends.)
data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
data = data.decode('ascii')
elif cte == '7bit':
# Make sure it really is only ASCII. The early warning here seems
# worth the overhead...if you care write your own content manager :).
data = data.decode('ascii')
elif cte in ('8bit', 'binary'):
data = data.decode('ascii', 'surrogateescape')
msg.set_payload(data)
msg['Content-Transfer-Encoding'] = cte
_finalize_set(msg, disposition, filename, cid, params)
for typ in (bytes, bytearray, memoryview):
raw_data_manager.add_set_handler(typ, set_bytes_content)
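The set handlers registered above are what `set_content` reaches through the default policy's `raw_data_manager`. The sketch below exercises each one through the public API. It assumes a Python 3.6+ interpreter, where `EmailMessage` and `email.policy.default` are no longer provisional; the variable names are illustrative only.

```python
from email.message import EmailMessage

# str -> set_text_content: with no cte argument the transfer encoding is
# chosen heuristically (7bit, then 8bit, then the shorter of qp/base64).
ascii_part = EmailMessage()
ascii_part.set_content("Simple message.\n")

# Short non-ASCII lines can stay 8bit because policy.default uses
# cte_type='8bit'.
accented_part = EmailMessage()
accented_part.set_content("et là il est monté sur moi.\n")

# bytes -> set_bytes_content: maintype and subtype are required and the
# transfer encoding defaults to base64.
image_part = EmailMessage()
image_part.set_content(b"bogus content", maintype="image", subtype="jpeg")

# Message -> set_message_content: message/rfc822 with an 8bit CTE.
inner = EmailMessage()
inner["Subject"] = "get back in your box"
inner.set_content("Or face the comfy chair.\n")
wrapper = EmailMessage()
wrapper.set_content(inner)

# RFC 2046 section 5.2.1 does not allow encoded message/rfc822 parts.
rejected = EmailMessage()
try:
    rejected.set_content(inner, cte="base64")
except ValueError as exc:
    rejection = str(exc)
else:
    rejection = None

for part in (ascii_part, accented_part, image_part, wrapper):
    print(part.get_content_type(), part["Content-Transfer-Encoding"])
```

Each `get_content()` call routes back through the matching get handler, so the text parts return `str`, the image part returns `bytes`, and the rfc822 wrapper returns the embedded message object.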

@@ -8,8 +8,6 @@ __all__ = ['Message']
import re
import uu
import base64
import binascii
from io import BytesIO, StringIO
# Intrapackage imports
@@ -679,7 +677,7 @@ class Message:
return failobj
def set_param(self, param, value, header='Content-Type', requote=True,
charset=None, language=''):
charset=None, language='', replace=False):
"""Set a parameter in the Content-Type header.
If the parameter already exists in the header, its value will be
@@ -723,8 +721,11 @@ class Message:
else:
ctype = SEMISPACE.join([ctype, append_param])
if ctype != self.get(header):
del self[header]
self[header] = ctype
if replace:
self.replace_header(header, ctype)
else:
del self[header]
self[header] = ctype
def del_param(self, param, header='content-type', requote=True):
"""Remove the given parameter completely from the Content-Type header.
@@ -905,3 +906,208 @@ class Message:
# I.e. def walk(self): ...
from email.iterators import walk
class MIMEPart(Message):
def __init__(self, policy=None):
if policy is None:
from email.policy import default
policy = default
Message.__init__(self, policy)
@property
def is_attachment(self):
c_d = self.get('content-disposition')
if c_d is None:
return False
return c_d.content_disposition == 'attachment'
def _find_body(self, part, preferencelist):
if part.is_attachment:
return
maintype, subtype = part.get_content_type().split('/')
if maintype == 'text':
if subtype in preferencelist:
yield (preferencelist.index(subtype), part)
return
if maintype != 'multipart':
return
if subtype != 'related':
for subpart in part.iter_parts():
yield from self._find_body(subpart, preferencelist)
return
if 'related' in preferencelist:
yield (preferencelist.index('related'), part)
candidate = None
start = part.get_param('start')
if start:
for subpart in part.iter_parts():
if subpart['content-id'] == start:
candidate = subpart
break
if candidate is None:
subparts = part.get_payload()
candidate = subparts[0] if subparts else None
if candidate is not None:
yield from self._find_body(candidate, preferencelist)
def get_body(self, preferencelist=('related', 'html', 'plain')):
"""Return best candidate mime part for display as 'body' of message.
Do a depth first search, starting with self, looking for the first part
matching each of the items in preferencelist, and return the part
corresponding to the first item that has a match, or None if no items
have a match. If 'related' is not included in preferencelist, consider
the root part of any multipart/related encountered as a candidate
match. Ignore parts with 'Content-Disposition: attachment'.
"""
best_prio = len(preferencelist)
body = None
for prio, part in self._find_body(self, preferencelist):
if prio < best_prio:
best_prio = prio
body = part
if prio == 0:
break
return body
_body_types = {('text', 'plain'),
('text', 'html'),
('multipart', 'related'),
('multipart', 'alternative')}
def iter_attachments(self):
"""Return an iterator over the non-main parts of a multipart.
Skip the first of each occurrence of text/plain, text/html,
multipart/related, or multipart/alternative in the multipart (unless
they have a 'Content-Disposition: attachment' header) and include all
remaining subparts in the returned iterator. When applied to a
multipart/related, return all parts except the root part. Return an
empty iterator when applied to a multipart/alternative or a
non-multipart.
"""
maintype, subtype = self.get_content_type().split('/')
if maintype != 'multipart' or subtype == 'alternative':
return
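# Work on a copy: the parts.pop(0) below must not remove the root part
# from the message itself.
parts = self.get_payload().copy()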
if maintype == 'multipart' and subtype == 'related':
# For related, we treat everything but the root as an attachment.
# The root may be indicated by 'start'; if there's no start or we
# can't find the named start, treat the first subpart as the root.
start = self.get_param('start')
if start:
found = False
attachments = []
for part in parts:
if part.get('content-id') == start:
found = True
else:
attachments.append(part)
if found:
yield from attachments
return
parts.pop(0)
yield from parts
return
# Otherwise we more or less invert the remaining logic in get_body.
# This only really works in edge cases (ex: non-text relateds or
# alternatives) if the sending agent sets content-disposition.
seen = [] # Only skip the first example of each candidate type.
for part in parts:
maintype, subtype = part.get_content_type().split('/')
if ((maintype, subtype) in self._body_types and
not part.is_attachment and subtype not in seen):
seen.append(subtype)
continue
yield part
def iter_parts(self):
"""Return an iterator over all immediate subparts of a multipart.
Return an empty iterator for a non-multipart.
"""
if self.get_content_maintype() == 'multipart':
yield from self.get_payload()
def get_content(self, *args, content_manager=None, **kw):
if content_manager is None:
content_manager = self.policy.content_manager
return content_manager.get_content(self, *args, **kw)
def set_content(self, *args, content_manager=None, **kw):
if content_manager is None:
content_manager = self.policy.content_manager
content_manager.set_content(self, *args, **kw)
def _make_multipart(self, subtype, disallowed_subtypes, boundary):
if self.get_content_maintype() == 'multipart':
existing_subtype = self.get_content_subtype()
disallowed_subtypes = disallowed_subtypes + (subtype,)
if existing_subtype in disallowed_subtypes:
raise ValueError("Cannot convert {} to {}".format(
existing_subtype, subtype))
keep_headers = []
part_headers = []
for name, value in self._headers:
if name.lower().startswith('content-'):
part_headers.append((name, value))
else:
keep_headers.append((name, value))
if part_headers:
# There is existing content, move it to the first subpart.
part = type(self)(policy=self.policy)
part._headers = part_headers
part._payload = self._payload
self._payload = [part]
else:
self._payload = []
self._headers = keep_headers
self['Content-Type'] = 'multipart/' + subtype
if boundary is not None:
self.set_param('boundary', boundary)
def make_related(self, boundary=None):
self._make_multipart('related', ('alternative', 'mixed'), boundary)
def make_alternative(self, boundary=None):
self._make_multipart('alternative', ('mixed',), boundary)
def make_mixed(self, boundary=None):
self._make_multipart('mixed', (), boundary)
def _add_multipart(self, _subtype, *args, _disp=None, **kw):
if (self.get_content_maintype() != 'multipart' or
self.get_content_subtype() != _subtype):
getattr(self, 'make_' + _subtype)()
part = type(self)(policy=self.policy)
part.set_content(*args, **kw)
if _disp and 'content-disposition' not in part:
part['Content-Disposition'] = _disp
self.attach(part)
def add_related(self, *args, **kw):
self._add_multipart('related', *args, _disp='inline', **kw)
def add_alternative(self, *args, **kw):
self._add_multipart('alternative', *args, **kw)
def add_attachment(self, *args, **kw):
self._add_multipart('mixed', *args, _disp='attachment', **kw)
def clear(self):
self._headers = []
self._payload = None
def clear_content(self):
self._headers = [(n, v) for n, v in self._headers
if not n.lower().startswith('content-')]
self._payload = None
class EmailMessage(MIMEPart):
def set_content(self, *args, **kw):
super().set_content(*args, **kw)
if 'MIME-Version' not in self:
self['MIME-Version'] = '1.0'
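The `make_*`/`add_*` methods and `get_body`/`iter_attachments` are easiest to follow on a concrete message. A minimal sketch, assuming Python 3.6+ (where `is_attachment` later became a method, which the calls below do not touch); the subject, bodies, and the `data.bin` filename are made up for illustration.

```python
from email.message import EmailMessage

msg = EmailMessage()
msg["Subject"] = "Quarterly report"
msg.set_content("Plain-text version.\n")

# text/plain -> multipart/alternative: the existing Content-* headers and
# payload move into the first subpart, then the HTML part is attached.
msg.add_alternative("<p>HTML version.</p>\n", subtype="html")

# multipart/alternative -> multipart/mixed, then the attachment is added
# with 'Content-Disposition: attachment; filename="data.bin"'.
msg.add_attachment(b"\x00\x01 raw bytes", maintype="application",
                   subtype="octet-stream", filename="data.bin")

# get_body() searches depth first and honours the preference order;
# the default order is ('related', 'html', 'plain').
html_body = msg.get_body()
plain_body = msg.get_body(preferencelist=("plain",))

# iter_attachments() skips the first body-like subpart (the alternative).
attachments = list(msg.iter_attachments())

# _make_multipart() refuses to demote a multipart/mixed to an alternative.
try:
    msg.make_alternative()
except ValueError as exc:
    conversion_error = str(exc)
else:
    conversion_error = None
```

The resulting tree is `multipart/mixed` containing `multipart/alternative` (text/plain, text/html) followed by the `application/octet-stream` attachment.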

@@ -5,6 +5,7 @@ code that adds all the email6 features.
from email._policybase import Policy, Compat32, compat32, _extend_docstrings
from email.utils import _has_surrogates
from email.headerregistry import HeaderRegistry as HeaderRegistry
from email.contentmanager import raw_data_manager
__all__ = [
'Compat32',
@@ -58,10 +59,22 @@ class EmailPolicy(Policy):
special treatment, while all other fields are
treated as unstructured. This list will be
completed before the extension is marked stable.)
content_manager -- an object with at least two methods: get_content
and set_content. When the get_content or
set_content method of a Message object is called,
it calls the corresponding method of this object,
passing it the message object as its first argument,
and any arguments or keywords that were passed to
it as additional arguments. The default
content_manager is
:data:`~email.contentmanager.raw_data_manager`.
"""
refold_source = 'long'
header_factory = HeaderRegistry()
content_manager = raw_data_manager
def __init__(self, **kw):
# Ensure that each new instance gets a unique header factory
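The `content_manager` docstring above describes a simple delegation: the message calls the policy's manager, passing itself first, plus whatever arguments it received. A minimal sketch of plugging in a custom manager, assuming Python 3.6+; `set_plain`, `get_plain`, the `shout` keyword, and `toy_manager` are hypothetical names invented for this example, not part of the library.

```python
from email import policy
from email.contentmanager import ContentManager
from email.message import EmailMessage

# Hypothetical handlers: store a str as a bare text/plain payload.
def set_plain(msg, text, shout=False):
    msg["Content-Type"] = "text/plain"
    msg.set_payload(text.upper() if shout else text)

def get_plain(msg):
    return msg.get_payload()

toy_manager = ContentManager()
toy_manager.add_set_handler(str, set_plain)
toy_manager.add_get_handler("text/plain", get_plain)

# Route every get_content/set_content call through the toy manager.
toy_policy = policy.default.clone(content_manager=toy_manager)
msg = EmailMessage(policy=toy_policy)
msg.set_content("hello", shout=True)  # extra keywords reach set_plain

# A single call can also bypass the policy's manager.
other = EmailMessage()
other.set_content("hello", content_manager=toy_manager)
```

Because `ContentManager.set_content` clears the existing `Content-*` headers before calling the handler, the handler can assign `Content-Type` without tripping the default policy's one-per-message limit.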

@@ -68,9 +68,13 @@ def _has_surrogates(s):
# How to deal with a string containing bytes before handing it to the
# application through the 'normal' interface.
def _sanitize(string):
# Turn any escaped bytes into unicode 'unknown' char.
original_bytes = string.encode('ascii', 'surrogateescape')
return original_bytes.decode('ascii', 'replace')
# Turn any escaped bytes into unicode 'unknown' char. If the escaped
# bytes happen to be utf-8 they will instead get decoded, even if they
# were invalid in the charset the source was supposed to be in. This
# seems like it is not a bad thing; a defect was still registered.
original_bytes = string.encode('utf-8', 'surrogateescape')
return original_bytes.decode('utf-8', 'replace')
# Helpers
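The `_sanitize` change is clearer side by side. `_sanitize` is private, so the sketch below copies the old and new bodies into two standalone functions with hypothetical names; the input mimics, roughly, how the bytes parser carries raw bytes in a `str` via `surrogateescape`.

```python
def sanitize_old(string):
    # Old behaviour: every escaped byte becomes U+FFFD.
    original_bytes = string.encode('ascii', 'surrogateescape')
    return original_bytes.decode('ascii', 'replace')

def sanitize_new(string):
    # New behaviour: escaped bytes that form valid UTF-8 are decoded;
    # anything else still becomes U+FFFD.
    original_bytes = string.encode('utf-8', 'surrogateescape')
    return original_bytes.decode('utf-8', 'replace')

# b'\xc3\xa9' is UTF-8 for 'é'; b'\xff' is never valid UTF-8.
raw = b'caf\xc3\xa9 \xff'.decode('ascii', 'surrogateescape')
print(repr(sanitize_old(raw)), repr(sanitize_new(raw)))
```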

@@ -2,6 +2,7 @@ import os
import sys
import unittest
import test.support
import collections
import email
from email.message import Message
from email._policybase import compat32
@@ -42,6 +43,8 @@ class TestEmailBase(unittest.TestCase):
# here we make minimal changes in the test_email tests compared to their
# pre-3.3 state.
policy = compat32
# Likewise, the default message object is Message.
message = Message
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
@@ -54,11 +57,23 @@ class TestEmailBase(unittest.TestCase):
with openfile(filename) as fp:
return email.message_from_file(fp, policy=self.policy)
def _str_msg(self, string, message=Message, policy=None):
def _str_msg(self, string, message=None, policy=None):
if policy is None:
policy = self.policy
if message is None:
message = self.message
return email.message_from_string(string, message, policy=policy)
def _bytes_msg(self, bytestring, message=None, policy=None):
if policy is None:
policy = self.policy
if message is None:
message = self.message
return email.message_from_bytes(bytestring, message, policy=policy)
def _make_message(self):
return self.message(policy=self.policy)
def _bytes_repr(self, b):
return [repr(x) for x in b.splitlines(keepends=True)]
@@ -123,6 +138,7 @@ def parameterize(cls):
"""
paramdicts = {}
testers = collections.defaultdict(list)
for name, attr in cls.__dict__.items():
if name.endswith('_params'):
if not hasattr(attr, 'keys'):
@@ -134,7 +150,15 @@ def parameterize(cls):
d[n] = x
attr = d
paramdicts[name[:-7] + '_as_'] = attr
if '_as_' in name:
testers[name.split('_as_')[0] + '_as_'].append(name)
testfuncs = {}
for name in paramdicts:
if name not in testers:
raise ValueError("No tester found for {}".format(name))
for name in testers:
if name not in paramdicts:
raise ValueError("No params found for {}".format(name))
for name, attr in cls.__dict__.items():
for paramsname, paramsdict in paramdicts.items():
if name.startswith(paramsname):

@@ -0,0 +1,796 @@
import unittest
from test.test_email import TestEmailBase, parameterize
import textwrap
from email import policy
from email.message import EmailMessage
from email.contentmanager import ContentManager, raw_data_manager
@parameterize
class TestContentManager(TestEmailBase):
policy = policy.default
message = EmailMessage
get_key_params = {
'full_type': (1, 'text/plain',),
'maintype_only': (2, 'text',),
'null_key': (3, '',),
}
def get_key_as_get_content_key(self, order, key):
def foo_getter(msg, foo=None):
bar = msg['X-Bar-Header']
return foo, bar
cm = ContentManager()
cm.add_get_handler(key, foo_getter)
m = self._make_message()
m['Content-Type'] = 'text/plain'
m['X-Bar-Header'] = 'foo'
self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
def get_key_as_get_content_key_order(self, order, key):
def bar_getter(msg):
return msg['X-Bar-Header']
def foo_getter(msg):
return msg['X-Foo-Header']
cm = ContentManager()
cm.add_get_handler(key, foo_getter)
for precedence, key in self.get_key_params.values():
if precedence > order:
cm.add_get_handler(key, bar_getter)
m = self._make_message()
m['Content-Type'] = 'text/plain'
m['X-Bar-Header'] = 'bar'
m['X-Foo-Header'] = 'foo'
self.assertEqual(cm.get_content(m), 'foo')
def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
cm = ContentManager()
m = self._make_message()
m['Content-Type'] = 'text/plain'
with self.assertRaisesRegex(KeyError, 'text/plain'):
cm.get_content(m)
class BaseThing(str):
pass
baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
class Thing(BaseThing):
pass
testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
set_key_params = {
'type': (0, Thing,),
'full_path': (1, testobject_full_path,),
'qualname': (2, 'TestContentManager.Thing',),
'name': (3, 'Thing',),
'base_type': (4, BaseThing,),
'base_full_path': (5, baseobject_full_path,),
'base_qualname': (6, 'TestContentManager.BaseThing',),
'base_name': (7, 'BaseThing',),
'str_type': (8, str,),
'str_full_path': (9, 'builtins.str',),
'str_name': (10, 'str',), # str name and qualname are the same
'null_key': (11, None,),
}
def set_key_as_set_content_key(self, order, key):
def foo_setter(msg, obj, foo=None):
msg['X-Foo-Header'] = foo
msg.set_payload(obj)
cm = ContentManager()
cm.add_set_handler(key, foo_setter)
m = self._make_message()
msg_obj = self.Thing()
cm.set_content(m, msg_obj, foo='bar')
self.assertEqual(m['X-Foo-Header'], 'bar')
self.assertEqual(m.get_payload(), msg_obj)
def set_key_as_set_content_key_order(self, order, key):
def foo_setter(msg, obj):
msg['X-FooBar-Header'] = 'foo'
msg.set_payload(obj)
def bar_setter(msg, obj):
msg['X-FooBar-Header'] = 'bar'
cm = ContentManager()
cm.add_set_handler(key, foo_setter)
for precedence, key in self.set_key_params.values():
if precedence > order:
cm.add_set_handler(key, bar_setter)
m = self._make_message()
msg_obj = self.Thing()
cm.set_content(m, msg_obj)
self.assertEqual(m['X-FooBar-Header'], 'foo')
self.assertEqual(m.get_payload(), msg_obj)
def test_set_content_raises_if_unknown_type_and_no_default(self):
cm = ContentManager()
m = self._make_message()
msg_obj = self.Thing()
with self.assertRaisesRegex(KeyError, self.testobject_full_path):
cm.set_content(m, msg_obj)
def test_set_content_raises_if_called_on_multipart(self):
cm = ContentManager()
m = self._make_message()
m['Content-Type'] = 'multipart/foo'
with self.assertRaises(TypeError):
cm.set_content(m, 'test')
def test_set_content_calls_clear_content(self):
m = self._make_message()
m['Content-Foo'] = 'bar'
m['Content-Type'] = 'text/html'
m['To'] = 'test'
m.set_payload('abc')
cm = ContentManager()
cm.add_set_handler(str, lambda *args, **kw: None)
m.set_content('xyz', content_manager=cm)
self.assertIsNone(m['Content-Foo'])
self.assertIsNone(m['Content-Type'])
self.assertEqual(m['To'], 'test')
self.assertIsNone(m.get_payload())
@parameterize
class TestRawDataManager(TestEmailBase):
# Note: these tests are dependent on the order in which headers are added
# to the message objects by the code. There's no defined ordering in
# RFC5322/MIME, so this makes the tests more fragile than the standards
# require. However, if the header order changes it is best to understand
# *why*, and make sure it isn't a subtle bug in whatever change was
# applied.
policy = policy.default.clone(max_line_length=60,
content_manager=raw_data_manager)
message = EmailMessage
def test_get_text_plain(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain

Basic text.
"""))
self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
def test_get_text_html(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/html

<p>Basic text.</p>
"""))
self.assertEqual(raw_data_manager.get_content(m),
"<p>Basic text.</p>\n")
def test_get_text_plain_latin1(self):
m = self._bytes_msg(textwrap.dedent("""\
Content-Type: text/plain; charset=latin1

Basìc tëxt.
""").encode('latin1'))
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
def test_get_text_plain_latin1_quoted_printable(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain; charset="latin-1"
Content-Transfer-Encoding: quoted-printable

Bas=ECc t=EBxt.
"""))
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
def test_get_text_plain_utf8_base64(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain; charset="utf8"
Content-Transfer-Encoding: base64

QmFzw6xjIHTDq3h0Lgo=
"""))
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
def test_get_text_plain_bad_utf8_quoted_printable(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain; charset="utf8"
Content-Transfer-Encoding: quoted-printable

Bas=c3=acc t=c3=abxt=fd.
"""))
self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt<78>.\n")
def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain; charset="utf8"
Content-Transfer-Encoding: quoted-printable

Bas=c3=acc t=c3=abxt=fd.
"""))
self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
"Basìc tëxt.\n")
def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain; charset="utf8"
Content-Transfer-Encoding: base64

QmFzw6xjIHTDq3h0Lgo\xFF=
"""))
self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
"Basìc tëxt.\n")
def test_get_text_invalid_keyword(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: text/plain

Basic text.
"""))
with self.assertRaises(TypeError):
raw_data_manager.get_content(m, foo='ignore')
def test_get_non_text(self):
template = textwrap.dedent("""\
Content-Type: {}
Content-Transfer-Encoding: base64

Ym9ndXMgZGF0YQ==
""")
for maintype in 'audio image video application'.split():
with self.subTest(maintype=maintype):
m = self._str_msg(template.format(maintype+'/foo'))
self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
def test_get_non_text_invalid_keyword(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: image/jpg
Content-Transfer-Encoding: base64

Ym9ndXMgZGF0YQ==
"""))
with self.assertRaises(TypeError):
raw_data_manager.get_content(m, errors='ignore')
def test_get_raises_on_multipart(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: multipart/mixed; boundary="==="

--===
--===--
"""))
with self.assertRaises(KeyError):
raw_data_manager.get_content(m)
def test_get_message_rfc822_and_external_body(self):
template = textwrap.dedent("""\
Content-Type: message/{}

To: foo@example.com
From: bar@example.com
Subject: example

an example message
""")
for subtype in 'rfc822 external-body'.split():
with self.subTest(subtype=subtype):
m = self._str_msg(template.format(subtype))
sub_msg = raw_data_manager.get_content(m)
self.assertIsInstance(sub_msg, self.message)
self.assertEqual(raw_data_manager.get_content(sub_msg),
"an example message\n")
self.assertEqual(sub_msg['to'], 'foo@example.com')
self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
m = self._str_msg(textwrap.dedent("""\
Content-Type: message/partial

To: foo@example.com
From: bar@example.com
Subject: example

The real body is in another message.
"""))
self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
def test_set_text_plain(self):
m = self._make_message()
content = "Simple message.\n"
raw_data_manager.set_content(m, content)
self.assertEqual(str(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit

Simple message.
"""))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_html(self):
m = self._make_message()
content = "<p>Simple message.</p>\n"
raw_data_manager.set_content(m, content, subtype='html')
self.assertEqual(str(m), textwrap.dedent("""\
Content-Type: text/html; charset="utf-8"
Content-Transfer-Encoding: 7bit

<p>Simple message.</p>
"""))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_charset_latin_1(self):
m = self._make_message()
content = "Simple message.\n"
raw_data_manager.set_content(m, content, charset='latin-1')
self.assertEqual(str(m), textwrap.dedent("""\
Content-Type: text/plain; charset="iso-8859-1"
Content-Transfer-Encoding: 7bit

Simple message.
"""))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_short_line_minimal_non_ascii_heuristics(self):
m = self._make_message()
content = "et là il est monté sur moi et il commence à m'éto.\n"
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 8bit

et là il est monté sur moi et il commence à m'éto.
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_long_line_minimal_non_ascii_heuristics(self):
m = self._make_message()
content = ("j'ai un problème de python. il est sorti de son"
" vivarium. et là il est monté sur moi et il commence"
" à m'éto.\n")
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: quoted-printable

j'ai un probl=C3=A8me de python. il est sorti de son vivari=
um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
=C3=A0 m'=C3=A9to.
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
m = self._make_message()
content = '\n'*10 + (
"j'ai un problème de python. il est sorti de son"
" vivarium. et là il est monté sur moi et il commence"
" à m'éto.\n")
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: quoted-printable
""" + '\n'*10 + """
j'ai un probl=C3=A8me de python. il est sorti de son vivari=
um. et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
=C3=A0 m'=C3=A9to.
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_maximal_non_ascii_heuristics(self):
m = self._make_message()
content = "áàäéèęöő.\n"
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 8bit

áàäéèęöő.
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
m = self._make_message()
content = '\n'*10 + "áàäéèęöő.\n"
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 8bit
""" + '\n'*10 + """
áàäéèęöő.
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_long_line_maximal_non_ascii_heuristics(self):
m = self._make_message()
content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: base64

w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
w6TDqcOoxJnDtsWRLgo=
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
# Yes, it chooses "wrong" here. It's a heuristic. So this result
# could change if we come up with a better heuristic.
m = self._make_message()
content = ('\n'*10 +
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
"áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: quoted-printable
""" + '\n'*10 + """
=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
=C5=91.
""").encode('utf-8'))
self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
self.assertEqual(m.get_content(), content)
def test_set_text_non_ascii_with_cte_7bit_raises(self):
m = self._make_message()
with self.assertRaises(UnicodeError):
raw_data_manager.set_content(m, "áàäéèęöő.\n", cte='7bit')
def test_set_text_non_ascii_with_charset_ascii_raises(self):
m = self._make_message()
with self.assertRaises(UnicodeError):
raw_data_manager.set_content(m, "áàäéèęöő.\n", charset='ascii')
def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
m = self._make_message()
with self.assertRaises(UnicodeError):
raw_data_manager.set_content(m, "áàäéèęöő.\n", cte='7bit', charset='ascii')
def test_set_message(self):
m = self._make_message()
m['Subject'] = "Forwarded message"
content = self._make_message()
content['To'] = 'python@vivarium.org'
content['From'] = 'police@monty.org'
content['Subject'] = "get back in your box"
content.set_content("Or face the comfy chair.")
raw_data_manager.set_content(m, content)
self.assertEqual(str(m), textwrap.dedent("""\
Subject: Forwarded message
Content-Type: message/rfc822
Content-Transfer-Encoding: 8bit

To: python@vivarium.org
From: police@monty.org
Subject: get back in your box
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit
MIME-Version: 1.0

Or face the comfy chair.
"""))
payload = m.get_payload(0)
self.assertIsInstance(payload, self.message)
self.assertEqual(str(payload), str(content))
self.assertIsInstance(m.get_content(), self.message)
self.assertEqual(str(m.get_content()), str(content))
def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
m = self._make_message()
m['Subject'] = "Escape report"
content = self._make_message()
content['To'] = 'police@monty.org'
content['From'] = 'victim@monty.org'
content['Subject'] = "Help"
content.set_content("j'ai un problème de python. il est sorti de son"
" vivarium.")
raw_data_manager.set_content(m, content)
self.assertEqual(bytes(m), textwrap.dedent("""\
Subject: Escape report
Content-Type: message/rfc822
Content-Transfer-Encoding: 8bit

To: police@monty.org
From: victim@monty.org
Subject: Help
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 8bit
MIME-Version: 1.0

j'ai un problème de python. il est sorti de son vivarium.
""").encode('utf-8'))
# Base64 is chosen for the body encoding because the generator doesn't
# bother with heuristics and uses it unconditionally for utf-8
# text.
# XXX: the first cte should be 7bit, too...that's a generator bug.
# XXX: the line length in the body also looks like a generator bug.
self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
textwrap.dedent("""\
Subject: Escape report
Content-Type: message/rfc822
Content-Transfer-Encoding: 8bit

To: police@monty.org
From: victim@monty.org
Subject: Help
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: base64

aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
Lgo=
"""))
self.assertIsInstance(m.get_content(), self.message)
self.assertEqual(str(m.get_content()), str(content))
def test_set_message_invalid_cte_raises(self):
m = self._make_message()
content = self._make_message()
for cte in 'quoted-printable base64'.split():
for subtype in 'rfc822 external-body'.split():
with self.subTest(cte=cte, subtype=subtype):
with self.assertRaises(ValueError) as ar:
m.set_content(content, subtype, cte=cte)
exc = str(ar.exception)
self.assertIn(cte, exc)
self.assertIn(subtype, exc)
subtype = 'external-body'
for cte in '8bit binary'.split():
with self.subTest(cte=cte, subtype=subtype):
with self.assertRaises(ValueError) as ar:
m.set_content(content, subtype, cte=cte)
exc = str(ar.exception)
self.assertIn(cte, exc)
self.assertIn(subtype, exc)
def test_set_image_jpg(self):
for content in (b"bogus content",
bytearray(b"bogus content"),
memoryview(b"bogus content")):
with self.subTest(content=content):
m = self._make_message()
raw_data_manager.set_content(m, content, 'image', 'jpeg')
self.assertEqual(str(m), textwrap.dedent("""\
Content-Type: image/jpeg
Content-Transfer-Encoding: base64
Ym9ndXMgY29udGVudA==
"""))
self.assertEqual(m.get_payload(decode=True), content)
self.assertEqual(m.get_content(), content)
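# Standalone sketch (hypothetical helper, not part of this test module):
# with the default raw_data_manager, bytes passed to EmailMessage.set_content
# get a base64 Content-Transfer-Encoding unless a cte is given explicitly,
# and get_content() hands the original bytes back.
from email.message import EmailMessage


def sketch_bytes_round_trip(data=b"bogus content"):
    # Hypothetical helper: store raw bytes as an image/jpeg body.
    msg = EmailMessage()
    msg.set_content(data, maintype='image', subtype='jpeg')
    return msg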
def test_set_audio_aif_with_quoted_printable_cte(self):
# Why you would use qp, I don't know, but it is technically supported.
# XXX: the incorrect line length is because binascii.b2a_qp doesn't
# support a line length parameter, but we must use it to get newline
# encoding.
# XXX: what about that lack of trailing newline? Do we actually handle
# that correctly in all cases? That is, if the *source* has an
# unencoded newline, do we add an extra newline to the returned payload
# or not? And can that actually be disambiguated based on the RFC?
m = self._make_message()
content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
m.set_content(content, 'audio', 'aif', cte='quoted-printable')
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: audio/aif
Content-Transfer-Encoding: quoted-printable
MIME-Version: 1.0
b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
self.assertEqual(m.get_payload(decode=True), content)
self.assertEqual(m.get_content(), content)
def test_set_video_mpeg_with_binary_cte(self):
m = self._make_message()
content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
m.set_content(content, 'video', 'mpeg', cte='binary')
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: video/mpeg
Content-Transfer-Encoding: binary
MIME-Version: 1.0
""").encode('ascii') +
# XXX: the second \n ought to be a \r, but generator gets it wrong.
# THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
self.assertEqual(m.get_payload(decode=True), content)
self.assertEqual(m.get_content(), content)
def test_set_application_octet_stream_with_8bit_cte(self):
# In 8bit mode, universal line-ending logic applies. It is up to the
# application to make sure the lines are short enough; we don't check.
m = self._make_message()
content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
m.set_content(content, 'application', 'octet-stream', cte='8bit')
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: application/octet-stream
Content-Transfer-Encoding: 8bit
MIME-Version: 1.0
""").encode('ascii') +
b'b\xFFgus\tcon\nt\nent\n' +
b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
self.assertEqual(m.get_payload(decode=True), content)
self.assertEqual(m.get_content(), content)
def test_set_headers_from_header_objects(self):
m = self._make_message()
content = "Simple message.\n"
header_factory = self.policy.header_factory
raw_data_manager.set_content(m, content, headers=(
header_factory("To", "foo@example.com"),
header_factory("From", "foo@example.com"),
header_factory("Subject", "I'm talking to myself.")))
self.assertEqual(str(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
To: foo@example.com
From: foo@example.com
Subject: I'm talking to myself.
Content-Transfer-Encoding: 7bit
Simple message.
"""))
def test_set_headers_from_strings(self):
m = self._make_message()
content = "Simple message.\n"
raw_data_manager.set_content(m, content, headers=(
"X-Foo-Header: foo",
"X-Bar-Header: bar",))
self.assertEqual(str(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
X-Foo-Header: foo
X-Bar-Header: bar
Content-Transfer-Encoding: 7bit
Simple message.
"""))
def test_set_headers_with_invalid_duplicate_string_header_raises(self):
m = self._make_message()
content = "Simple message.\n"
with self.assertRaisesRegex(ValueError, 'Content-Type'):
raw_data_manager.set_content(m, content, headers=(
"Content-Type: foo/bar",)
)
def test_set_headers_with_invalid_duplicate_header_header_raises(self):
m = self._make_message()
content = "Simple message.\n"
header_factory = self.policy.header_factory
with self.assertRaisesRegex(ValueError, 'Content-Type'):
raw_data_manager.set_content(m, content, headers=(
header_factory("Content-Type", " foo/bar"),)
)
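# Standalone sketch (hypothetical helper, not part of this test module):
# the headers argument of set_content may not repeat a header that the
# content manager sets itself. A second Content-Type is rejected with
# ValueError, because Content-Type may appear at most once in a message.
from email.message import EmailMessage


def sketch_duplicate_content_type_rejected():
    # Hypothetical helper: return the error text, or None if nothing raised.
    msg = EmailMessage()
    try:
        msg.set_content("Simple message.\n",
                        headers=("Content-Type: foo/bar",))
    except ValueError as exc:
        return str(exc)
    return None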
def test_set_headers_with_defective_string_header_raises(self):
m = self._make_message()
content = "Simple message.\n"
with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
raw_data_manager.set_content(m, content, headers=(
'To: a@fairly@@invalid@address',)
)
def test_set_headers_with_defective_header_header_raises(self):
m = self._make_message()
content = "Simple message.\n"
header_factory = self.policy.header_factory
with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
raw_data_manager.set_content(m, content, headers=(
header_factory('To', 'a@fairly@@invalid@address'),)
)
def test_set_disposition_inline(self):
m = self._make_message()
m.set_content('foo', disposition='inline')
self.assertEqual(m['Content-Disposition'], 'inline')
def test_set_disposition_attachment(self):
m = self._make_message()
m.set_content('foo', disposition='attachment')
self.assertEqual(m['Content-Disposition'], 'attachment')
def test_set_disposition_foo(self):
m = self._make_message()
m.set_content('foo', disposition='foo')
self.assertEqual(m['Content-Disposition'], 'foo')
# XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
# would cause 'foo' above to raise.
def test_set_filename(self):
m = self._make_message()
m.set_content('foo', filename='bar.txt')
self.assertEqual(m['Content-Disposition'],
'attachment; filename="bar.txt"')
def test_set_filename_and_disposition_inline(self):
m = self._make_message()
m.set_content('foo', disposition='inline', filename='bar.txt')
self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
def test_set_non_ascii_filename(self):
m = self._make_message()
m.set_content('foo', filename='ábárî.txt')
self.assertEqual(bytes(m), textwrap.dedent("""\
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit
Content-Disposition: attachment;
filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
MIME-Version: 1.0
foo
""").encode('ascii'))
content_object_params = {
'text_plain': ('content', ()),
'text_html': ('content', ('html',)),
'application_octet_stream': (b'content',
('application', 'octet-stream')),
'image_jpeg': (b'content', ('image', 'jpeg')),
'message_rfc822': (message(), ()),
'message_external_body': (message(), ('external-body',)),
}
def content_object_as_header_receiver(self, obj, mimetype):
m = self._make_message()
m.set_content(obj, *mimetype, headers=(
'To: foo@example.com',
'From: bar@simple.net'))
self.assertEqual(m['to'], 'foo@example.com')
self.assertEqual(m['from'], 'bar@simple.net')
def content_object_as_disposition_inline_receiver(self, obj, mimetype):
m = self._make_message()
m.set_content(obj, *mimetype, disposition='inline')
self.assertEqual(m['Content-Disposition'], 'inline')
def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
m = self._make_message()
m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
self.assertEqual(m.get_filename(), "bár.txt")
self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")
def content_object_as_cid_receiver(self, obj, mimetype):
m = self._make_message()
m.set_content(obj, *mimetype, cid='some_random_stuff')
self.assertEqual(m['Content-ID'], 'some_random_stuff')
def content_object_as_params_receiver(self, obj, mimetype):
m = self._make_message()
params = {'foo': 'bár', 'abc': 'xyz'}
m.set_content(obj, *mimetype, params=params)
if isinstance(obj, str):
params['charset'] = 'utf-8'
self.assertEqual(m['Content-Type'].params, params)
if __name__ == '__main__':
unittest.main()

@@ -661,7 +661,7 @@ class TestContentTypeHeader(TestHeaderBase):
'text/plain; name="ascii_is_the_default"'),
'rfc2231_bad_character_in_charset_parameter_value': (
"text/plain; charset*=ascii''utf-8%E2%80%9D",
"text/plain; charset*=ascii''utf-8%F1%F2%F3",
'text/plain',
'text',
'plain',
@@ -669,6 +669,18 @@ class TestContentTypeHeader(TestHeaderBase):
[errors.UndecodableBytesDefect],
'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'),
'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': (
"text/plain; charset*=ascii''utf-8%E2%80%9D",
'text/plain',
'text',
'plain',
{'charset': 'utf-8”'},
[errors.UndecodableBytesDefect],
'text/plain; charset="utf-8”"',
),
# XXX: if the above were *re*folded, it would get tagged as utf-8
# instead of ascii in the param, since it now contains non-ASCII.
'rfc2231_encoded_then_unencoded_segments': (
('application/x-foo;'
'\tname*0*="us-ascii\'en-us\'My";'

@@ -1,6 +1,13 @@
import unittest
import textwrap
from email import policy
from test.test_email import TestEmailBase
from email.message import EmailMessage, MIMEPart
from test.test_email import TestEmailBase, parameterize
# Helper.
def first(iterable):
return next(filter(lambda x: x is not None, iterable), None)
class Test(TestEmailBase):
@@ -14,5 +21,738 @@ class Test(TestEmailBase):
m['To'] = 'xyz@abc'
@parameterize
class TestEmailMessageBase:
policy = policy.default
# The first argument is a triple (related, html, plain) of indices into the
# list returned by 'walk' called on a Message constructed from the fourth.
# The indices indicate which part should match the corresponding part-type
# when passed to get_body (i.e. the "first" part of that type in the
# message). The second argument is a list of indices into the 'walk' list
# of the attachments that should be returned by a call to
# 'iter_attachments'. The third argument is a list of indices into 'walk'
# that should be returned by a call to 'iter_parts'. Note that the first
# item returned by 'walk' is the Message itself.
message_params = {
'empty_message': (
(None, None, 0),
(),
(),
""),
'non_mime_plain': (
(None, None, 0),
(),
(),
textwrap.dedent("""\
To: foo@example.com
simple text body
""")),
'mime_non_text': (
(None, None, None),
(),
(),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: image/jpg
bogus body.
""")),
'plain_html_alternative': (
(None, 2, 1),
(),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/alternative; boundary="==="
preamble
--===
Content-Type: text/plain
simple body
--===
Content-Type: text/html
<p>simple body</p>
--===--
""")),
'plain_html_mixed': (
(None, 2, 1),
(),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
preamble
--===
Content-Type: text/plain
simple body
--===
Content-Type: text/html
<p>simple body</p>
--===--
""")),
'plain_html_attachment_mixed': (
(None, None, 1),
(2,),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: text/plain
simple body
--===
Content-Type: text/html
Content-Disposition: attachment
<p>simple body</p>
--===--
""")),
'html_text_attachment_mixed': (
(None, 2, None),
(1,),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: text/plain
Content-Disposition: AtTaChment
simple body
--===
Content-Type: text/html
<p>simple body</p>
--===--
""")),
'html_text_attachment_inline_mixed': (
(None, 2, 1),
(),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: text/plain
Content-Disposition: InLine
simple body
--===
Content-Type: text/html
Content-Disposition: inline
<p>simple body</p>
--===--
""")),
# RFC 2387
'related': (
(0, 1, None),
(2,),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/related; boundary="==="; type=text/html
--===
Content-Type: text/html
<p>simple body</p>
--===
Content-Type: image/jpg
Content-ID: <image1>
bogus data
--===--
""")),
# This message structure will probably never be seen in the wild, but
# it proves we distinguish between text parts based on 'start'. The
# content would not, of course, actually work :)
'related_with_start': (
(0, 2, None),
(1,),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/related; boundary="==="; type=text/html;
start="<body>"
--===
Content-Type: text/html
Content-ID: <include>
useless text
--===
Content-Type: text/html
Content-ID: <body>
<p>simple body</p>
<!--#include file="<include>"-->
--===--
""")),
'mixed_alternative_plain_related': (
(3, 4, 2),
(6, 7),
(1, 6, 7),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: multipart/alternative; boundary="+++"
--+++
Content-Type: text/plain
simple body
--+++
Content-Type: multipart/related; boundary="___"
--___
Content-Type: text/html
<p>simple body</p>
--___
Content-Type: image/jpg
Content-ID: <image1@cid>
bogus jpg body
--___--
--+++--
--===
Content-Type: image/jpg
Content-Disposition: attachment
bogus jpg body
--===
Content-Type: image/jpg
Content-Disposition: AttacHmenT
another bogus jpg body
--===--
""")),
# This structure was suggested by Stephen J. Turnbull. It may not exist or
# be supported in the wild, but we want to support it.
'mixed_related_alternative_plain_html': (
(1, 4, 3),
(6, 7),
(1, 6, 7),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: multipart/related; boundary="+++"
--+++
Content-Type: multipart/alternative; boundary="___"
--___
Content-Type: text/plain
simple body
--___
Content-Type: text/html
<p>simple body</p>
--___--
--+++
Content-Type: image/jpg
Content-ID: <image1@cid>
bogus jpg body
--+++--
--===
Content-Type: image/jpg
Content-Disposition: attachment
bogus jpg body
--===
Content-Type: image/jpg
Content-Disposition: attachment
another bogus jpg body
--===--
""")),
# Same thing, but proving we only look at the root part, which is the
# first one if there isn't any start parameter. That is, this is a
# broken related.
'mixed_related_alternative_plain_html_wrong_order': (
(1, None, None),
(6, 7),
(1, 6, 7),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: multipart/related; boundary="+++"
--+++
Content-Type: image/jpg
Content-ID: <image1@cid>
bogus jpg body
--+++
Content-Type: multipart/alternative; boundary="___"
--___
Content-Type: text/plain
simple body
--___
Content-Type: text/html
<p>simple body</p>
--___--
--+++--
--===
Content-Type: image/jpg
Content-Disposition: attachment
bogus jpg body
--===
Content-Type: image/jpg
Content-Disposition: attachment
another bogus jpg body
--===--
""")),
'message_rfc822': (
(None, None, None),
(),
(),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: message/rfc822
To: bar@example.com
From: robot@examp.com
this is a message body.
""")),
'mixed_text_message_rfc822': (
(None, None, 1),
(2,),
(1, 2),
textwrap.dedent("""\
To: foo@example.com
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary="==="
--===
Content-Type: text/plain
Your message has bounced, ser.
--===
Content-Type: message/rfc822
To: bar@example.com
From: robot@examp.com
this is a message body.
--===--
""")),
}
def message_as_get_body(self, body_parts, attachments, parts, msg):
m = self._str_msg(msg)
allparts = list(m.walk())
expected = [None if n is None else allparts[n] for n in body_parts]
related = 0; html = 1; plain = 2
self.assertEqual(m.get_body(), first(expected))
self.assertEqual(m.get_body(preferencelist=(
'related', 'html', 'plain')),
first(expected))
self.assertEqual(m.get_body(preferencelist=('related', 'html')),
first(expected[related:html+1]))
self.assertEqual(m.get_body(preferencelist=('related', 'plain')),
first([expected[related], expected[plain]]))
self.assertEqual(m.get_body(preferencelist=('html', 'plain')),
first(expected[html:plain+1]))
self.assertEqual(m.get_body(preferencelist=['related']),
expected[related])
self.assertEqual(m.get_body(preferencelist=['html']), expected[html])
self.assertEqual(m.get_body(preferencelist=['plain']), expected[plain])
self.assertEqual(m.get_body(preferencelist=('plain', 'html')),
first(expected[plain:html-1:-1]))
self.assertEqual(m.get_body(preferencelist=('plain', 'related')),
first([expected[plain], expected[related]]))
self.assertEqual(m.get_body(preferencelist=('html', 'related')),
first(expected[html::-1]))
self.assertEqual(m.get_body(preferencelist=('plain', 'html', 'related')),
first(expected[::-1]))
self.assertEqual(m.get_body(preferencelist=('html', 'plain', 'related')),
first([expected[html],
expected[plain],
expected[related]]))
def message_as_iter_attachment(self, body_parts, attachments, parts, msg):
m = self._str_msg(msg)
allparts = list(m.walk())
attachments = [allparts[n] for n in attachments]
self.assertEqual(list(m.iter_attachments()), attachments)
def message_as_iter_parts(self, body_parts, attachments, parts, msg):
m = self._str_msg(msg)
allparts = list(m.walk())
parts = [allparts[n] for n in parts]
self.assertEqual(list(m.iter_parts()), parts)
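# Standalone sketch (hypothetical helper, not part of this test module):
# build a multipart/mixed message whose first part is a plain/html
# multipart/alternative and whose second part is a JPEG attachment. Then
# compare what get_body(), iter_attachments() and iter_parts() return.
from email.message import EmailMessage


def sketch_body_and_attachments():
    # Hypothetical helper returning the assembled message.
    msg = EmailMessage()
    msg['To'] = 'foo@example.com'
    msg.set_content("simple body\n")
    # Turns the message into multipart/alternative and adds a text/html part.
    msg.add_alternative("<p>simple body</p>\n", subtype='html')
    # Turns the message into multipart/mixed and appends an attachment.
    msg.add_attachment(b"bogus jpg body", maintype='image', subtype='jpeg',
                       filename='image1.jpg')
    return msg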
class _TestContentManager:
def get_content(self, msg, *args, **kw):
return msg, args, kw
def set_content(self, msg, *args, **kw):
self.msg = msg
self.args = args
self.kw = kw
def test_get_content_with_cm(self):
m = self._str_msg('')
cm = self._TestContentManager()
self.assertEqual(m.get_content(content_manager=cm), (m, (), {}))
msg, args, kw = m.get_content('foo', content_manager=cm, bar=1, k=2)
self.assertEqual(msg, m)
self.assertEqual(args, ('foo',))
self.assertEqual(kw, dict(bar=1, k=2))
def test_get_content_default_cm_comes_from_policy(self):
p = policy.default.clone(content_manager=self._TestContentManager())
m = self._str_msg('', policy=p)
self.assertEqual(m.get_content(), (m, (), {}))
msg, args, kw = m.get_content('foo', bar=1, k=2)
self.assertEqual(msg, m)
self.assertEqual(args, ('foo',))
self.assertEqual(kw, dict(bar=1, k=2))
def test_set_content_with_cm(self):
m = self._str_msg('')
cm = self._TestContentManager()
m.set_content(content_manager=cm)
self.assertEqual(cm.msg, m)
self.assertEqual(cm.args, ())
self.assertEqual(cm.kw, {})
m.set_content('foo', content_manager=cm, bar=1, k=2)
self.assertEqual(cm.msg, m)
self.assertEqual(cm.args, ('foo',))
self.assertEqual(cm.kw, dict(bar=1, k=2))
def test_set_content_default_cm_comes_from_policy(self):
cm = self._TestContentManager()
p = policy.default.clone(content_manager=cm)
m = self._str_msg('', policy=p)
m.set_content()
self.assertEqual(cm.msg, m)
self.assertEqual(cm.args, ())
self.assertEqual(cm.kw, {})
m.set_content('foo', bar=1, k=2)
self.assertEqual(cm.msg, m)
self.assertEqual(cm.args, ('foo',))
self.assertEqual(cm.kw, dict(bar=1, k=2))
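# Standalone sketch (hypothetical names, not part of this test module):
# get_content() and set_content() fall back to policy.content_manager when
# no content_manager argument is given. The ContentManager below has a
# single hypothetical 'text' get handler. Because it has no set handlers,
# set_content() is routed explicitly to raw_data_manager.
from email import policy
from email.contentmanager import ContentManager, raw_data_manager
from email.message import EmailMessage


def _sketch_get_text_upper(msg):
    # Hypothetical get handler: decode the text payload and upper-case it.
    charset = msg.get_content_charset('ascii')
    return msg.get_payload(decode=True).decode(charset).upper()


def sketch_policy_content_manager():
    shouting = ContentManager()
    shouting.add_get_handler('text', _sketch_get_text_upper)
    msg = EmailMessage(policy=policy.default.clone(content_manager=shouting))
    msg.set_content("get back in your box\n",
                    content_manager=raw_data_manager)
    return msg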
# outcome is whether xxx_method should raise ValueError when called
# on multipart/subtype. Blank outcome means it depends on xxx (add
# succeeds, make raises). Note: 'none' means there are content-type
# headers but payload is None...this happening in practice would be very
# unusual, so treating it as if there were content seems reasonable.
# method subtype outcome
subtype_params = (
('related', 'no_content', 'succeeds'),
('related', 'none', 'succeeds'),
('related', 'plain', 'succeeds'),
('related', 'related', ''),
('related', 'alternative', 'raises'),
('related', 'mixed', 'raises'),
('alternative', 'no_content', 'succeeds'),
('alternative', 'none', 'succeeds'),
('alternative', 'plain', 'succeeds'),
('alternative', 'related', 'succeeds'),
('alternative', 'alternative', ''),
('alternative', 'mixed', 'raises'),
('mixed', 'no_content', 'succeeds'),
('mixed', 'none', 'succeeds'),
('mixed', 'plain', 'succeeds'),
('mixed', 'related', 'succeeds'),
('mixed', 'alternative', 'succeeds'),
('mixed', 'mixed', ''),
)
def _make_subtype_test_message(self, subtype):
m = self.message()
payload = None
msg_headers = [
('To', 'foo@bar.com'),
('From', 'bar@foo.com'),
]
if subtype != 'no_content':
msg_headers.append(('Content-Shadow', 'Logrus'))
msg_headers.append(('X-Random-Header', 'Corwin'))
if subtype == 'plain':
payload = ''
msg_headers.append(('Content-Type', 'text/plain'))
m.set_payload('')
elif subtype != 'no_content':
payload = []
msg_headers.append(('Content-Type', 'multipart/' + subtype))
msg_headers.append(('X-Trump', 'Random'))
m.set_payload(payload)
for name, value in msg_headers:
m[name] = value
return m, msg_headers, payload
def _check_disallowed_subtype_raises(self, m, method_name, subtype, method):
with self.assertRaises(ValueError) as ar:
getattr(m, method)()
exc_text = str(ar.exception)
self.assertIn(subtype, exc_text)
self.assertIn(method_name, exc_text)
def _check_make_multipart(self, m, msg_headers, payload):
count = 0
for name, value in msg_headers:
if not name.lower().startswith('content-'):
self.assertEqual(m[name], value)
count += 1
self.assertEqual(len(m), count+1) # +1 for new Content-Type
part = next(m.iter_parts())
count = 0
for name, value in msg_headers:
if name.lower().startswith('content-'):
self.assertEqual(part[name], value)
count += 1
self.assertEqual(len(part), count)
self.assertEqual(part.get_payload(), payload)
def subtype_as_make(self, method, subtype, outcome):
m, msg_headers, payload = self._make_subtype_test_message(subtype)
make_method = 'make_' + method
if outcome in ('', 'raises'):
self._check_disallowed_subtype_raises(m, method, subtype, make_method)
return
getattr(m, make_method)()
self.assertEqual(m.get_content_maintype(), 'multipart')
self.assertEqual(m.get_content_subtype(), method)
if subtype == 'no_content':
self.assertEqual(len(m.get_payload()), 0)
self.assertEqual(m.items(),
msg_headers + [('Content-Type',
'multipart/'+method)])
else:
self.assertEqual(len(m.get_payload()), 1)
self._check_make_multipart(m, msg_headers, payload)
def subtype_as_make_with_boundary(self, method, subtype, outcome):
# Doing all variations is a bit of overkill...
m = self.message()
if outcome in ('', 'raises'):
m['Content-Type'] = 'multipart/' + subtype
with self.assertRaises(ValueError) as cm:
getattr(m, 'make_' + method)()
return
if subtype == 'plain':
m['Content-Type'] = 'text/plain'
elif subtype != 'no_content':
m['Content-Type'] = 'multipart/' + subtype
getattr(m, 'make_' + method)(boundary="abc")
self.assertTrue(m.is_multipart())
self.assertEqual(m.get_boundary(), 'abc')
def test_policy_on_part_made_by_make_comes_from_message(self):
for method in ('make_related', 'make_alternative', 'make_mixed'):
m = self.message(policy=self.policy.clone(content_manager='foo'))
m['Content-Type'] = 'text/plain'
getattr(m, method)()
self.assertEqual(m.get_payload(0).policy.content_manager, 'foo')
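# Standalone sketch (hypothetical helper, not part of this test module):
# make_related/make_alternative/make_mixed move the existing Content-*
# headers and payload into a new first subpart. Other headers (To,
# MIME-Version, ...) stay on the outer message. As the subtype_params
# table shows, conversion only goes "outward" (related -> alternative ->
# mixed). Going back, or re-making the same subtype, raises ValueError.
from email.message import EmailMessage


def sketch_make_multipart():
    # Hypothetical helper: returns the message and whether make_related
    # was refused on a multipart/alternative message.
    msg = EmailMessage()
    msg['To'] = 'foo@bar.com'
    msg.set_content("simple body\n")
    msg.make_alternative()
    try:
        msg.make_related()
    except ValueError:
        related_refused = True
    else:
        related_refused = False
    return msg, related_refused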
class _TestSetContentManager:
def set_content(self, msg, content, *args, **kw):
msg['Content-Type'] = 'text/plain'
msg.set_payload(content)
def subtype_as_add(self, method, subtype, outcome):
m, msg_headers, payload = self._make_subtype_test_message(subtype)
cm = self._TestSetContentManager()
add_method = 'add_attachment' if method=='mixed' else 'add_' + method
if outcome == 'raises':
self._check_disallowed_subtype_raises(m, method, subtype, add_method)
return
getattr(m, add_method)('test', content_manager=cm)
self.assertEqual(m.get_content_maintype(), 'multipart')
self.assertEqual(m.get_content_subtype(), method)
if method == subtype or subtype == 'no_content':
self.assertEqual(len(m.get_payload()), 1)
for name, value in msg_headers:
self.assertEqual(m[name], value)
part = m.get_payload()[0]
else:
self.assertEqual(len(m.get_payload()), 2)
self._check_make_multipart(m, msg_headers, payload)
part = m.get_payload()[1]
self.assertEqual(part.get_content_type(), 'text/plain')
self.assertEqual(part.get_payload(), 'test')
if method=='mixed':
self.assertEqual(part['Content-Disposition'], 'attachment')
elif method=='related':
self.assertEqual(part['Content-Disposition'], 'inline')
else:
# Otherwise we don't guess.
self.assertIsNone(part['Content-Disposition'])
class _TestSetRaisingContentManager:
def set_content(self, msg, content, *args, **kw):
raise Exception('test')
def test_default_content_manager_for_add_comes_from_policy(self):
cm = self._TestSetRaisingContentManager()
m = self.message(policy=self.policy.clone(content_manager=cm))
for method in ('add_related', 'add_alternative', 'add_attachment'):
with self.assertRaises(Exception) as ar:
getattr(m, method)('')
self.assertEqual(str(ar.exception), 'test')
def message_as_clear(self, body_parts, attachments, parts, msg):
m = self._str_msg(msg)
m.clear()
self.assertEqual(len(m), 0)
self.assertEqual(list(m.items()), [])
self.assertIsNone(m.get_payload())
self.assertEqual(list(m.iter_parts()), [])
def message_as_clear_content(self, body_parts, attachments, parts, msg):
m = self._str_msg(msg)
expected_headers = [h for h in m.keys()
if not h.lower().startswith('content-')]
m.clear_content()
self.assertEqual(list(m.keys()), expected_headers)
self.assertIsNone(m.get_payload())
self.assertEqual(list(m.iter_parts()), [])
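# Standalone sketch (hypothetical helper, not part of this test module):
# clear() drops every header and the payload. clear_content() drops only
# the Content-* headers and the payload, so addressing headers and
# MIME-Version survive.
from email.message import EmailMessage


def sketch_clear_content():
    # Hypothetical helper: a text message with its content removed.
    msg = EmailMessage()
    msg['To'] = 'foo@example.com'
    msg.set_content("simple body\n")
    msg.clear_content()
    return msg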
def test_is_attachment(self):
m = self._make_message()
self.assertFalse(m.is_attachment)
m['Content-Disposition'] = 'inline'
self.assertFalse(m.is_attachment)
m.replace_header('Content-Disposition', 'attachment')
self.assertTrue(m.is_attachment)
m.replace_header('Content-Disposition', 'AtTachMent')
self.assertTrue(m.is_attachment)
class TestEmailMessage(TestEmailMessageBase, TestEmailBase):
message = EmailMessage
def test_set_content_adds_MIME_Version(self):
m = self._str_msg('')
cm = self._TestContentManager()
self.assertNotIn('MIME-Version', m)
m.set_content(content_manager=cm)
self.assertEqual(m['MIME-Version'], '1.0')
class _MIME_Version_adding_CM:
def set_content(self, msg, *args, **kw):
msg['MIME-Version'] = '1.0'
def test_set_content_does_not_duplicate_MIME_Version(self):
m = self._str_msg('')
cm = self._MIME_Version_adding_CM()
self.assertNotIn('MIME-Version', m)
m.set_content(content_manager=cm)
self.assertEqual(m['MIME-Version'], '1.0')
class TestMIMEPart(TestEmailMessageBase, TestEmailBase):
# Doing the full test run here may seem a bit redundant, since the two
# classes are almost identical. But what if they drift apart? So we do
# the full tests so that any future drift doesn't introduce bugs.
message = MIMEPart
def test_set_content_does_not_add_MIME_Version(self):
m = self._str_msg('')
cm = self._TestContentManager()
self.assertNotIn('MIME-Version', m)
m.set_content(content_manager=cm)
self.assertNotIn('MIME-Version', m)
if __name__ == '__main__':
unittest.main()

@@ -30,6 +30,7 @@ class PolicyAPITests(unittest.TestCase):
'raise_on_defect': False,
'header_factory': email.policy.EmailPolicy.header_factory,
'refold_source': 'long',
'content_manager': email.policy.EmailPolicy.content_manager,
})
# For each policy under test, we give here what we expect the defaults to

@@ -42,6 +42,9 @@ Core and Builtins
Library
-------
- Issue #18891: Completed the new email package (provisional) API additions
by adding new classes EmailMessage, MIMEPart, and ContentManager.
- Issue #18468: The re.split, re.findall, and re.sub functions and the group()
and groups() methods of match object now always return a string or a bytes
object.