#15222: Insert blank line after each message in mbox mailboxes
This commit is contained in:
parent
468091954f
commit
f39884bb5a
|
@ -208,6 +208,9 @@ class Mailbox:
|
|||
raise ValueError("String input must be ASCII-only; "
|
||||
"use bytes or a Message instead")
|
||||
|
||||
# Whether each message must end in a newline
|
||||
_append_newline = False
|
||||
|
||||
def _dump_message(self, message, target, mangle_from_=False):
|
||||
# This assumes the target file is open in binary mode.
|
||||
"""Dump message contents to target file."""
|
||||
|
@ -219,6 +222,9 @@ class Mailbox:
|
|||
data = buffer.read()
|
||||
data = data.replace(b'\n', linesep)
|
||||
target.write(data)
|
||||
if self._append_newline and not data.endswith(linesep):
|
||||
# Make sure the message ends with a newline
|
||||
target.write(linesep)
|
||||
elif isinstance(message, (str, bytes, io.StringIO)):
|
||||
if isinstance(message, io.StringIO):
|
||||
warnings.warn("Use of StringIO input is deprecated, "
|
||||
|
@ -230,11 +236,15 @@ class Mailbox:
|
|||
message = message.replace(b'\nFrom ', b'\n>From ')
|
||||
message = message.replace(b'\n', linesep)
|
||||
target.write(message)
|
||||
if self._append_newline and not message.endswith(linesep):
|
||||
# Make sure the message ends with a newline
|
||||
target.write(linesep)
|
||||
elif hasattr(message, 'read'):
|
||||
if hasattr(message, 'buffer'):
|
||||
warnings.warn("Use of text mode files is deprecated, "
|
||||
"use a binary mode file instead", DeprecationWarning, 3)
|
||||
message = message.buffer
|
||||
lastline = None
|
||||
while True:
|
||||
line = message.readline()
|
||||
# Universal newline support.
|
||||
|
@ -248,6 +258,10 @@ class Mailbox:
|
|||
line = b'>From ' + line[5:]
|
||||
line = line.replace(b'\n', linesep)
|
||||
target.write(line)
|
||||
lastline = line
|
||||
if self._append_newline and lastline and not lastline.endswith(linesep):
|
||||
# Make sure the message ends with a newline
|
||||
target.write(linesep)
|
||||
else:
|
||||
raise TypeError('Invalid message type: %s' % type(message))
|
||||
|
||||
|
@ -833,30 +847,48 @@ class mbox(_mboxMMDF):
|
|||
|
||||
_mangle_from_ = True
|
||||
|
||||
# All messages must end in a newline character, and
|
||||
# _post_message_hooks outputs an empty line between messages.
|
||||
_append_newline = True
|
||||
|
||||
def __init__(self, path, factory=None, create=True):
|
||||
"""Initialize an mbox mailbox."""
|
||||
self._message_factory = mboxMessage
|
||||
_mboxMMDF.__init__(self, path, factory, create)
|
||||
|
||||
def _pre_message_hook(self, f):
|
||||
"""Called before writing each message to file f."""
|
||||
if f.tell() != 0:
|
||||
f.write(linesep)
|
||||
def _post_message_hook(self, f):
|
||||
"""Called after writing each message to file f."""
|
||||
f.write(linesep)
|
||||
|
||||
def _generate_toc(self):
|
||||
"""Generate key-to-(start, stop) table of contents."""
|
||||
starts, stops = [], []
|
||||
last_was_empty = False
|
||||
self._file.seek(0)
|
||||
while True:
|
||||
line_pos = self._file.tell()
|
||||
line = self._file.readline()
|
||||
if line.startswith(b'From '):
|
||||
if len(stops) < len(starts):
|
||||
stops.append(line_pos - len(linesep))
|
||||
if last_was_empty:
|
||||
stops.append(line_pos - len(linesep))
|
||||
else:
|
||||
# The last line before the "From " line wasn't
|
||||
# blank, but we consider it a start of a
|
||||
# message anyway.
|
||||
stops.append(line_pos)
|
||||
starts.append(line_pos)
|
||||
last_was_empty = False
|
||||
elif not line:
|
||||
stops.append(line_pos)
|
||||
if last_was_empty:
|
||||
stops.append(line_pos - len(linesep))
|
||||
else:
|
||||
stops.append(line_pos)
|
||||
break
|
||||
elif line == linesep:
|
||||
last_was_empty = True
|
||||
else:
|
||||
last_was_empty = False
|
||||
self._toc = dict(enumerate(zip(starts, stops)))
|
||||
self._next_key = len(self._toc)
|
||||
self._file_length = self._file.tell()
|
||||
|
|
|
@ -1113,6 +1113,29 @@ class TestMbox(_TestMboxMMDF, unittest.TestCase):
|
|||
perms = st.st_mode
|
||||
self.assertFalse((perms & 0o111)) # Execute bits should all be off.
|
||||
|
||||
def test_terminating_newline(self):
|
||||
message = email.message.Message()
|
||||
message['From'] = 'john@example.com'
|
||||
message.set_payload('No newline at the end')
|
||||
i = self._box.add(message)
|
||||
|
||||
# A newline should have been appended to the payload
|
||||
message = self._box.get(i)
|
||||
self.assertEqual(message.get_payload(), 'No newline at the end\n')
|
||||
|
||||
def test_message_separator(self):
|
||||
# Check there's always a single blank line after each message
|
||||
self._box.add('From: foo\n\n0') # No newline at the end
|
||||
with open(self._path) as f:
|
||||
data = f.read()
|
||||
self.assertEqual(data[-3:], '0\n\n')
|
||||
|
||||
self._box.add('From: foo\n\n0\n') # Newline at the end
|
||||
with open(self._path) as f:
|
||||
data = f.read()
|
||||
self.assertEqual(data[-3:], '0\n\n')
|
||||
|
||||
|
||||
class TestMMDF(_TestMboxMMDF, unittest.TestCase):
|
||||
|
||||
_factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
|
||||
|
|
Loading…
Reference in New Issue