Oops. I copied a slightly older version of the email package from the sandbox.

This should restore the email package in the py3k branch to exactly what's in
the sandbox.

This wipes out 1-2 fixes made post-copy, which I'll re-apply shortly.
This commit is contained in:
Guido van Rossum 2007-08-30 03:46:43 +00:00
parent 2c440a1086
commit 9604e66660
8 changed files with 538 additions and 476 deletions

View File

@ -25,7 +25,6 @@ module.
"""
__all__ = [
'base64_len',
'body_decode',
'body_encode',
'decode',
@ -33,12 +32,13 @@ __all__ = [
'encode',
'encodestring',
'header_encode',
'header_length',
]
import re
from base64 import b64encode
from binascii import b2a_base64, a2b_base64
from email.utils import fix_eols
CRLF = '\r\n'
NL = '\n'
@ -50,11 +50,10 @@ MISC_LEN = 7
# Helpers
def base64_len(s):
def header_length(bytearray):
"""Return the length of the byte array when it is encoded with base64."""
groups_of_3, leftover = divmod(len(s), 3)
groups_of_3, leftover = divmod(len(bytearray), 3)
# 4 bytes out for each 3 bytes (or nonzero fraction thereof) in.
# Thanks, Tim!
n = groups_of_3 * 4
if leftover:
n += 4
@ -62,74 +61,26 @@ def base64_len(s):
def header_encode(header, charset='iso-8859-1', keep_eols=False,
maxlinelen=76, eol=NL):
def header_encode(header_bytes, charset='iso-8859-1'):
"""Encode a single header line with Base64 encoding in a given charset.
Defined in RFC 2045, this Base64 encoding is identical to normal Base64
encoding, except that each line must be intelligently wrapped (respecting
the Base64 encoding), and subsequent lines must start with a space.
charset names the character set to use to encode the header. It defaults
to iso-8859-1.
End-of-line characters (\\r, \\n, \\r\\n) will be automatically converted
to the canonical email line separator \\r\\n unless the keep_eols
parameter is True (the default is False).
Each line of the header will be terminated in the value of eol, which
defaults to "\\n". Set this to "\\r\\n" if you are using the result of
this function directly in email.
The resulting string will be in the form:
"=?charset?b?WW/5ciBtYXp66XLrIHf8eiBhIGhhbXBzdGHuciBBIFlv+XIgbWF6euly?=\\n
=?charset?b?6yB3/HogYSBoYW1wc3Rh7nIgQkMgWW/5ciBtYXp66XLrIHf8eiBhIGhh?="
with each line wrapped at, at most, maxlinelen characters (defaults to 76
characters).
to iso-8859-1. Base64 encoding is defined in RFC 2045.
"""
# Return empty headers unchanged
if not header:
return header
if not keep_eols:
header = fix_eols(header)
# Base64 encode each line, in encoded chunks no greater than maxlinelen in
# length, after the RFC chrome is added in.
base64ed = []
max_encoded = maxlinelen - len(charset) - MISC_LEN
max_unencoded = max_encoded * 3 // 4
for i in range(0, len(header), max_unencoded):
base64ed.append(b2a_base64(header[i:i+max_unencoded]))
# Now add the RFC chrome to each encoded chunk
lines = []
for line in base64ed:
# Ignore the last character of each line if it is a newline
if line[-1] == ord(NL):
line = line[:-1]
# Add the chrome
lines.append('=?%s?b?%s?=' % (charset, line))
# Glue the lines together and return it. BAW: should we be able to
# specify the leading whitespace in the joiner?
joiner = eol + ' '
return joiner.join(lines)
if not header_bytes:
return str(header_bytes)
encoded = b64encode(header_bytes)
return '=?%s?b?%s?=' % (charset, encoded)
def encode(s, binary=True, maxlinelen=76, eol=NL):
def body_encode(s, maxlinelen=76, eol=NL):
"""Encode a string with base64.
Each line will be wrapped at, at most, maxlinelen characters (defaults to
76 characters).
If binary is False, end-of-line characters will be converted to the
canonical email end-of-line sequence \\r\\n. Otherwise they will be left
verbatim (this is the default).
Each line of encoded text will end with eol, which defaults to "\\n". Set
this to "\\r\\n" if you will be using the result of this function directly
in an email.
@ -137,9 +88,6 @@ def encode(s, binary=True, maxlinelen=76, eol=NL):
if not s:
return s
if not binary:
s = fix_eols(s)
encvec = []
max_unencoded = maxlinelen * 3 // 4
for i in range(0, len(s), max_unencoded):
@ -152,25 +100,26 @@ def encode(s, binary=True, maxlinelen=76, eol=NL):
return EMPTYSTRING.join(encvec)
# For convenience and backwards compatibility w/ standard base64 module
body_encode = encode
encodestring = encode
def decode(string):
def decode(s, convert_eols=False):
"""Decode a raw base64 string, returning a bytes object.
This function does not parse a full MIME header value encoded with base64
(like =?iso-8859-1?b?bmloISBuaWgh?=) -- use the high level
email.Header class for that functionality.
If convert_eols is set to a string value, all canonical email linefeeds,
e.g. "\\r\\n", in the decoded text will be converted to the value of
convert_eols. os.linesep is a good choice for convert_eols if you are
decoding a text attachment.
This function does not parse a full MIME header value encoded with
base64 (like =?iso-8859-1?b?bmloISBuaWgh?=) -- please use the high
level email.Header class for that functionality.
"""
if not string:
return bytes()
elif isinstance(string, str):
return a2b_base64(string.encode('raw-unicode-escape'))
else:
return a2b_base64(string)
if not s:
return s
dec = a2b_base64(s)
if convert_eols:
return dec.replace(CRLF, convert_eols)
return dec
# For convenience and backwards compatibility w/ standard base64 module

View File

