Improved tests
This commit is contained in:
parent
9ec3c07f6e
commit
21ddbc19a1
|
@ -26,6 +26,7 @@ import binascii, errno, random, re, socket, subprocess, sys, time, calendar
|
|||
from collections import namedtuple
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from io import DEFAULT_BUFFER_SIZE
|
||||
from typing import Tuple
|
||||
|
||||
try:
|
||||
import ssl
|
||||
|
@ -664,39 +665,43 @@ class IMAP4:
|
|||
typ, dat = self._simple_command(name, directory, pattern)
|
||||
return self._untagged_response(typ, dat, name)
|
||||
|
||||
def move_messages(self, target: str, messages: str) -> None:
|
||||
def move_messages(self, target: str, msgs: str) -> Tuple[str, str]:
|
||||
"""
|
||||
Higher level command to move message inside of one account.
|
||||
Following RFC-6851.
|
||||
|
||||
:param target: name of the folder to move messages into
|
||||
:param messages: UID sequence set (according to
|
||||
:param str target: name of the folder to move messages into
|
||||
:param str messages: UID sequence set (according to
|
||||
https://tools.ietf.org/html/rfc3501#section-9)
|
||||
:return: type, data tuple; type is 'OK' or something else in
|
||||
case of failure.
|
||||
:rtype: tuple [str, str]
|
||||
"""
|
||||
if self._features_available.MOVE:
|
||||
typ, dat = self._simple_command('UID', 'MOVE',
|
||||
messages, target)
|
||||
ok, data = self._untagged_response(typ, dat, 'MOVE')
|
||||
if ok != 'OK':
|
||||
raise IOError('Cannot move messages to folder %s' % target)
|
||||
msgs, target)
|
||||
return self._untagged_response(typ, dat, 'MOVE')
|
||||
elif self._features_available.UIDPLUS:
|
||||
ok, data = self.uid('COPY', '%s %s' % (messages, target))
|
||||
ok, data = self.uid('COPY', f'{msgs} {target}')
|
||||
if ok != 'OK':
|
||||
raise IOError('Cannot copy messages to folder %s' % target)
|
||||
return ok, f'Cannot copy messages {msgs} to folder {target}'
|
||||
ok, data = self.uid('STORE',
|
||||
r'+FLAGS.SILENT (\DELETED) %s' % messages)
|
||||
f'+FLAGS.SILENT (\\DELETED) {msgs}')
|
||||
if ok != 'OK':
|
||||
raise IOError('Cannot delete messages.')
|
||||
ok, data = self.uid('EXPUNGE', messages)
|
||||
return ok, f'Cannot delete messages {msgs}.'
|
||||
ok, data = self.uid('EXPUNGE', msgs)
|
||||
if ok != 'OK':
|
||||
raise IOError('Cannot expunge messages.')
|
||||
return ok, f'Cannot expunge messages {msgs}.'
|
||||
return ok, f'Messages {msgs} moved to {target}.'
|
||||
else:
|
||||
ok, data = self.uid('COPY', '%s %s' % (messages, target))
|
||||
ok, data = self.uid('COPY', f'{msgs} {target}')
|
||||
if ok != 'OK':
|
||||
raise IOError('Cannot copy messages to folder %s' % target)
|
||||
return ok, f'Cannot copy messages {msgs} to folder {target}'
|
||||
ok, data = self.uid('STORE',
|
||||
r'+FLAGS.SILENT (\DELETED) %s' % messages)
|
||||
f'+FLAGS.SILENT (\\DELETED) {msgs}')
|
||||
if ok != 'OK':
|
||||
raise IOError('Cannot delete messages.')
|
||||
return ok, f'Cannot delete messages {msgs}.'
|
||||
return ok, f'Messages {msgs} moved to {target}.'
|
||||
|
||||
def myrights(self, mailbox):
|
||||
"""Show my ACLs for a mailbox (i.e. the rights that I have on mailbox).
|
||||
|
|
|
@ -558,36 +558,38 @@ class NewIMAPTestsMixin():
|
|||
self._send_tagged(tag, 'OK', 'okay')
|
||||
|
||||
def cmd_CREATE(self, tag, args):
|
||||
print(f'args = {args}')
|
||||
self._send_textline('* CREATE ' + args[0])
|
||||
self._send_tagged(tag, 'OK', 'okay')
|
||||
|
||||
def cmd_MOVE(self, tag, args):
|
||||
self._send_textline('* MOVE ' + args[0])
|
||||
self._send_tagged(tag, 'OK', 'okay')
|
||||
|
||||
def cmd_SELECT(self, tag, args):
|
||||
self.server.is_selected = True
|
||||
self._send_textline('* 2 EXISTS')
|
||||
self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.')
|
||||
|
||||
client, _ = self._setup(MoveServer)
|
||||
typ, data = client.login('user', 'pass')
|
||||
self.assertEqual(len(client._features_available), 2)
|
||||
typ, data = client.create('source')
|
||||
self.assertEqual(typ, 'OK')
|
||||
typ, data = client.create('target')
|
||||
self.assertEqual(typ, 'OK')
|
||||
typ, data = client.select('source')
|
||||
|
||||
msg = EmailMessage()
|
||||
msg.set_content('This is a testing message')
|
||||
msg['Subject'] = 'Test message'
|
||||
msg['From'] = 'test@example.com'
|
||||
msg['To'] = 'testee@example.com'
|
||||
# typ, data = client.append('source', None, None, msg.as_bytes())
|
||||
# self.assertEqual(typ, 'OK')
|
||||
# typ, data = client.select('source')
|
||||
# self.assertEqual(typ, 'OK')
|
||||
# self.assertEqual(int(data[0]), 1)
|
||||
# typ, data = client.search(None, 'source')
|
||||
# self.assertEqual(typ, 'OK')
|
||||
# msg_id =
|
||||
# typ, data = client.move_messages()
|
||||
# self.assertEqual(typ, 'OK')
|
||||
# typ, data = client.search(None, 'target')
|
||||
# self.assertEqual(typ, 'OK')
|
||||
# self.assertEqual(int(data[0]), 1)
|
||||
msg.set_content('This is a testing message')
|
||||
print(f'=========================\nmsg:\n{msg}=============')
|
||||
typ, data = client.append('source', None, None, msg.as_bytes())
|
||||
|
||||
typ, data = client.search(None, 'ALL')
|
||||
typ, data = client.move_messages('target', data[0])
|
||||
self.assertEqual(typ, 'OK')
|
||||
typ, data = client.search(None, 'target', 'ALL')
|
||||
self.assertEqual(typ, 'OK')
|
||||
self.assertEqual(int(data[0]), 1)
|
||||
|
||||
|
||||
class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase):
|
||||
|
|
Loading…
Reference in New Issue