import os import time import stat import socket import email import email.Message import rfc822 import re import StringIO from test import test_support import unittest import mailbox import glob try: import fcntl except ImportError: pass class TestBase(unittest.TestCase): def _check_sample(self, msg): # Inspect a mailbox.Message representation of the sample message self.assert_(isinstance(msg, email.Message.Message)) self.assert_(isinstance(msg, mailbox.Message)) for key, value in _sample_headers.iteritems(): self.assert_(value in msg.get_all(key)) self.assert_(msg.is_multipart()) self.assert_(len(msg.get_payload()) == len(_sample_payloads)) for i, payload in enumerate(_sample_payloads): part = msg.get_payload(i) self.assert_(isinstance(part, email.Message.Message)) self.assert_(not isinstance(part, mailbox.Message)) self.assert_(part.get_payload() == payload) def _delete_recursively(self, target): # Delete a file or delete a directory recursively if os.path.isdir(target): for path, dirs, files in os.walk(target, topdown=False): for name in files: os.remove(os.path.join(path, name)) for name in dirs: os.rmdir(os.path.join(path, name)) os.rmdir(target) elif os.path.exists(target): os.remove(target) class TestMailbox(TestBase): _factory = None # Overridden by subclasses to reuse tests _template = 'From: foo\n\n%s' def setUp(self): self._path = test_support.TESTFN self._box = self._factory(self._path) def tearDown(self): self._box.close() self._delete_recursively(self._path) def test_add(self): # Add copies of a sample message keys = [] keys.append(self._box.add(self._template % 0)) self.assert_(len(self._box) == 1) keys.append(self._box.add(mailbox.Message(_sample_message))) self.assert_(len(self._box) == 2) keys.append(self._box.add(email.message_from_string(_sample_message))) self.assert_(len(self._box) == 3) keys.append(self._box.add(StringIO.StringIO(_sample_message))) self.assert_(len(self._box) == 4) keys.append(self._box.add(_sample_message)) self.assert_(len(self._box) == 5) self.assert_(self._box.get_string(keys[0]) == self._template % 0) for i in (1, 2, 3, 4): self._check_sample(self._box[keys[i]]) def test_remove(self): # Remove messages using remove() self._test_remove_or_delitem(self._box.remove) def test_delitem(self): # Remove messages using __delitem__() self._test_remove_or_delitem(self._box.__delitem__) def _test_remove_or_delitem(self, method): # (Used by test_remove() and test_delitem().) key0 = self._box.add(self._template % 0) key1 = self._box.add(self._template % 1) self.assert_(len(self._box) == 2) method(key0) l = len(self._box) self.assert_(l == 1, "actual l: %s" % l) self.assertRaises(KeyError, lambda: self._box[key0]) self.assertRaises(KeyError, lambda: method(key0)) self.assert_(self._box.get_string(key1) == self._template % 1) key2 = self._box.add(self._template % 2) self.assert_(len(self._box) == 2) method(key2) l = len(self._box) self.assert_(l == 1, "actual l: %s" % l) self.assertRaises(KeyError, lambda: self._box[key2]) self.assertRaises(KeyError, lambda: method(key2)) self.assert_(self._box.get_string(key1) == self._template % 1) method(key1) self.assert_(len(self._box) == 0) self.assertRaises(KeyError, lambda: self._box[key1]) self.assertRaises(KeyError, lambda: method(key1)) def test_discard(self, repetitions=10): # Discard messages key0 = self._box.add(self._template % 0) key1 = self._box.add(self._template % 1) self.assert_(len(self._box) == 2) self._box.discard(key0) self.assert_(len(self._box) == 1) self.assertRaises(KeyError, lambda: self._box[key0]) self._box.discard(key0) self.assert_(len(self._box) == 1) self.assertRaises(KeyError, lambda: self._box[key0]) def test_get(self): # Retrieve messages using get() key0 = self._box.add(self._template % 0) msg = self._box.get(key0) self.assert_(msg['from'] == 'foo') self.assert_(msg.get_payload() == '0') self.assert_(self._box.get('foo') is None) self.assert_(self._box.get('foo', False) is False) self._box.close() self._box = self._factory(self._path, factory=rfc822.Message) key1 = self._box.add(self._template % 1) msg = self._box.get(key1) self.assert_(msg['from'] == 'foo') self.assert_(msg.fp.read() == '1') def test_getitem(self): # Retrieve message using __getitem__() key0 = self._box.add(self._template % 0) msg = self._box[key0] self.assert_(msg['from'] == 'foo') self.assert_(msg.get_payload() == '0') self.assertRaises(KeyError, lambda: self._box['foo']) self._box.discard(key0) self.assertRaises(KeyError, lambda: self._box[key0]) def test_get_message(self): # Get Message representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) msg0 = self._box.get_message(key0) self.assert_(isinstance(msg0, mailbox.Message)) self.assert_(msg0['from'] == 'foo') self.assert_(msg0.get_payload() == '0') self._check_sample(self._box.get_message(key1)) def test_get_string(self): # Get string representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assert_(self._box.get_string(key0) == self._template % 0) self.assert_(self._box.get_string(key1) == _sample_message) def test_get_file(self): # Get file representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assert_(self._box.get_file(key0).read().replace(os.linesep, '\n') == self._template % 0) self.assert_(self._box.get_file(key1).read().replace(os.linesep, '\n') == _sample_message) def test_iterkeys(self): # Get keys using iterkeys() self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) def test_keys(self): # Get keys using keys() self._check_iteration(self._box.keys, do_keys=True, do_values=False) def test_itervalues(self): # Get values using itervalues() self._check_iteration(self._box.itervalues, do_keys=False, do_values=True) def test_iter(self): # Get values using __iter__() self._check_iteration(self._box.__iter__, do_keys=False, do_values=True) def test_values(self): # Get values using values() self._check_iteration(self._box.values, do_keys=False, do_values=True) def test_iteritems(self): # Get keys and values using iteritems() self._check_iteration(self._box.iteritems, do_keys=True, do_values=True) def test_items(self): # Get keys and values using items() self._check_iteration(self._box.items, do_keys=True, do_values=True) def _check_iteration(self, method, do_keys, do_values, repetitions=10): for value in method(): self.fail("Not empty") keys, values = [], [] for i in xrange(repetitions): keys.append(self._box.add(self._template % i)) values.append(self._template % i) if do_keys and not do_values: returned_keys = list(method()) elif do_values and not do_keys: returned_values = list(method()) else: returned_keys, returned_values = [], [] for key, value in method(): returned_keys.append(key) returned_values.append(value) if do_keys: self.assert_(len(keys) == len(returned_keys)) self.assert_(set(keys) == set(returned_keys)) if do_values: count = 0 for value in returned_values: self.assert_(value['from'] == 'foo') self.assert_(int(value.get_payload()) < repetitions) count += 1 self.assert_(len(values) == count) def test_has_key(self): # Check existence of keys using has_key() self._test_has_key_or_contains(self._box.has_key) def test_contains(self): # Check existence of keys using __contains__() self._test_has_key_or_contains(self._box.__contains__) def _test_has_key_or_contains(self, method): # (Used by test_has_key() and test_contains().) self.assert_(not method('foo')) key0 = self._box.add(self._template % 0) self.assert_(method(key0)) self.assert_(not method('foo')) key1 = self._box.add(self._template % 1) self.assert_(method(key1)) self.assert_(method(key0)) self.assert_(not method('foo')) self._box.remove(key0) self.assert_(not method(key0)) self.assert_(method(key1)) self.assert_(not method('foo')) self._box.remove(key1) self.assert_(not method(key1)) self.assert_(not method(key0)) self.assert_(not method('foo')) def test_len(self, repetitions=10): # Get message count keys = [] for i in xrange(repetitions): self.assert_(len(self._box) == i) keys.append(self._box.add(self._template % i)) self.assert_(len(self._box) == i + 1) for i in xrange(repetitions): self.assert_(len(self._box) == repetitions - i) self._box.remove(keys[i]) self.assert_(len(self._box) == repetitions - i - 1) def test_set_item(self): # Modify messages using __setitem__() key0 = self._box.add(self._template % 'original 0') self.assert_(self._box.get_string(key0) == \ self._template % 'original 0') key1 = self._box.add(self._template % 'original 1') self.assert_(self._box.get_string(key1) == \ self._template % 'original 1') self._box[key0] = self._template % 'changed 0' self.assert_(self._box.get_string(key0) == \ self._template % 'changed 0') self._box[key1] = self._template % 'changed 1' self.assert_(self._box.get_string(key1) == \ self._template % 'changed 1') self._box[key0] = _sample_message self._check_sample(self._box[key0]) self._box[key1] = self._box[key0] self._check_sample(self._box[key1]) self._box[key0] = self._template % 'original 0' self.assert_(self._box.get_string(key0) == self._template % 'original 0') self._check_sample(self._box[key1]) self.assertRaises(KeyError, lambda: self._box.__setitem__('foo', 'bar')) self.assertRaises(KeyError, lambda: self._box['foo']) self.assert_(len(self._box) == 2) def test_clear(self, iterations=10): # Remove all messages using clear() keys = [] for i in xrange(iterations): self._box.add(self._template % i) for i, key in enumerate(keys): self.assert_(self._box.get_string(key) == self._template % i) self._box.clear() self.assert_(len(self._box) == 0) for i, key in enumerate(keys): self.assertRaises(KeyError, lambda: self._box.get_string(key)) def test_pop(self): # Get and remove a message using pop() key0 = self._box.add(self._template % 0) self.assert_(key0 in self._box) key1 = self._box.add(self._template % 1) self.assert_(key1 in self._box) self.assert_(self._box.pop(key0).get_payload() == '0') self.assert_(key0 not in self._box) self.assert_(key1 in self._box) key2 = self._box.add(self._template % 2) self.assert_(key2 in self._box) self.assert_(self._box.pop(key2).get_payload() == '2') self.assert_(key2 not in self._box) self.assert_(key1 in self._box) self.assert_(self._box.pop(key1).get_payload() == '1') self.assert_(key1 not in self._box) self.assert_(len(self._box) == 0) def test_popitem(self, iterations=10): # Get and remove an arbitrary (key, message) using popitem() keys = [] for i in xrange(10): keys.append(self._box.add(self._template % i)) seen = [] for i in xrange(10): key, msg = self._box.popitem() self.assert_(key in keys) self.assert_(key not in seen) seen.append(key) self.assert_(int(msg.get_payload()) == keys.index(key)) self.assert_(len(self._box) == 0) for key in keys: self.assertRaises(KeyError, lambda: self._box[key]) def test_update(self): # Modify multiple messages using update() key0 = self._box.add(self._template % 'original 0') key1 = self._box.add(self._template % 'original 1') key2 = self._box.add(self._template % 'original 2') self._box.update({key0: self._template % 'changed 0', key2: _sample_message}) self.assert_(len(self._box) == 3) self.assert_(self._box.get_string(key0) == self._template % 'changed 0') self.assert_(self._box.get_string(key1) == self._template % 'original 1') self._check_sample(self._box[key2]) self._box.update([(key2, self._template % 'changed 2'), (key1, self._template % 'changed 1'), (key0, self._template % 'original 0')]) self.assert_(len(self._box) == 3) self.assert_(self._box.get_string(key0) == self._template % 'original 0') self.assert_(self._box.get_string(key1) == self._template % 'changed 1') self.assert_(self._box.get_string(key2) == self._template % 'changed 2') self.assertRaises(KeyError, lambda: self._box.update({'foo': 'bar', key0: self._template % "changed 0"})) self.assert_(len(self._box) == 3) self.assert_(self._box.get_string(key0) == self._template % "changed 0") self.assert_(self._box.get_string(key1) == self._template % "changed 1") self.assert_(self._box.get_string(key2) == self._template % "changed 2") def test_flush(self): # Write changes to disk self._test_flush_or_close(self._box.flush) def test_lock_unlock(self): # Lock and unlock the mailbox self.assert_(not os.path.exists(self._get_lock_path())) self._box.lock() self.assert_(os.path.exists(self._get_lock_path())) self._box.unlock() self.assert_(not os.path.exists(self._get_lock_path())) def test_close(self): # Close mailbox and flush changes to disk self._test_flush_or_close(self._box.close) def _test_flush_or_close(self, method): contents = [self._template % i for i in xrange(3)] self._box.add(contents[0]) self._box.add(contents[1]) self._box.add(contents[2]) method() self._box = self._factory(self._path) keys = self._box.keys() self.assert_(len(keys) == 3) for key in keys: self.assert_(self._box.get_string(key) in contents) def test_dump_message(self): # Write message representations to disk for input in (email.message_from_string(_sample_message), _sample_message, StringIO.StringIO(_sample_message)): output = StringIO.StringIO() self._box._dump_message(input, output) self.assert_(output.getvalue() == _sample_message.replace('\n', os.linesep)) output = StringIO.StringIO() self.assertRaises(TypeError, lambda: self._box._dump_message(None, output)) def _get_lock_path(self): # Return the path of the dot lock file. May be overridden. return self._path + '.lock' class TestMailboxSuperclass(TestBase): def test_notimplemented(self): # Test that all Mailbox methods raise NotImplementedException. box = mailbox.Mailbox('path') self.assertRaises(NotImplementedError, lambda: box.add('')) self.assertRaises(NotImplementedError, lambda: box.remove('')) self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) self.assertRaises(NotImplementedError, lambda: box.discard('')) self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) self.assertRaises(NotImplementedError, lambda: box.iterkeys()) self.assertRaises(NotImplementedError, lambda: box.keys()) self.assertRaises(NotImplementedError, lambda: box.itervalues().next()) self.assertRaises(NotImplementedError, lambda: box.__iter__().next()) self.assertRaises(NotImplementedError, lambda: box.values()) self.assertRaises(NotImplementedError, lambda: box.iteritems().next()) self.assertRaises(NotImplementedError, lambda: box.items()) self.assertRaises(NotImplementedError, lambda: box.get('')) self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) self.assertRaises(NotImplementedError, lambda: box.get_message('')) self.assertRaises(NotImplementedError, lambda: box.get_string('')) self.assertRaises(NotImplementedError, lambda: box.get_file('')) self.assertRaises(NotImplementedError, lambda: box.has_key('')) self.assertRaises(NotImplementedError, lambda: box.__contains__('')) self.assertRaises(NotImplementedError, lambda: box.__len__()) self.assertRaises(NotImplementedError, lambda: box.clear()) self.assertRaises(NotImplementedError, lambda: box.pop('')) self.assertRaises(NotImplementedError, lambda: box.popitem()) self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) self.assertRaises(NotImplementedError, lambda: box.flush()) self.assertRaises(NotImplementedError, lambda: box.lock()) self.assertRaises(NotImplementedError, lambda: box.unlock()) self.assertRaises(NotImplementedError, lambda: box.close()) class TestMaildir(TestMailbox): _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) def setUp(self): TestMailbox.setUp(self) if os.name in ('nt', 'os2'): self._box.colon = '!' def test_add_MM(self): # Add a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_info('foo') key = self._box.add(msg) self.assert_(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % (key, self._box.colon)))) def test_get_MM(self): # Get a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_flags('RF') key = self._box.add(msg) msg_returned = self._box.get_message(key) self.assert_(isinstance(msg_returned, mailbox.MaildirMessage)) self.assert_(msg_returned.get_subdir() == 'cur') self.assert_(msg_returned.get_flags() == 'FR') def test_set_MM(self): # Set with a MaildirMessage instance msg0 = mailbox.MaildirMessage(self._template % 0) msg0.set_flags('TP') key = self._box.add(msg0) msg_returned = self._box.get_message(key) self.assert_(msg_returned.get_subdir() == 'new') self.assert_(msg_returned.get_flags() == 'PT') msg1 = mailbox.MaildirMessage(self._template % 1) self._box[key] = msg1 msg_returned = self._box.get_message(key) self.assert_(msg_returned.get_subdir() == 'new') self.assert_(msg_returned.get_flags() == '') self.assert_(msg_returned.get_payload() == '1') msg2 = mailbox.MaildirMessage(self._template % 2) msg2.set_info('2,S') self._box[key] = msg2 self._box[key] = self._template % 3 msg_returned = self._box.get_message(key) self.assert_(msg_returned.get_subdir() == 'new') self.assert_(msg_returned.get_flags() == 'S') self.assert_(msg_returned.get_payload() == '3') def test_initialize_new(self): # Initialize a non-existent mailbox self.tearDown() self._box = mailbox.Maildir(self._path) self._check_basics(factory=rfc822.Message) self._delete_recursively(self._path) self._box = self._factory(self._path, factory=None) self._check_basics() def test_initialize_existing(self): # Initialize an existing mailbox self.tearDown() for subdir in '', 'tmp', 'new', 'cur': os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) self._box = mailbox.Maildir(self._path) self._check_basics(factory=rfc822.Message) self._box = mailbox.Maildir(self._path, factory=None) self._check_basics() def _check_basics(self, factory=None): # (Used by test_open_new() and test_open_existing().) self.assertEqual(self._box._path, os.path.abspath(self._path)) self.assertEqual(self._box._factory, factory) for subdir in '', 'tmp', 'new', 'cur': path = os.path.join(self._path, subdir) mode = os.stat(path)[stat.ST_MODE] self.assert_(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) def test_list_folders(self): # List folders self._box.add_folder('one') self._box.add_folder('two') self._box.add_folder('three') self.assert_(len(self._box.list_folders()) == 3) self.assert_(set(self._box.list_folders()) == set(('one', 'two', 'three'))) def test_get_folder(self): # Open folders self._box.add_folder('foo.bar') folder0 = self._box.get_folder('foo.bar') folder0.add(self._template % 'bar') self.assert_(os.path.isdir(os.path.join(self._path, '.foo.bar'))) folder1 = self._box.get_folder('foo.bar') self.assert_(folder1.get_string(folder1.keys()[0]) == \ self._template % 'bar') def test_add_and_remove_folders(self): # Delete folders self._box.add_folder('one') self._box.add_folder('two') self.assert_(len(self._box.list_folders()) == 2) self.assert_(set(self._box.list_folders()) == set(('one', 'two'))) self._box.remove_folder('one') self.assert_(len(self._box.list_folders()) == 1) self.assert_(set(self._box.list_folders()) == set(('two',))) self._box.add_folder('three') self.assert_(len(self._box.list_folders()) == 2) self.assert_(set(self._box.list_folders()) == set(('two', 'three'))) self._box.remove_folder('three') self.assert_(len(self._box.list_folders()) == 1) self.assert_(set(self._box.list_folders()) == set(('two',))) self._box.remove_folder('two') self.assert_(len(self._box.list_folders()) == 0) self.assert_(self._box.list_folders() == []) def test_clean(self): # Remove old files from 'tmp' foo_path = os.path.join(self._path, 'tmp', 'foo') bar_path = os.path.join(self._path, 'tmp', 'bar') f = open(foo_path, 'w') f.write("@") f.close() f = open(bar_path, 'w') f.write("@") f.close() self._box.clean() self.assert_(os.path.exists(foo_path)) self.assert_(os.path.exists(bar_path)) foo_stat = os.stat(foo_path) os.utime(foo_path, (time.time() - 129600 - 2, foo_stat.st_mtime)) self._box.clean() self.assert_(not os.path.exists(foo_path)) self.assert_(os.path.exists(bar_path)) def test_create_tmp(self, repetitions=10): # Create files in tmp directory hostname = socket.gethostname() if '/' in hostname: hostname = hostname.replace('/', r'\057') if ':' in hostname: hostname = hostname.replace(':', r'\072') pid = os.getpid() pattern = re.compile(r"(?P