@ -9,6 +9,8 @@ __all__ = [
'add_codec',
]
from functools import partial
import email.base64mime
import email.quoprimime
@ -23,9 +25,10 @@ BASE64 = 2 # Base64
SHORTEST = 3 # the shorter of QP and base64, but only for headers
# In "=?charset?q?hello_world?=", the =?, ?q?, and ?= add up to 7
MISC_LEN = 7
RFC2047_CHROME_LEN = 7
DEFAULT_CHARSET = 'us-ascii'
EMPTYSTRING = ''
@ -259,63 +262,6 @@ class Charset:
else:
return encode_7or8bit
def convert(self, s):
"""Convert a string from the input_codec to the output_codec."""
if self.input_codec != self.output_codec:
rawbytes = bytes(ord(c) for c in s)
decoded = rawbytes.decode(self.input_codec)
encoded = decoded.encode(self.output_codec)
return str(encoded)
else:
return s
def to_splittable(self, s):
"""Convert a possibly multibyte string to a safely splittable format.
Uses the input_codec to try and convert the string to Unicode, so it
can be safely split on character boundaries (even for multibyte
characters).
Returns the string as-is if it isn't known how to convert it to
Unicode with the input_charset.
Characters that could not be converted to Unicode will be replaced
with the Unicode replacement character U+FFFD.
"""
if isinstance(s, str) or self.input_codec is None:
return s
try:
return str(s, self.input_codec, 'replace')
except LookupError:
# Input codec not installed on system, so return the original
# string unchanged.
return s
def from_splittable(self, ustr, to_output=True):
"""Convert a splittable string back into an encoded string.
Uses the proper codec to try and convert the string from Unicode back
into an encoded format. Return the string as-is if it is not Unicode,
or if it could not be converted from Unicode.
Characters that could not be converted from Unicode will be replaced
with an appropriate character (usually '?').
If to_output is True (the default), uses output_codec to convert to an
encoded format. If to_output is False, uses input_codec.
"""
if to_output:
codec = self.output_codec
else:
codec = self.input_codec
if not isinstance(ustr, str) or codec is None:
return ustr
try:
return str(ustr.encode(codec, 'replace'))
except LookupError:
# Output codec not installed
return ustr
def get_output_charset(self):
"""Return the output character set.
@ -324,66 +270,115 @@ class Charset:
"""
return self.output_charset or self.input_charset
def encoded_header_len(self, s):
"""Return the length of the encoded header string."""
cset = self.get_output_charset()
# The len(s) of a 7bit encoding is len(s)
if self.header_encoding == BASE64:
return email.base64mime.base64_len(s) + len(cset) + MISC_LEN
elif self.header_encoding == QP:
return email.quoprimime.header_quopri_len(s) + len(cset) + MISC_LEN
elif self.header_encoding == SHORTEST:
lenb64 = email.base64mime.base64_len(s)
lenqp = email.quoprimime.header_quopri_len(s)
return min(lenb64, lenqp) + len(cset) + MISC_LEN
else:
return len(s)
def header_encode(self, string):
"""Header-encode a string by converting it first to bytes.
:param string: A unicode string for the header. This must be
encodable to bytes using the current character set's `output_codec`.
The type of encoding (base64 or quoted-printable) will be based on
this charset's `header_encoding`.
:param string: A unicode string for the header. It must be possible
to encode this string to bytes using the character set's
output codec.
:return: The encoded string, with RFC 2047 chrome.
"""
codec = self.output_codec or 'us-ascii'
charset = self.get_output_charset()
header_bytes = string.encode(codec)
# 7bit/8bit encodings return the string unchanged (modulo conversions)
if self.header_encoding == BASE64:
encoder = email.base64mime.header_encode
elif self.header_encoding == QP:
encoder = email.quoprimime.header_encode
elif self.header_encoding == SHORTEST:
lenb64 = email.base64mime.base64_len(header_bytes)
lenqp = email.quoprimime.header_quopri_len(header_bytes)
if lenb64 < lenqp:
encoder = email.base64mime.header_encode
else:
encoder = email.quoprimime.header_encode
else:
encoder_module = self._get_encoder(header_bytes)
if encoder_module is None:
return string
return encoder(header_bytes, codec)
return encoder_module.header_encode(header_bytes, codec)
def body_encode(self, s, convert=True):
"""Body-encode a string and convert it to output_charset.
def header_encode_lines(self, string, maxlengths):
"""Header-encode a string by converting it first to bytes.
If convert is True (the default), the string will be converted from
the input charset to output charset automatically. Unlike
header_encode(), there are no issues with byte boundaries and
multibyte charsets in email bodies, so this is usually pretty safe.
This is similar to `header_encode()` except that the string is fit
into maximum line lengths as given by the arguments.
:param string: A unicode string for the header. It must be possible
to encode this string to bytes using the character set's
output codec.
:param maxlengths: Maximum line length iterator. Each element
returned from this iterator will provide the next maximum line
length. This parameter is used as an argument to built-in next()
and should never be exhausted. The maximum line lengths should
not count the RFC 2047 chrome. These line lengths are only a
hint; the splitter does the best it can.
The first element returned from the iterator is used as the maximum
length of the first line; there is no separate first-line parameter.
:return: Lines of encoded strings, each with RFC 2047 chrome.
"""
# See which encoding we should use.
codec = self.output_codec or 'us-ascii'
header_bytes = string.encode(codec)
encoder_module = self._get_encoder(header_bytes)
encoder = partial(encoder_module.header_encode, charset=str(self))
# Calculate the number of characters that the RFC 2047 chrome will
# contribute to each line.
charset = self.get_output_charset()
extra = len(charset) + RFC2047_CHROME_LEN
# Now comes the hard part. We must encode bytes but we can't split on
# bytes because some character sets are variable length and each
# encoded word must stand on its own. So the problem is you have to
# encode to bytes to figure out this word's length, but you must split
# on characters. This causes two problems: first, we don't know how
# many octets a specific substring of unicode characters will get
# encoded to, and second, we don't know how many ASCII characters
# those octets will get encoded to. Unless we try it. Which seems
# inefficient. In the interest of being correct rather than fast (and
# in the hope that there will be few encoded headers in any such
# message), brute force it. :(
lines = []
current_line = []
maxlen = next(maxlengths) - extra
for character in string:
current_line.append(character)
this_line = EMPTYSTRING.join(current_line)
length = encoder_module.header_length(this_line.encode(charset))
if length > maxlen:
# This last character doesn't fit so pop it off.
current_line.pop()
# Does nothing fit on the first line?
if not lines and not current_line:
lines.append(None)
else:
separator = (' ' if lines else '')
joined_line = EMPTYSTRING.join(current_line)
header_bytes = joined_line.encode(codec)
lines.append(encoder(header_bytes))
current_line = [character]
maxlen = next(maxlengths) - extra
joined_line = EMPTYSTRING.join(current_line)
header_bytes = joined_line.encode(codec)
lines.append(encoder(header_bytes))
return lines
def _get_encoder(self, header_bytes):
if self.header_encoding == BASE64:
return email.base64mime
elif self.header_encoding == QP:
return email.quoprimime
elif self.header_encoding == SHORTEST:
len64 = email.base64mime.header_length(header_bytes)
lenqp = email.quoprimime.header_length(header_bytes)
if len64 < lenqp:
return email.base64mime
else:
return email.quoprimime
else:
return None
def body_encode(self, string):
"""Body-encode a string by converting it first to bytes.
The type of encoding (base64 or quoted-printable) will be based on
self.body_encoding.
"""
if convert:
s = self.convert(s)
# 7bit/8bit encodings return the string unchanged (modulo conversions)
if self.body_encoding is BASE64:
return email.base64mime.body_encode(s)
return email.base64mime.body_encode(string)
elif self.body_encoding is QP:
return email.quoprimime.body_encode(s)
return email.quoprimime.body_encode(string)
else:
return s
return string

View File

@ -133,12 +133,8 @@ class Generator:
def _write_headers(self, msg):
for h, v in msg.items():
print('%s:' % h, end=' ', file=self._fp)
if self._maxheaderlen == 0:
# Explicit no-wrapping
print(v, file=self._fp)
elif isinstance(v, Header):
# Header instances know what to do
print(v.encode(), file=self._fp)
if isinstance(v, Header):
print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp)
else:
# Header's got lots of smarts, so use it.
header = Header(v, maxlinelen=self._maxheaderlen,

View File

@ -25,10 +25,11 @@ BSPACE = b' '
SPACE8 = ' ' * 8
EMPTYSTRING = ''
MAXLINELEN = 76
MAXLINELEN = 78
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')
TRANSITIONAL_SPACE = object()
# Match encoded-word strings in the form =?charset?q?Hello_World?=
ecre = re.compile(r'''
@ -109,7 +110,7 @@ def decode_header(header):
last_word = last_charset = None
for word, charset in decoded_words:
if isinstance(word, str):
word = bytes(ord(c) for c in word)
word = bytes(word, 'raw-unicode-escape')
if last_word is None:
last_word = word
last_charset = charset
@ -170,7 +171,8 @@ class Header:
The maximum line length can be specified explicitly via maxlinelen. For
splitting the first line to a shorter value (to account for the field
header which isn't included in s, e.g. `Subject') pass in the name of
the field in header_name. The default maxlinelen is 76.
the field in header_name. The default maxlinelen is 78 as recommended
by RFC 2822.
continuation_ws must be RFC 2822 compliant folding whitespace (usually
either a space or a hard tab) which will be prepended to continuation
@ -198,9 +200,10 @@ class Header:
def __str__(self):
"""Return the string value of the header."""
self._normalize()
uchunks = []
lastcs = None
for s, charset in self._chunks:
for string, charset in self._chunks:
# We must preserve spaces between encoded and non-encoded word
# boundaries, which means for us we need to add a space when we go
# from a charset to None/us-ascii, or from None/us-ascii to a
@ -214,15 +217,16 @@ class Header:
elif nextcs not in (None, 'us-ascii'):
uchunks.append(SPACE)
lastcs = nextcs
uchunks.append(s)
uchunks.append(string)
return EMPTYSTRING.join(uchunks)
# Rich comparison operators for equality only. BAW: does it make sense to
# have or explicitly disable <, <=, >, >= operators?
def __eq__(self, other):
# other may be a Header or a string. Both are fine so coerce
# ourselves to a string, swap the args and do another comparison.
return other == self.encode()
# ourselves to a unicode (of the unencoded header value), swap the
# args and do another comparison.
return other == str(self)
def __ne__(self, other):
return not self == other
@ -267,7 +271,7 @@ class Header:
output_string = input_bytes.decode(output_charset, errors)
self._chunks.append((output_string, charset))
def encode(self, splitchars=';, \t'):
def encode(self, splitchars=';, \t', maxlinelen=None):
"""Encode a message header into an RFC-compliant format.
There are many issues involved in converting a given string for use in
@ -290,7 +294,14 @@ class Header:
syntactic breaks'. This doesn't affect RFC 2047 encoded lines.
"""
self._normalize()
formatter = _ValueFormatter(self._headerlen, self._maxlinelen,
if maxlinelen is None:
maxlinelen = self._maxlinelen
# A maxlinelen of 0 means don't wrap. For all practical purposes,
# choosing a huge number here accomplishes that and makes the
# _ValueFormatter algorithm much simpler.
if maxlinelen == 0:
maxlinelen = 1000000
formatter = _ValueFormatter(self._headerlen, maxlinelen,
self._continuation_ws, splitchars)
for string, charset in self._chunks:
lines = string.splitlines()
@ -301,9 +312,8 @@ class Header:
return str(formatter)
def _normalize(self):
# Normalize the chunks so that all runs of identical charsets get
# collapsed into a single unicode string. You need a space between
# encoded words, or between encoded and unencoded words.
# Step 1: Normalize the chunks so that all runs of identical charsets
# get collapsed into a single unicode string.
chunks = []
last_charset = None
last_chunk = []
@ -313,8 +323,6 @@ class Header:
else:
if last_charset is not None:
chunks.append((SPACE.join(last_chunk), last_charset))
if last_charset != USASCII or charset != USASCII:
chunks.append((' ', USASCII))
last_chunk = [string]
last_charset = charset
if last_chunk:
@ -333,6 +341,10 @@ class _ValueFormatter:
self._current_line = _Accumulator(headerlen)
def __str__(self):
# Remove the trailing TRANSITIONAL_SPACE
last_line = self._current_line.pop()
if last_line is not TRANSITIONAL_SPACE:
self._current_line.push(last_line)
self.newline()
return NL.join(self._lines)
@ -348,24 +360,66 @@ class _ValueFormatter:
if len(encoded_string) + len(self._current_line) <= self._maxlen:
self._current_line.push(encoded_string)
return
# Attempt to split the line at the highest-level syntactic break
# possible. Note that we don't have a lot of smarts about field
# If the charset has no header encoding (i.e. it is an ASCII encoding)
# then we must split the header at the "highest level syntactic break"
# possible. Note that we don't have a lot of smarts about field
# syntax; we just try to break on semi-colons, then commas, then
# whitespace. Eventually, we'll allow this to be pluggable.
for ch in self._splitchars:
if ch in string:
break
else:
# We can't split the string to fit on the current line, so just
# put it on a line by itself.
self._lines.append(str(self._current_line))
self._current_line.reset(self._continuation_ws)
self._current_line.push(encoded_string)
# whitespace. Eventually, this should be pluggable.
if charset.header_encoding is None:
for ch in self._splitchars:
if ch in string:
break
else:
ch = None
# If there's no available split character then regardless of
# whether the string fits on the line, we have to put it on a line
# by itself.
if ch is None:
if not self._current_line.is_onlyws():
self._lines.append(str(self._current_line))
self._current_line.reset(self._continuation_ws)
self._current_line.push(encoded_string)
else:
self._ascii_split(string, ch)
return
self._spliterate(string, ch, charset)
# Otherwise, we're doing either a Base64 or a quoted-printable
# encoding which means we don't need to split the line on syntactic
# breaks. We can basically just find enough characters to fit on the
# current line, minus the RFC 2047 chrome. What makes this trickier
# though is that we have to split at octet boundaries, not character
# boundaries but it's only safe to split at character boundaries so at
# best we can only get close.
encoded_lines = charset.header_encode_lines(string, self._maxlengths())
# The first element extends the current line, but if it's None then
# nothing more fit on the current line so start a new line.
try:
first_line = encoded_lines.pop(0)
except IndexError:
# There are no encoded lines, so we're done.
return
if first_line is not None:
self._current_line.push(first_line)
self._lines.append(str(self._current_line))
self._current_line.reset(self._continuation_ws)
try:
last_line = encoded_lines.pop()
except IndexError:
# There was only one line.
return
self._current_line.push(last_line)
self._current_line.push(TRANSITIONAL_SPACE)
# All remaining elements are full lines in themselves.
for line in encoded_lines:
self._lines.append(self._continuation_ws + line)
def _spliterate(self, string, ch, charset):
holding = _Accumulator(transformfunc=charset.header_encode)
def _maxlengths(self):
# The first line's length.
yield self._maxlen - len(self._current_line)
while True:
yield self._maxlen - self._continuation_ws_len
def _ascii_split(self, string, ch):
holding = _Accumulator()
# Split the line on the split character, preserving it. If the split
# character is whitespace RFC 2822 $2.2.3 requires us to fold on the
# whitespace, so that the line leads with the original whitespace we
@ -387,8 +441,7 @@ class _ValueFormatter:
# line, watch out for the current line containing only
# whitespace.
holding.pop()
if len(self._current_line) == 0 and (
len(holding) == 0 or str(holding).isspace()):
if self._current_line.is_onlyws() and holding.is_onlyws():
# Don't start a new line.
holding.push(part)
part = None
@ -492,12 +545,8 @@ def _spliterator(character, string):
class _Accumulator:
def __init__(self, initial_size=0, transformfunc=None):
def __init__(self, initial_size=0):
self._initial_size = initial_size
if transformfunc is None:
self._transformfunc = lambda string: string
else:
self._transformfunc = transformfunc
self._current = []
def push(self, string):
@ -507,14 +556,21 @@ class _Accumulator:
return self._current.pop()
def __len__(self):
return len(str(self)) + self._initial_size
return sum((len(string)
for string in self._current
if string is not TRANSITIONAL_SPACE),
self._initial_size)
def __str__(self):
return self._transformfunc(EMPTYSTRING.join(self._current))
return EMPTYSTRING.join(
(' ' if string is TRANSITIONAL_SPACE else string)
for string in self._current)
def reset(self, string=None):
self._current = []
self._current_len = 0
self._initial_size = 0
if string is not None:
self.push(string)
def is_onlyws(self):
return len(self) == 0 or str(self).isspace()

View File

@ -13,9 +13,9 @@ import warnings
from io import BytesIO, StringIO
# Intrapackage imports
import email.charset
from email import utils
from email import errors
from email.charset import Charset
SEMISPACE = '; '
@ -201,7 +201,7 @@ class Message:
# Incorrect padding
pass
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
in_file = BytesIO((payload + '\n').encode('raw-unicode-escape'))
in_file = BytesIO(bytes(payload + '\n'))
out_file = BytesIO()
try:
uu.decode(in_file, out_file, quiet=True)
@ -211,7 +211,7 @@ class Message:
pass
# Is there a better way to do this? We can't use the bytes
# constructor.
return bytes(ord(c) for c in payload)
return bytes(payload, 'raw-unicode-escape')
def set_payload(self, payload, charset=None):
"""Set the payload to the given value.
@ -236,18 +236,13 @@ class Message:
and encoded properly, if needed, when generating the plain text
representation of the message. MIME headers (MIME-Version,
Content-Type, Content-Transfer-Encoding) will be added as needed.
"""
if charset is None:
self.del_param('charset')
self._charset = None
return
if isinstance(charset, basestring):
charset = email.charset.Charset(charset)
if not isinstance(charset, email.charset.Charset):
raise TypeError(charset)
# BAW: should we accept strings that can serve as arguments to the
# Charset constructor?
if not isinstance(charset, Charset):
charset = Charset(charset)
self._charset = charset
if 'MIME-Version' not in self:
self.add_header('MIME-Version', '1.0')
@ -256,7 +251,7 @@ class Message:
charset=charset.get_output_charset())
else:
self.set_param('charset', charset.get_output_charset())
if str(charset) != charset.get_output_charset():
if charset != charset.get_output_charset():
self._payload = charset.body_encode(self._payload)
if 'Content-Transfer-Encoding' not in self:
cte = charset.get_body_encoding()
@ -757,8 +752,7 @@ class Message:
# LookupError will be raised if the charset isn't known to
# Python. UnicodeError will be raised if the encoded text
# contains a character not in the charset.
as_bytes = charset[2].encode('raw-unicode-escape')
charset = str(as_bytes, pcharset)
charset = str(bytes(charset[2]), pcharset)
except (LookupError, UnicodeError):
charset = charset[2]
# charset characters must be in us-ascii range

View File

@ -29,16 +29,14 @@ wrapping issues, use the email.Header module.
__all__ = [
'body_decode',
'body_encode',
'body_quopri_check',
'body_quopri_len',
'body_length',
'decode',
'decodestring',
'encode',
'encodestring',
'header_decode',
'header_encode',
'header_quopri_check',
'header_quopri_len',
'header_length',
'quote',
'unquote',
]
@ -46,54 +44,65 @@ __all__ = [
import re
from string import ascii_letters, digits, hexdigits
from email.utils import fix_eols
CRLF = '\r\n'
NL = '\n'
EMPTYSTRING = ''
# See also Charset.py
MISC_LEN = 7
# Build a mapping of octets to the expansion of that octet. Since we're only
# going to have 256 of these things, this isn't terribly inefficient
# space-wise. Remember that headers and bodies have different sets of safe
# characters. Initialize both maps with the full expansion, and then override
# the safe bytes with the more compact form.
_QUOPRI_HEADER_MAP = dict((c, '=%02X' % c) for c in range(256))
_QUOPRI_BODY_MAP = _QUOPRI_HEADER_MAP.copy()
HEADER_SAFE_BYTES = (b'-!*+/ ' +
ascii_letters.encode('raw-unicode-escape') +
digits.encode('raw-unicode-escape'))
# Safe header bytes which need no encoding.
for c in b'-!*+/' + bytes(ascii_letters) + bytes(digits):
_QUOPRI_HEADER_MAP[c] = chr(c)
# Headers have one other special encoding; spaces become underscores.
_QUOPRI_HEADER_MAP[ord(' ')] = '_'
BODY_SAFE_BYTES = (b' !"#$%&\'()*+,-./0123456789:;<>'
b'?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`'
b'abcdefghijklmnopqrstuvwxyz{|}~\t')
# Safe body bytes which need no encoding.
for c in (b' !"#$%&\'()*+,-./0123456789:;<>'
b'?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`'
b'abcdefghijklmnopqrstuvwxyz{|}~\t'):
_QUOPRI_BODY_MAP[c] = chr(c)
# Helpers
def header_quopri_check(c):
"""Return True if the character should be escaped with header quopri."""
return c not in HEADER_SAFE_BYTES
def header_check(octet):
"""Return True if the octet should be escaped with header quopri."""
return chr(octet) != _QUOPRI_HEADER_MAP[octet]
def body_quopri_check(c):
"""Return True if the character should be escaped with body quopri."""
return c not in BODY_SAFE_BYTES
def body_check(octet):
"""Return True if the octet should be escaped with body quopri."""
return chr(octet) != _QUOPRI_BODY_MAP[octet]
def header_quopri_len(bytearray):
"""Return the length of bytearray when it is encoded with header quopri.
def header_length(bytearray):
"""Return a header quoted-printable encoding length.
Note that this does not include any RFC 2047 chrome added by
`header_encode()`.
:param bytearray: An array of bytes (a.k.a. octets).
:return: The length in bytes of the byte array when it is encoded with
quoted-printable for headers.
"""
count = 0
for c in bytearray:
count += (3 if header_quopri_check(c) else 1)
return count
return sum(len(_QUOPRI_HEADER_MAP[octet]) for octet in bytearray)
def body_quopri_len(bytearray):
"""Return the length of bytearray when it is encoded with body quopri."""
count = 0
for c in bytearray:
count += (3 if body_quopri_check(c) else 1)
return count
def body_length(bytearray):
"""Return a body quoted-printable encoding length.
:param bytearray: An array of bytes (a.k.a. octets).
:return: The length in bytes of the byte array when it is encoded with
quoted-printable for bodies.
"""
return sum(len(_QUOPRI_BODY_MAP[octet]) for octet in bytearray)
def _max_append(L, s, maxlen, extra=''):
@ -133,29 +142,17 @@ def header_encode(header_bytes, charset='iso-8859-1'):
return str(header_bytes)
# Iterate over every byte, encoding if necessary.
encoded = []
for character in header_bytes:
# Space may be represented as _ instead of =20 for readability
if character == ord(' '):
encoded.append('_')
# These characters can be included verbatim.
elif not header_quopri_check(character):
encoded.append(chr(character))
# Otherwise, replace with hex value like =E2
else:
encoded.append('=%02X' % character)
for octet in header_bytes:
encoded.append(_QUOPRI_HEADER_MAP[octet])
# Now add the RFC chrome to each encoded chunk and glue the chunks
# together.
return '=?%s?q?%s?=' % (charset, EMPTYSTRING.join(encoded))
def encode(body, binary=False, maxlinelen=76, eol=NL):
def body_encode(body, maxlinelen=76, eol=NL):
"""Encode with quoted-printable, wrapping at maxlinelen characters.
If binary is False (the default), end-of-line characters will be converted
to the canonical email end-of-line sequence \\r\\n. Otherwise they will
be left verbatim.
Each line of encoded text will end with eol, which defaults to "\\n". Set
this to "\\r\\n" if you will be using the result of this function directly
in an email.
@ -168,9 +165,6 @@ def encode(body, binary=False, maxlinelen=76, eol=NL):
if not body:
return body
if not binary:
body = fix_eols(body)
# BAW: We're accumulating the body text by string concatenation. That
# can't be very efficient, but I don't have time now to rewrite it. It
# just feels like this algorithm could be more efficient.
@ -195,7 +189,7 @@ def encode(body, binary=False, maxlinelen=76, eol=NL):
for j in range(linelen):
c = line[j]
prev = c
if body_quopri_check(c):
if body_check(c):
c = quote(c)
elif j+1 == linelen:
# Check for whitespace at end of line; special case
@ -231,11 +225,6 @@ def encode(body, binary=False, maxlinelen=76, eol=NL):
return encoded_body
# For convenience and backwards compatibility w/ standard base64 module
body_encode = encode
encodestring = encode
# BAW: I'm not sure if the intent was for the signature of this function to be
# the same as base64MIME.decode() or not...

View File

@ -482,7 +482,7 @@ class TestMessageAPI(TestEmailBase):
msg['content-transfer-encoding'] = 'base64'
msg.set_payload(x)
self.assertEqual(msg.get_payload(decode=True),
bytes(ord(c) for c in x))
bytes(x, 'raw-unicode-escape'))
@ -580,31 +580,31 @@ bug demonstration
g = Generator(sfp)
g.flatten(msg)
eq(sfp.getvalue(), """\
Subject: =?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerd?=
=?iso-8859-1?q?erband_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndi?=
=?iso-8859-1?q?schen_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Kling?=
=?iso-8859-1?q?en_bef=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_met?=
=?iso-8859-2?q?ropole_se_hroutily_pod_tlakem_jejich_d=F9vtipu=2E=2E_?=
=?utf-8?b?5q2j56K644Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE?=
=?utf-8?b?44G+44Gb44KT44CC5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB?=
=?utf-8?b?44GC44Go44Gv44Gn44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CM?=
=?utf-8?q?Wenn_ist_das_Nunstuck_git_und_Slotermeyer=3F_Ja!_Beiherhund_das?=
=?utf-8?b?IE9kZXIgZGllIEZsaXBwZXJ3YWxkdCBnZXJzcHV0LuOAjeOBqOiogOOBow==?=
=?utf-8?b?44Gm44GE44G+44GZ44CC?=
Subject: =?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerderb?=
=?iso-8859-1?q?and_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndischen?=
=?iso-8859-1?q?_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Klingen_bef?=
=?iso-8859-1?q?=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se_hrouti?=
=?iso-8859-2?q?ly_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= =?utf-8?b?5q2j56K6?=
=?utf-8?b?44Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE44G+44Gb44KT44CC5LiA?=
=?utf-8?b?6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB44GC44Go44Gv44Gn44Gf44KJ?=
=?utf-8?b?44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CMV2VubiBpc3QgZGFzIE51bnN0dWNr?=
=?utf-8?b?IGdpdCB1bmQgU2xvdGVybWV5ZXI/IEphISBCZWloZXJodW5kIGRhcyBPZGVyIGRp?=
=?utf-8?b?ZSBGbGlwcGVyd2FsZHQgZ2Vyc3B1dC7jgI3jgajoqIDjgaPjgabjgYTjgb7jgZk=?=
=?utf-8?b?44CC?=
""")
eq(h.encode(), """\
=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerd?=
=?iso-8859-1?q?erband_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndi?=
=?iso-8859-1?q?schen_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Kling?=
=?iso-8859-1?q?en_bef=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_met?=
=?iso-8859-2?q?ropole_se_hroutily_pod_tlakem_jejich_d=F9vtipu=2E=2E_?=
=?utf-8?b?5q2j56K644Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE?=
=?utf-8?b?44G+44Gb44KT44CC5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB?=
=?utf-8?b?44GC44Go44Gv44Gn44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CM?=
=?utf-8?q?Wenn_ist_das_Nunstuck_git_und_Slotermeyer=3F_Ja!_Beiherhund_das?=
=?utf-8?b?IE9kZXIgZGllIEZsaXBwZXJ3YWxkdCBnZXJzcHV0LuOAjeOBqOiogOOBow==?=
=?utf-8?b?44Gm44GE44G+44GZ44CC?=""")
eq(h.encode(maxlinelen=76), """\
=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerde?=
=?iso-8859-1?q?rband_komfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndis?=
=?iso-8859-1?q?chen_Wandgem=E4lden_vorbei=2C_gegen_die_rotierenden_Klinge?=
=?iso-8859-1?q?n_bef=F6rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se?=
=?iso-8859-2?q?_hroutily_pod_tlakem_jejich_d=F9vtipu=2E=2E_?=
=?utf-8?b?5q2j56K644Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE44G+44Gb?=
=?utf-8?b?44KT44CC5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB44GC44Go?=
=?utf-8?b?44Gv44Gn44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CMV2VubiBp?=
=?utf-8?b?c3QgZGFzIE51bnN0dWNrIGdpdCB1bmQgU2xvdGVybWV5ZXI/IEphISBCZWlo?=
=?utf-8?b?ZXJodW5kIGRhcyBPZGVyIGRpZSBGbGlwcGVyd2FsZHQgZ2Vyc3B1dC7jgI0=?=
=?utf-8?b?44Go6KiA44Gj44Gm44GE44G+44GZ44CC?=""")
def test_long_header_encode(self):
eq = self.ndiffAssertEqual
@ -674,9 +674,14 @@ Test""")
def test_no_split_long_header(self):
eq = self.ndiffAssertEqual
hstr = 'References: ' + 'x' * 80
h = Header(hstr, continuation_ws='\t')
h = Header(hstr)
# These come on two lines because Headers are really field value
# classes and don't really know about their field names.
eq(h.encode(), """\
References: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx""")
References:
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx""")
h = Header('x' * 80)
eq(h.encode(), 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
def test_splitting_multiple_long_lines(self):
eq = self.ndiffAssertEqual
@ -722,10 +727,17 @@ from modemcable093.139-201-24.que.mc.videotron.ca ([24.201.139.93]
h = Header('Britische Regierung gibt', 'iso-8859-1',
header_name='Subject')
h.append('gr\xfcnes Licht f\xfcr Offshore-Windkraftprojekte')
eq(h.encode(maxlinelen=76), """\
=?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr_Offs?=
=?iso-8859-1?q?hore-Windkraftprojekte?=""")
msg['Subject'] = h
eq(msg.as_string(), """\
Subject: =?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr?=
=?iso-8859-1?q?Offshore-Windkraftprojekte?=
eq(msg.as_string(maxheaderlen=76), """\
Subject: =?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr_Offs?=
=?iso-8859-1?q?hore-Windkraftprojekte?=
""")
eq(msg.as_string(maxheaderlen=0), """\
Subject: =?iso-8859-1?q?Britische_Regierung_gibt_gr=FCnes_Licht_f=FCr_Offshore-Windkraftprojekte?=
""")
@ -748,10 +760,10 @@ Reply-To: Britische Regierung gibt gr\xfcnes Licht f\xfcr Offshore-Windkraftproj
msg = Message()
msg['To'] = to
eq(msg.as_string(maxheaderlen=78), '''\
To: "Someone Test #A" <someone@eecs.umich.edu>, <someone@eecs.umich.edu>,
To: "Someone Test #A" <someone@eecs.umich.edu>,<someone@eecs.umich.edu>,
\t"Someone Test #B" <someone@umich.edu>,
\t"Someone Test #C" <someone@eecs.umich.edu>,
\t"Someone Test #D" <someone@eecs.umich.edu>
"Someone Test #C" <someone@eecs.umich.edu>,
"Someone Test #D" <someone@eecs.umich.edu>
''')
@ -760,7 +772,7 @@ To: "Someone Test #A" <someone@eecs.umich.edu>, <someone@eecs.umich.edu>,
s = 'This is an example of string which has almost the limit of header length.'
h = Header(s)
h.append('Add another line.')
eq(h.encode(), """\
eq(h.encode(maxlinelen=76), """\
This is an example of string which has almost the limit of header length.
Add another line.""")
@ -775,14 +787,17 @@ This is an example of string which has almost the limit of header length.
def test_long_field_name(self):
eq = self.ndiffAssertEqual
fn = 'X-Very-Very-Very-Long-Header-Name'
gs = "Die Mieter treten hier ein werden mit einem Foerderband komfortabel den Korridor entlang, an s\xfcdl\xfcndischen Wandgem\xe4lden vorbei, gegen die rotierenden Klingen bef\xf6rdert. "
gs = ('Die Mieter treten hier ein werden mit einem Foerderband '
'komfortabel den Korridor entlang, an s\xfcdl\xfcndischen '
'Wandgem\xe4lden vorbei, gegen die rotierenden Klingen '
'bef\xf6rdert. ')
h = Header(gs, 'iso-8859-1', header_name=fn)
# BAW: this seems broken because the first line is too long
eq(h.encode(), """\
=?iso-8859-1?q?Die_Mieter_treten_hier_?=
=?iso-8859-1?q?ein_werden_mit_einem_Foerderband_komfortabel_den_Korridor_?=
=?iso-8859-1?q?entlang=2C_an_s=FCdl=FCndischen_Wandgem=E4lden_vorbei=2C_g?=
=?iso-8859-1?q?egen_die_rotierenden_Klingen_bef=F6rdert=2E_?=""")
eq(h.encode(maxlinelen=76), """\
=?iso-8859-1?q?Die_Mieter_treten_hier_e?=
=?iso-8859-1?q?in_werden_mit_einem_Foerderband_komfortabel_den_Korridor_e?=
=?iso-8859-1?q?ntlang=2C_an_s=FCdl=FCndischen_Wandgem=E4lden_vorbei=2C_ge?=
=?iso-8859-1?q?gen_die_rotierenden_Klingen_bef=F6rdert=2E_?=""")
def test_long_received_header(self):
h = ('from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) '
@ -811,9 +826,9 @@ Received-2: from FOO.TLD (vizworld.acl.foo.tld [123.452.678.9]) by
msg['Received-2'] = h
self.ndiffAssertEqual(msg.as_string(maxheaderlen=78), """\
Received-1: <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de>
\t(David Bremner's message of "Thu, 6 Mar 2003 13:58:21 +0100")
(David Bremner's message of \"Thu, 6 Mar 2003 13:58:21 +0100\")
Received-2: <15975.17901.207240.414604@sgigritzmann1.mathematik.tu-muenchen.de>
\t(David Bremner's message of "Thu, 6 Mar 2003 13:58:21 +0100")
(David Bremner's message of \"Thu, 6 Mar 2003 13:58:21 +0100\")
""")
@ -837,12 +852,12 @@ Face-2: iVBORw0KGgoAAAANSUhEUgAAADAAAAAwBAMAAAClLOS0AAAAGFBMVEUAAAAkHiJeRUIcGBi9
eq = self.ndiffAssertEqual
m = ('Received: from siimage.com '
'([172.25.1.3]) by zima.siliconimage.com with '
'Microsoft SMTPSVC(5.0.2195.4905);'
'\tWed, 16 Oct 2002 07:41:11 -0700')
'Microsoft SMTPSVC(5.0.2195.4905); '
'Wed, 16 Oct 2002 07:41:11 -0700')
msg = email.message_from_string(m)
eq(msg.as_string(maxheaderlen=78), '''\
Received: from siimage.com ([172.25.1.3]) by zima.siliconimage.com with
\tMicrosoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700
Microsoft SMTPSVC(5.0.2195.4905); Wed, 16 Oct 2002 07:41:11 -0700
''')
@ -1519,7 +1534,7 @@ counter to RFC 2822, there's no separating newline here
# Test RFC 2047 header encoding and decoding
class TestRFC2047(unittest.TestCase):
class TestRFC2047(TestEmailBase):
def test_rfc2047_multiline(self):
eq = self.assertEqual
s = """Re: =?mac-iceland?q?r=8Aksm=9Arg=8Cs?= baz
@ -1533,9 +1548,9 @@ class TestRFC2047(unittest.TestCase):
header = make_header(dh)
eq(str(header),
'Re: r\xe4ksm\xf6rg\xe5s baz foo bar r\xe4ksm\xf6rg\xe5s')
eq(header.encode(),
"""Re: =?mac-iceland?q?r=8Aksm=9Arg=8Cs?= baz foo bar
=?mac-iceland?q?r=8Aksm=9Arg=8Cs?=""")
self.ndiffAssertEqual(header.encode(), """\
Re: =?mac-iceland?q?r=8Aksm=9Arg=8Cs?= baz foo bar =?mac-iceland?q?r=8Aksm?=
=?mac-iceland?q?=9Arg=8Cs?=""")
def test_whitespace_eater_unicode(self):
eq = self.assertEqual
@ -2185,14 +2200,6 @@ Foo
utils.formataddr(('A Silly; Person', 'person@dom.ain')),
r'"A Silly; Person" <person@dom.ain>')
def test_fix_eols(self):
eq = self.assertEqual
eq(utils.fix_eols('hello'), 'hello')
eq(utils.fix_eols('hello\n'), 'hello\r\n')
eq(utils.fix_eols('hello\r'), 'hello\r\n')
eq(utils.fix_eols('hello\r\n'), 'hello\r\n')
eq(utils.fix_eols('hello\n\r'), 'hello\r\n\r\n')
def test_charset_richcomparisons(self):
eq = self.assertEqual
ne = self.failIfEqual
@ -2518,8 +2525,8 @@ Here's the message body
class TestBase64(unittest.TestCase):
def test_len(self):
eq = self.assertEqual
eq(base64mime.base64_len('hello'),
len(base64mime.encode('hello', eol='')))
eq(base64mime.header_length('hello'),
len(base64mime.body_encode('hello', eol='')))
for size in range(15):
if size == 0 : bsize = 0
elif size <= 3 : bsize = 4
@ -2527,22 +2534,24 @@ class TestBase64(unittest.TestCase):
elif size <= 9 : bsize = 12
elif size <= 12: bsize = 16
else : bsize = 20
eq(base64mime.base64_len('x'*size), bsize)
eq(base64mime.header_length('x' * size), bsize)
def test_decode(self):
eq = self.assertEqual
eq(base64mime.decode(''), b'')
eq(base64mime.decode(''), '')
eq(base64mime.decode('aGVsbG8='), b'hello')
eq(base64mime.decode('aGVsbG8=', 'X'), b'hello')
eq(base64mime.decode('aGVsbG8NCndvcmxk\n', 'X'), b'helloXworld')
def test_encode(self):
eq = self.assertEqual
eq(base64mime.encode(''), '')
eq(base64mime.encode('hello'), 'aGVsbG8=\n')
eq(base64mime.body_encode(''), '')
eq(base64mime.body_encode('hello'), 'aGVsbG8=\n')
# Test the binary flag
eq(base64mime.encode('hello\n'), 'aGVsbG8K\n')
eq(base64mime.encode('hello\n', 0), 'aGVsbG8NCg==\n')
eq(base64mime.body_encode('hello\n'), 'aGVsbG8K\n')
eq(base64mime.body_encode('hello\n', 0), 'aGVsbG8NCg==\n')
# Test the maxlinelen arg
eq(base64mime.encode('xxxx ' * 20, maxlinelen=40), """\
eq(base64mime.body_encode('xxxx ' * 20, maxlinelen=40), """\
eHh4eCB4eHh4IHh4eHggeHh4eCB4eHh4IHh4eHgg
eHh4eCB4eHh4IHh4eHggeHh4eCB4eHh4IHh4eHgg
eHh4eCB4eHh4IHh4eHggeHh4eCB4eHh4IHh4eHgg
@ -2560,26 +2569,11 @@ eHh4eCB4eHh4IA==\r
eq = self.assertEqual
he = base64mime.header_encode
eq(he('hello'), '=?iso-8859-1?b?aGVsbG8=?=')
eq(he('hello\nworld'), '=?iso-8859-1?b?aGVsbG8NCndvcmxk?=')
eq(he('hello\r\nworld'), '=?iso-8859-1?b?aGVsbG8NCndvcmxk?=')
eq(he('hello\nworld'), '=?iso-8859-1?b?aGVsbG8Kd29ybGQ=?=')
# Test the charset option
eq(he('hello', charset='iso-8859-2'), '=?iso-8859-2?b?aGVsbG8=?=')
eq(he('hello\nworld'), '=?iso-8859-1?b?aGVsbG8Kd29ybGQ=?=')
# Test the maxlinelen argument
eq(he('xxxx ' * 20, maxlinelen=40), """\
=?iso-8859-1?b?eHh4eCB4eHh4IHh4eHggeHg=?=
=?iso-8859-1?b?eHggeHh4eCB4eHh4IHh4eHg=?=
=?iso-8859-1?b?IHh4eHggeHh4eCB4eHh4IHg=?=
=?iso-8859-1?b?eHh4IHh4eHggeHh4eCB4eHg=?=
=?iso-8859-1?b?eCB4eHh4IHh4eHggeHh4eCA=?=
=?iso-8859-1?b?eHh4eCB4eHh4IHh4eHgg?=""")
# Test the eol argument
eq(he('xxxx ' * 20, maxlinelen=40, eol='\r\n'), """\
=?iso-8859-1?b?eHh4eCB4eHh4IHh4eHggeHg=?=\r
=?iso-8859-1?b?eHggeHh4eCB4eHh4IHh4eHg=?=\r
=?iso-8859-1?b?IHh4eHggeHh4eCB4eHh4IHg=?=\r
=?iso-8859-1?b?eHh4IHh4eHggeHh4eCB4eHg=?=\r
=?iso-8859-1?b?eCB4eHh4IHh4eHggeHh4eCA=?=\r
=?iso-8859-1?b?eHh4eCB4eHh4IHh4eHgg?=""")
@ -2591,7 +2585,7 @@ class TestQuopri(unittest.TestCase):
range(ord('a'), ord('z') + 1),
range(ord('A'), ord('Z') + 1),
range(ord('0'), ord('9') + 1),
(c for c in b'!*+-/ ')))
(c for c in b'!*+-/')))
# Set of characters (as byte integers) that do need to be encoded in
# headers.
self.hnon = [c for c in range(256) if c not in self.hlit]
@ -2606,46 +2600,53 @@ class TestQuopri(unittest.TestCase):
self.bnon = [c for c in range(256) if c not in self.blit]
assert len(self.blit) + len(self.bnon) == 256
def test_header_quopri_check(self):
def test_quopri_header_check(self):
for c in self.hlit:
self.failIf(quoprimime.header_quopri_check(c))
self.failIf(quoprimime.header_check(c),
'Should not be header quopri encoded: %s' % chr(c))
for c in self.hnon:
self.failUnless(quoprimime.header_quopri_check(c))
self.failUnless(quoprimime.header_check(c),
'Should be header quopri encoded: %s' % chr(c))
def test_body_quopri_check(self):
def test_quopri_body_check(self):
for c in self.blit:
self.failIf(quoprimime.body_quopri_check(c))
self.failIf(quoprimime.body_check(c),
'Should not be body quopri encoded: %s' % chr(c))
for c in self.bnon:
self.failUnless(quoprimime.body_quopri_check(c))
self.failUnless(quoprimime.body_check(c),
'Should be body quopri encoded: %s' % chr(c))
def test_header_quopri_len(self):
eq = self.assertEqual
eq(quoprimime.header_quopri_len(b'hello'), 5)
# RFC 2047 chrome is not included in header_quopri_len().
eq(quoprimime.header_length(b'hello'), 5)
# RFC 2047 chrome is not included in header_length().
eq(len(quoprimime.header_encode(b'hello', charset='xxx')),
quoprimime.header_quopri_len(b'hello') +
quoprimime.header_length(b'hello') +
# =?xxx?q?...?= means 10 extra characters
10)
eq(quoprimime.header_quopri_len(b'h@e@l@l@o@'), 20)
# RFC 2047 chrome is not included in header_quopri_len().
eq(quoprimime.header_length(b'h@e@l@l@o@'), 20)
# RFC 2047 chrome is not included in header_length().
eq(len(quoprimime.header_encode(b'h@e@l@l@o@', charset='xxx')),
quoprimime.header_quopri_len(b'h@e@l@l@o@') +
quoprimime.header_length(b'h@e@l@l@o@') +
# =?xxx?q?...?= means 10 extra characters
10)
for c in self.hlit:
eq(quoprimime.header_quopri_len(bytes([c])), 1,
eq(quoprimime.header_length(bytes([c])), 1,
'expected length 1 for %r' % chr(c))
for c in self.hnon:
eq(quoprimime.header_quopri_len(bytes([c])), 3,
# Space is special; it's encoded to _
if c == ord(' '):
continue
eq(quoprimime.header_length(bytes([c])), 3,
'expected length 3 for %r' % chr(c))
eq(quoprimime.header_length(b' '), 1)
def test_body_quopri_len(self):
eq = self.assertEqual
bql = quoprimime.body_quopri_len
for c in self.blit:
eq(bql(c), 1)
eq(quoprimime.body_length(bytes([c])), 1)
for c in self.bnon:
eq(bql(c), 3)
eq(quoprimime.body_length(bytes([c])), 3)
def test_quote_unquote_idempotent(self):
for x in range(256):
@ -2670,22 +2671,23 @@ class TestQuopri(unittest.TestCase):
def test_encode(self):
eq = self.assertEqual
eq(quoprimime.encode(''), '')
eq(quoprimime.encode('hello'), 'hello')
eq(quoprimime.body_encode(''), '')
eq(quoprimime.body_encode('hello'), 'hello')
# Test the binary flag
eq(quoprimime.encode('hello\r\nworld'), 'hello\nworld')
eq(quoprimime.encode('hello\r\nworld', 0), 'hello\nworld')
eq(quoprimime.body_encode('hello\r\nworld'), 'hello\nworld')
eq(quoprimime.body_encode('hello\r\nworld', 0), 'hello\nworld')
# Test the maxlinelen arg
eq(quoprimime.encode('xxxx ' * 20, maxlinelen=40), """\
eq(quoprimime.body_encode('xxxx ' * 20, maxlinelen=40), """\
xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx=
xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxx=
x xxxx xxxx xxxx xxxx=20""")
# Test the eol argument
eq(quoprimime.encode('xxxx ' * 20, maxlinelen=40, eol='\r\n'), """\
eq(quoprimime.body_encode('xxxx ' * 20, maxlinelen=40, eol='\r\n'),
"""\
xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxxx=\r
xxxx xxxx xxxx xxxx xxxx xxxx xxxx xxx=\r
x xxxx xxxx xxxx xxxx=20""")
eq(quoprimime.encode("""\
eq(quoprimime.body_encode("""\
one line
two line"""), """\
@ -2704,17 +2706,16 @@ class TestCharset(unittest.TestCase):
except KeyError:
pass
def test_idempotent(self):
def test_codec_encodeable(self):
eq = self.assertEqual
# Make sure us-ascii = no Unicode conversion
c = Charset('us-ascii')
s = 'Hello World!'
sp = c.to_splittable(s)
eq(s, c.from_splittable(sp))
# test 8-bit idempotency with us-ascii
eq(c.header_encode('Hello World!'), 'Hello World!')
# Test 8-bit idempotency with us-ascii
s = '\xa4\xa2\xa4\xa4\xa4\xa6\xa4\xa8\xa4\xaa'
sp = c.to_splittable(s)
eq(s, c.from_splittable(sp))
self.assertRaises(UnicodeError, c.header_encode, s)
c = Charset('utf-8')
eq(c.header_encode(s), '=?utf-8?b?wqTCosKkwqTCpMKmwqTCqMKkwqo=?=')
def test_body_encode(self):
eq = self.assertEqual
@ -2801,43 +2802,46 @@ class TestHeader(TestEmailBase):
h = Header(g_head, g)
h.append(cz_head, cz)
h.append(utf8_head, utf8)
enc = h.encode()
enc = h.encode(maxlinelen=76)
eq(enc, """\
=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerderband_ko?=
=?iso-8859-1?q?mfortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndischen_Wan?=
=?iso-8859-1?q?dgem=E4lden_vorbei=2C_gegen_die_rotierenden_Klingen_bef=F6?=
=?iso-8859-1?q?rdert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se_hroutily?=
=?iso-8859-1?q?Die_Mieter_treten_hier_ein_werden_mit_einem_Foerderband_kom?=
=?iso-8859-1?q?fortabel_den_Korridor_entlang=2C_an_s=FCdl=FCndischen_Wand?=
=?iso-8859-1?q?gem=E4lden_vorbei=2C_gegen_die_rotierenden_Klingen_bef=F6r?=
=?iso-8859-1?q?dert=2E_?= =?iso-8859-2?q?Finan=E8ni_metropole_se_hroutily?=
=?iso-8859-2?q?_pod_tlakem_jejich_d=F9vtipu=2E=2E_?= =?utf-8?b?5q2j56K6?=
=?utf-8?b?44Gr6KiA44GG44Go57+76Kiz44Gv44GV44KM44Gm44GE44G+44Gb44KT44CC?=
=?utf-8?b?5LiA6YOo44Gv44OJ44Kk44OE6Kqe44Gn44GZ44GM44CB44GC44Go44Gv44Gn?=
=?utf-8?b?44Gf44KJ44KB44Gn44GZ44CC5a6f6Zqb44Gr44Gv44CMV2VubiBpc3QgZGFz?=
=?utf-8?q?_Nunstuck_git_und_Slotermeyer=3F_Ja!_Beiherhund_das_Oder_die_Fl?=
=?utf-8?b?aXBwZXJ3YWxkdCBnZXJzcHV0LuOAjeOBqOiogOOBo+OBpuOBhOOBvuOBmQ==?=
=?utf-8?b?44CC?=""")
eq(decode_header(enc),
[(g_head, "iso-8859-1"), (cz_head, "iso-8859-2"),
(utf8_head, "utf-8")])
=?utf-8?b?IE51bnN0dWNrIGdpdCB1bmQgU2xvdGVybWV5ZXI/IEphISBCZWloZXJodW5k?=
=?utf-8?b?IGRhcyBPZGVyIGRpZSBGbGlwcGVyd2FsZHQgZ2Vyc3B1dC7jgI3jgajoqIA=?=
=?utf-8?b?44Gj44Gm44GE44G+44GZ44CC?=""")
decoded = decode_header(enc)
eq(len(decoded), 3)
eq(decoded[0], (g_head, 'iso-8859-1'))
eq(decoded[1], (cz_head, 'iso-8859-2'))
eq(decoded[2], (utf8_head.encode('utf-8'), 'utf-8'))
ustr = str(h)
eq(ustr.encode('utf-8'),
'Die Mieter treten hier ein werden mit einem Foerderband '
'komfortabel den Korridor entlang, an s\xc3\xbcdl\xc3\xbcndischen '
'Wandgem\xc3\xa4lden vorbei, gegen die rotierenden Klingen '
'bef\xc3\xb6rdert. Finan\xc4\x8dni metropole se hroutily pod '
'tlakem jejich d\xc5\xafvtipu.. \xe6\xad\xa3\xe7\xa2\xba\xe3\x81'
'\xab\xe8\xa8\x80\xe3\x81\x86\xe3\x81\xa8\xe7\xbf\xbb\xe8\xa8\xb3'
'\xe3\x81\xaf\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xa6\xe3\x81\x84\xe3'
'\x81\xbe\xe3\x81\x9b\xe3\x82\x93\xe3\x80\x82\xe4\xb8\x80\xe9\x83'
'\xa8\xe3\x81\xaf\xe3\x83\x89\xe3\x82\xa4\xe3\x83\x84\xe8\xaa\x9e'
'\xe3\x81\xa7\xe3\x81\x99\xe3\x81\x8c\xe3\x80\x81\xe3\x81\x82\xe3'
'\x81\xa8\xe3\x81\xaf\xe3\x81\xa7\xe3\x81\x9f\xe3\x82\x89\xe3\x82'
'\x81\xe3\x81\xa7\xe3\x81\x99\xe3\x80\x82\xe5\xae\x9f\xe9\x9a\x9b'
'\xe3\x81\xab\xe3\x81\xaf\xe3\x80\x8cWenn ist das Nunstuck git '
'und Slotermeyer? Ja! Beiherhund das Oder die Flipperwaldt '
'gersput.\xe3\x80\x8d\xe3\x81\xa8\xe8\xa8\x80\xe3\x81\xa3\xe3\x81'
'\xa6\xe3\x81\x84\xe3\x81\xbe\xe3\x81\x99\xe3\x80\x82')
eq(ustr,
(b'Die Mieter treten hier ein werden mit einem Foerderband '
b'komfortabel den Korridor entlang, an s\xc3\xbcdl\xc3\xbcndischen '
b'Wandgem\xc3\xa4lden vorbei, gegen die rotierenden Klingen '
b'bef\xc3\xb6rdert. Finan\xc4\x8dni metropole se hroutily pod '
b'tlakem jejich d\xc5\xafvtipu.. \xe6\xad\xa3\xe7\xa2\xba\xe3\x81'
b'\xab\xe8\xa8\x80\xe3\x81\x86\xe3\x81\xa8\xe7\xbf\xbb\xe8\xa8\xb3'
b'\xe3\x81\xaf\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xa6\xe3\x81\x84\xe3'
b'\x81\xbe\xe3\x81\x9b\xe3\x82\x93\xe3\x80\x82\xe4\xb8\x80\xe9\x83'
b'\xa8\xe3\x81\xaf\xe3\x83\x89\xe3\x82\xa4\xe3\x83\x84\xe8\xaa\x9e'
b'\xe3\x81\xa7\xe3\x81\x99\xe3\x81\x8c\xe3\x80\x81\xe3\x81\x82\xe3'
b'\x81\xa8\xe3\x81\xaf\xe3\x81\xa7\xe3\x81\x9f\xe3\x82\x89\xe3\x82'
b'\x81\xe3\x81\xa7\xe3\x81\x99\xe3\x80\x82\xe5\xae\x9f\xe9\x9a\x9b'
b'\xe3\x81\xab\xe3\x81\xaf\xe3\x80\x8cWenn ist das Nunstuck git '
b'und Slotermeyer? Ja! Beiherhund das Oder die Flipperwaldt '
b'gersput.\xe3\x80\x8d\xe3\x81\xa8\xe8\xa8\x80\xe3\x81\xa3\xe3\x81'
b'\xa6\xe3\x81\x84\xe3\x81\xbe\xe3\x81\x99\xe3\x80\x82'
).decode('utf-8'))
# Test make_header()
newh = make_header(decode_header(enc))
eq(newh, enc)
eq(newh, h)
def test_empty_header_encode(self):
h = Header()
@ -2848,7 +2852,7 @@ class TestHeader(TestEmailBase):
h = Header()
eq(h, '')
h.append('foo', Charset('iso-8859-1'))
eq(h, '=?iso-8859-1?q?foo?=')
eq(h, 'foo')
def test_explicit_maxlinelen(self):
eq = self.ndiffAssertEqual
@ -2869,39 +2873,128 @@ A very long line that must get split to something other than at the
eq(h.encode(), hstr)
eq(str(h), hstr)
def test_long_splittables_with_trailing_spaces(self):
def test_quopri_splittable(self):
eq = self.ndiffAssertEqual
h = Header(charset='iso-8859-1', maxlinelen=20)
h.append('xxxx ' * 20)
eq(h.encode(), """\
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx?=
=?iso-8859-1?q?xxxx_?=""")
x = 'xxxx ' * 20
h.append(x)
s = h.encode()
eq(s, """\
=?iso-8859-1?q?xxx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_x?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?x_?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?xx?=
=?iso-8859-1?q?_?=""")
eq(x, str(make_header(decode_header(s))))
h = Header(charset='iso-8859-1', maxlinelen=40)
h.append('xxxx ' * 20)
eq(h.encode(), """\
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?=
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?=
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?=
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx?=
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx_?=""")
s = h.encode()
eq(s, """\
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx_xxx?=
=?iso-8859-1?q?x_xxxx_xxxx_xxxx_xxxx_?=
=?iso-8859-1?q?xxxx_xxxx_xxxx_xxxx_xx?=
=?iso-8859-1?q?xx_xxxx_xxxx_xxxx_xxxx?=
=?iso-8859-1?q?_xxxx_xxxx_?=""")
eq(x, str(make_header(decode_header(s))))
def test_base64_splittable(self):
eq = self.ndiffAssertEqual
h = Header(charset='koi8-r', maxlinelen=20)
x = 'xxxx ' * 20
h.append(x)
s = h.encode()
eq(s, """\
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IHh4?=
=?koi8-r?b?eHgg?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IHh4?=
=?koi8-r?b?eHgg?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IHh4?=
=?koi8-r?b?eHgg?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IHh4?=
=?koi8-r?b?eHgg?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IHh4?=
=?koi8-r?b?eHgg?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IHh4?=
=?koi8-r?b?eHgg?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?eCB4?=
=?koi8-r?b?eHh4?=
=?koi8-r?b?IA==?=""")
eq(x, str(make_header(decode_header(s))))
h = Header(charset='koi8-r', maxlinelen=40)
h.append(x)
s = h.encode()
eq(s, """\
=?koi8-r?b?eHh4eCB4eHh4IHh4eHggeHh4?=
=?koi8-r?b?eCB4eHh4IHh4eHggeHh4eCB4?=
=?koi8-r?b?eHh4IHh4eHggeHh4eCB4eHh4?=
=?koi8-r?b?IHh4eHggeHh4eCB4eHh4IHh4?=
=?koi8-r?b?eHggeHh4eCB4eHh4IHh4eHgg?=
=?koi8-r?b?eHh4eCB4eHh4IA==?=""")
eq(x, str(make_header(decode_header(s))))
def test_us_ascii_header(self):
eq = self.assertEqual
@ -2915,7 +3008,7 @@ A very long line that must get split to something other than at the
eq = self.assertEqual
h = Header()
h.append('hello', 'iso-8859-1')
eq(h, '=?iso-8859-1?q?hello?=')
eq(h, 'hello')
## def test_unicode_error(self):
## raises = self.assertRaises

View File

@ -70,16 +70,6 @@ def _bdecode(s):
return value
def fix_eols(s):
"""Replace all line-ending characters with \r\n."""
# Fix newlines with no preceding carriage return
s = re.sub(r'(?<!\r)\n', CRLF, s)
# Fix carriage returns with no following newline
s = re.sub(r'\r(?!\n)', CRLF, s)
return s
def formataddr(pair):
"""The inverse of parseaddr(), this takes a 2-tuple of the form
@ -317,7 +307,7 @@ def collapse_rfc2231_value(value, errors='replace',
# object. We do not want bytes() normal utf-8 decoder, we want a straight
# interpretation of the string as character bytes.
charset, language, text = value
rawbytes = bytes(ord(c) for c in text)
rawbytes = bytes(text, 'raw-unicode-escape')
try:
return str(rawbytes, charset, errors)
except LookupError: