From d74900ebb5a22b387b49684990da1925e1d6bdc9 Mon Sep 17 00:00:00 2001 From: Josiah Carlson Date: Mon, 7 Jul 2008 04:15:08 +0000 Subject: [PATCH] Committing Py3k version of changelist 64080 and 64257, along with updated tests for smtpd, which required updating with the new semantics. --- Doc/library/asynchat.rst | 12 +++ Doc/library/asyncore.rst | 15 ++++ Lib/asynchat.py | 166 +++++++++++++++++++++--------------- Lib/asyncore.py | 173 ++++++++++++++++++++++++++------------ Lib/smtpd.py | 8 +- Lib/test/test_asynchat.py | 14 +-- Lib/test/test_asyncore.py | 7 +- Lib/test/test_smtplib.py | 8 ++ 8 files changed, 269 insertions(+), 134 deletions(-) diff --git a/Doc/library/asynchat.rst b/Doc/library/asynchat.rst index 6f154417f59..ff6432e3884 100644 --- a/Doc/library/asynchat.rst +++ b/Doc/library/asynchat.rst @@ -81,6 +81,12 @@ connection requests. :exc:`NotImplementedError` exception. +.. method:: async_chat._collect_incoming_data(data) + + Sample implementation of a data collection rutine to be used in conjunction + with :meth:`_get_data` in a user-specified :meth:`found_terminator`. + + .. method:: async_chat.discard_buffers() In emergencies this method will discard any data held in the input and/or @@ -95,6 +101,12 @@ connection requests. should be available via an instance attribute. +.. method:: async_chat._get_data() + + Will return and clear the data received with the sample + :meth:`_collect_incoming_data` implementation. + + .. method:: async_chat.get_terminator() Returns the current terminator for the channel. diff --git a/Doc/library/asyncore.rst b/Doc/library/asyncore.rst index 7242e736393..658c8ea808d 100644 --- a/Doc/library/asyncore.rst +++ b/Doc/library/asyncore.rst @@ -222,6 +222,21 @@ any that have been added to the map during asynchronous service) is closed. flushed). Sockets are automatically closed when they are garbage-collected. +.. class:: file_dispatcher() + + A file_dispatcher takes a file descriptor or file object along with an + optional map argument and wraps it for use with the :cfunc:`poll` or + :cfunc:`loop` functions. If provided a file object or anything with a + :cfunc:`fileno` method, that method will be called and passed to the + :class:`file_wrapper` constructor. Availability: UNIX. + +.. class:: file_wrapper() + + A file_wrapper takes an integer file descriptor and calls :func:`os.dup` to + duplicate the handle so that the original handle may be closed independently + of the file_wrapper. This class implements sufficient methods to emulate a + socket for use by the :class:`file_dispatcher` class. Availability: UNIX. + .. _asyncore-example: diff --git a/Lib/asynchat.py b/Lib/asynchat.py index 0e2457f8ece..ae82cfadcd0 100644 --- a/Lib/asynchat.py +++ b/Lib/asynchat.py @@ -45,12 +45,23 @@ command will be accumulated (using your own 'collect_incoming_data' method) up to the terminator, and then control will be returned to you - by calling your self.found_terminator() method. """ - -import sys import socket import asyncore from collections import deque +def buffer(obj, start=None, stop=None): + # if memoryview objects gain slicing semantics, + # this function will change for the better + # memoryview used for the TypeError + memoryview(obj) + if start == None: + start = 0 + if stop == None: + stop = len(obj) + x = obj[start:stop] + ## print("buffer type is: %s"%(type(x),)) + return x + class async_chat (asyncore.dispatcher): """This is an abstract class. You must derive from this class, and add the two methods collect_incoming_data() and found_terminator()""" @@ -60,20 +71,47 @@ class async_chat (asyncore.dispatcher): ac_in_buffer_size = 4096 ac_out_buffer_size = 4096 + # we don't want to enable the use of encoding by default, because that is a + # sign of an application bug that we don't want to pass silently + + use_encoding = 0 + encoding = 'latin1' + def __init__ (self, conn=None): + # for string terminator matching self.ac_in_buffer = b'' - self.ac_out_buffer = b'' - self.producer_fifo = fifo() + + # we use a list here rather than cStringIO for a few reasons... + # del lst[:] is faster than sio.truncate(0) + # lst = [] is faster than sio.truncate(0) + # cStringIO will be gaining unicode support in py3k, which + # will negatively affect the performance of bytes compared to + # a ''.join() equivalent + self.incoming = [] + + # we toss the use of the "simple producer" and replace it with + # a pure deque, which the original fifo was a wrapping of + self.producer_fifo = deque() asyncore.dispatcher.__init__ (self, conn) def collect_incoming_data(self, data): raise NotImplementedError("must be implemented in subclass") + def _collect_incoming_data(self, data): + self.incoming.append(data) + + def _get_data(self): + d = b''.join(self.incoming) + del self.incoming[:] + return d + def found_terminator(self): raise NotImplementedError("must be implemented in subclass") def set_terminator (self, term): "Set the input delimiter. Can be a fixed string of any length, an integer, or None" + if isinstance(term, str) and self.use_encoding: + term = bytes(term, self.encoding) self.terminator = term def get_terminator (self): @@ -92,14 +130,14 @@ class async_chat (asyncore.dispatcher): self.handle_error() return - if isinstance(data, str): - data = data.encode('ascii') - self.ac_in_buffer = self.ac_in_buffer + bytes(data) + if isinstance(data, str) and self.use_encoding: + data = bytes(str, self.encoding) + self.ac_in_buffer = self.ac_in_buffer + data # Continue to search for self.terminator in self.ac_in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator - # combos with a single recv(1024). + # combos with a single recv(4096). while self.ac_in_buffer: lb = len(self.ac_in_buffer) @@ -108,7 +146,7 @@ class async_chat (asyncore.dispatcher): # no terminator, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = b'' - elif isinstance(terminator, int) or isinstance(terminator, int): + elif isinstance(terminator, int): # numeric terminator n = terminator if lb < n: @@ -129,8 +167,6 @@ class async_chat (asyncore.dispatcher): # 3) end of buffer does not match any prefix: # collect data terminator_len = len(terminator) - if isinstance(terminator, str): - terminator = terminator.encode('ascii') index = self.ac_in_buffer.find(terminator) if index != -1: # we found the terminator @@ -155,91 +191,87 @@ class async_chat (asyncore.dispatcher): self.ac_in_buffer = b'' def handle_write (self): - self.initiate_send () + self.initiate_send() def handle_close (self): self.close() def push (self, data): - self.producer_fifo.push (simple_producer (data)) + sabs = self.ac_out_buffer_size + if len(data) > sabs: + for i in range(0, len(data), sabs): + self.producer_fifo.append(data[i:i+sabs]) + else: + self.producer_fifo.append(data) self.initiate_send() def push_with_producer (self, producer): - self.producer_fifo.push (producer) + self.producer_fifo.append(producer) self.initiate_send() def readable (self): "predicate for inclusion in the readable for select()" - return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + # cannot use the old predicate, it violates the claim of the + # set_terminator method. + + # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) + return 1 def writable (self): "predicate for inclusion in the writable for select()" - # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) - # this is about twice as fast, though not as clear. - return not ( - (self.ac_out_buffer == b'') and - self.producer_fifo.is_empty() and - self.connected - ) + return self.producer_fifo or (not self.connected) def close_when_done (self): "automatically close this channel once the outgoing queue is empty" - self.producer_fifo.push (None) + self.producer_fifo.append(None) - # refill the outgoing buffer by calling the more() method - # of the first producer in the queue - def refill_buffer (self): - while 1: - if len(self.producer_fifo): - p = self.producer_fifo.first() - # a 'None' in the producer fifo is a sentinel, - # telling us to close the channel. - if p is None: - if not self.ac_out_buffer: - self.producer_fifo.pop() - self.close() + def initiate_send(self): + while self.producer_fifo and self.connected: + first = self.producer_fifo[0] + # handle empty string/buffer or None entry + if not first: + del self.producer_fifo[0] + if first is None: + ## print("first is None") + self.handle_close() return - elif isinstance(p, str) or isinstance(p, bytes): - if isinstance(p, str): - p = p.encode('ascii') - self.producer_fifo.pop() - self.ac_out_buffer = self.ac_out_buffer + p - return - data = p.more() - if data: - if isinstance(data, str): - data = data.encode('ascii') - self.ac_out_buffer = self.ac_out_buffer + bytes(data) - return - else: - self.producer_fifo.pop() - else: - return + ## print("first is not None") - def initiate_send (self): - obs = self.ac_out_buffer_size - # try to refill the buffer - if (len (self.ac_out_buffer) < obs): - self.refill_buffer() - - if self.ac_out_buffer and self.connected: - # try to send the buffer + # handle classic producer behavior + obs = self.ac_out_buffer_size try: - num_sent = self.send (self.ac_out_buffer[:obs]) - if num_sent: - self.ac_out_buffer = self.ac_out_buffer[num_sent:] + data = buffer(first, 0, obs) + except TypeError: + data = first.more() + if data: + self.producer_fifo.appendleft(data) + else: + del self.producer_fifo[0] + continue - except socket.error as why: + if isinstance(data, str) and self.use_encoding: + data = bytes(data, self.encoding) + + # send the data + try: + num_sent = self.send(data) + except socket.error: self.handle_error() return + if num_sent: + if num_sent < len(data) or obs < len(first): + self.producer_fifo[0] = first[num_sent:] + else: + del self.producer_fifo[0] + # we tried to send some actual data + return + def discard_buffers (self): # Emergencies only! self.ac_in_buffer = b'' - self.ac_out_buffer = b'' - while self.producer_fifo: - self.producer_fifo.pop() - + del self.incoming[:] + self.producer_fifo.clear() class simple_producer: diff --git a/Lib/asyncore.py b/Lib/asyncore.py index 2ec2e0dce34..e82d24bf4ff 100644 --- a/Lib/asyncore.py +++ b/Lib/asyncore.py @@ -50,23 +50,28 @@ import select import socket import sys import time - import os from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ - ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode + ENOTCONN, ESHUTDOWN, EINTR, EISCONN, EBADF, ECONNABORTED, errorcode try: socket_map except NameError: socket_map = {} +def _strerror(err): + res = os.strerror(err) + if res == 'Unknown error': + res = errorcode[err] + return res + class ExitNow(Exception): pass def read(obj): try: obj.handle_read_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -74,15 +79,15 @@ def read(obj): def write(obj): try: obj.handle_write_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() -def _exception (obj): +def _exception(obj): try: obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -95,7 +100,7 @@ def readwrite(obj, flags): obj.handle_write_event() if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL): obj.handle_expt_event() - except ExitNow: + except (ExitNow, KeyboardInterrupt, SystemExit): raise except: obj.handle_error() @@ -105,7 +110,7 @@ def poll(timeout=0.0, map=None): map = socket_map if map: r = []; w = []; e = [] - for fd, obj in map.items(): + for fd, obj in list(map.items()): is_r = obj.readable() is_w = obj.writable() if is_r: @@ -116,14 +121,15 @@ def poll(timeout=0.0, map=None): e.append(fd) if [] == r == w == e: time.sleep(timeout) - else: - try: - r, w, e = select.select(r, w, e, timeout) - except select.error as err: - if err.args[0] != EINTR: - raise - else: - return + return + + try: + r, w, e = select.select(r, w, e, timeout) + except select.error as err: + if err[0] != EINTR: + raise + else: + return for fd in r: obj = map.get(fd) @@ -152,7 +158,7 @@ def poll2(timeout=0.0, map=None): timeout = int(timeout*1000) pollster = select.poll() if map: - for fd, obj in map.items(): + for fd, obj in list(map.items()): flags = 0 if obj.readable(): flags |= select.POLLIN | select.POLLPRI @@ -166,7 +172,7 @@ def poll2(timeout=0.0, map=None): try: r = pollster.poll(timeout) except select.error as err: - if err.args[0] != EINTR: + if err[0] != EINTR: raise r = [] for fd, flags in r: @@ -209,18 +215,29 @@ class dispatcher: else: self._map = map + self._fileno = None + if sock: + # Set to nonblocking just to make sure for cases where we + # get a socket from a blocking source. + sock.setblocking(0) self.set_socket(sock, map) - # I think it should inherit this anyway - self.socket.setblocking(0) self.connected = True - # XXX Does the constructor require that the socket passed - # be connected? + # The constructor no longer requires that the socket + # passed be connected. try: self.addr = sock.getpeername() - except socket.error: - # The addr isn't crucial - pass + except socket.error as err: + if err[0] == ENOTCONN: + # To handle the case where we got an unconnected + # socket. + self.connected = False + else: + # The socket is broken in some unknown way, alert + # the user and remove it from the map (to prevent + # polling of broken sockets). + self.del_channel(map) + raise else: self.socket = None @@ -254,10 +271,9 @@ class dispatcher: def create_socket(self, family, type): self.family_and_type = family, type - self.socket = socket.socket(family, type) - self.socket.setblocking(0) - self._fileno = self.socket.fileno() - self.add_channel() + sock = socket.socket(family, type) + sock.setblocking(0) + self.set_socket(sock) def set_socket(self, sock, map=None): self.socket = sock @@ -295,7 +311,7 @@ class dispatcher: def listen(self, num): self.accepting = True if os.name == 'nt' and num > 5: - num = 1 + num = 5 return self.socket.listen(num) def bind(self, addr): @@ -310,8 +326,7 @@ class dispatcher: return if err in (0, EISCONN): self.addr = address - self.connected = True - self.handle_connect() + self.handle_connect_event() else: raise socket.error(err, errorcode[err]) @@ -321,7 +336,7 @@ class dispatcher: conn, addr = self.socket.accept() return conn, addr except socket.error as why: - if why.args[0] == EWOULDBLOCK: + if why[0] == EWOULDBLOCK: pass else: raise @@ -331,11 +346,13 @@ class dispatcher: result = self.socket.send(data) return result except socket.error as why: - if why.args[0] == EWOULDBLOCK: + if why[0] == EWOULDBLOCK: + return 0 + elif why[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): + self.handle_close() return 0 else: raise - return 0 def recv(self, buffer_size): try: @@ -349,15 +366,21 @@ class dispatcher: return data except socket.error as why: # winsock sometimes throws ENOTCONN - if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]: + if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED]: self.handle_close() return b'' else: raise def close(self): + self.connected = False + self.accepting = False self.del_channel() - self.socket.close() + try: + self.socket.close() + except socket.error as why: + if why[0] not in (ENOTCONN, EBADF): + raise # cheap inheritance, used to pass all other attribute # references to the underlying socket object. @@ -377,27 +400,53 @@ class dispatcher: def handle_read_event(self): if self.accepting: - # for an accepting socket, getting a read implies - # that we are connected - if not self.connected: - self.connected = True + # accepting sockets are never connected, they "spawn" new + # sockets that are connected self.handle_accept() elif not self.connected: - self.handle_connect() - self.connected = True + self.handle_connect_event() self.handle_read() else: self.handle_read() + def handle_connect_event(self): + self.connected = True + self.handle_connect() + def handle_write_event(self): - # getting a write implies that we are connected + if self.accepting: + # Accepting sockets shouldn't get a write event. + # We will pretend it didn't happen. + return + if not self.connected: - self.handle_connect() - self.connected = True + #check for errors + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + if err != 0: + raise socket.error(err, _strerror(err)) + + self.handle_connect_event() self.handle_write() def handle_expt_event(self): - self.handle_expt() + # if the handle_expt is the same default worthless method, + # we'll not even bother calling it, we'll instead generate + # a useful error + x = True + try: + y1 = self.handle_expt.__func__ + y2 = dispatcher.handle_expt + x = y1 is y2 + except AttributeError: + pass + + if x: + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + msg = _strerror(err) + + raise socket.error(err, msg) + else: + self.handle_expt() def handle_error(self): nil, t, v, tbinfo = compact_traceback() @@ -461,7 +510,6 @@ class dispatcher_with_send(dispatcher): return (not self.connected) or len(self.out_buffer) def send(self, data): - assert isinstance(data, bytes) if self.debug: self.log_info('sending %s' % repr(data)) self.out_buffer = self.out_buffer + data @@ -474,7 +522,8 @@ class dispatcher_with_send(dispatcher): def compact_traceback(): t, v, tb = sys.exc_info() tbinfo = [] - assert tb # Must have a traceback + if not tb: # Must have a traceback + raise AssertionError("traceback does not exist") while tb: tbinfo.append(( tb.tb_frame.f_code.co_filename, @@ -490,11 +539,22 @@ def compact_traceback(): info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) return (file, function, line), t, v, info -def close_all(map=None): +def close_all(map=None, ignore_all=False): if map is None: map = socket_map - for x in map.values(): - x.socket.close() + for x in list(map.values()): + try: + x.close() + except OSError as x: + if x[0] == EBADF: + pass + elif not ignore_all: + raise + except (ExitNow, KeyboardInterrupt, SystemExit): + raise + except: + if not ignore_all: + raise map.clear() # Asynchronous File I/O: @@ -514,11 +574,12 @@ if os.name == 'posix': import fcntl class file_wrapper: - # here we override just enough to make a file + # Here we override just enough to make a file # look like a socket for the purposes of asyncore. + # The passed fd is automatically os.dup()'d def __init__(self, fd): - self.fd = fd + self.fd = os.dup(fd) def recv(self, *args): return os.read(self.fd, *args) @@ -540,6 +601,10 @@ if os.name == 'posix': def __init__(self, fd, map=None): dispatcher.__init__(self, None, map) self.connected = True + try: + fd = fd.fileno() + except AttributeError: + pass self.set_file(fd) # set it to non-blocking mode flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) diff --git a/Lib/smtpd.py b/Lib/smtpd.py index 3197ca9699a..ea59aef87ba 100755 --- a/Lib/smtpd.py +++ b/Lib/smtpd.py @@ -124,11 +124,11 @@ class SMTPChannel(asynchat.async_chat): self.__peer = conn.getpeername() print('Peer:', repr(self.__peer), file=DEBUGSTREAM) self.push('220 %s %s' % (self.__fqdn, __version__)) - self.set_terminator('\r\n') + self.set_terminator(b'\r\n') # Overrides base class for convenience def push(self, msg): - asynchat.async_chat.push(self, msg + '\r\n') + asynchat.async_chat.push(self, bytes(msg + '\r\n', 'ascii')) # Implementation of base class abstract method def collect_incoming_data(self, data): @@ -177,7 +177,7 @@ class SMTPChannel(asynchat.async_chat): self.__rcpttos = [] self.__mailfrom = None self.__state = self.COMMAND - self.set_terminator('\r\n') + self.set_terminator(b'\r\n') if not status: self.push('250 Ok') else: @@ -264,7 +264,7 @@ class SMTPChannel(asynchat.async_chat): self.push('501 Syntax: DATA') return self.__state = self.DATA - self.set_terminator('\r\n.\r\n') + self.set_terminator(b'\r\n.\r\n') self.push('354 End data with .') diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py index 082fde98239..db0b194a9ae 100644 --- a/Lib/test/test_asynchat.py +++ b/Lib/test/test_asynchat.py @@ -105,8 +105,8 @@ class TestAsynchat(unittest.TestCase): time.sleep(0.01) # Give server time to start accepting. c = echo_client(term, s.port) c.push(b"hello ") - c.push(bytes("world%s" % term, "ascii")) - c.push(bytes("I'm not dead yet!%s" % term, "ascii")) + c.push(b"world" + term) + c.push(b"I'm not dead yet!" + term) c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) s.join() @@ -120,17 +120,17 @@ class TestAsynchat(unittest.TestCase): def test_line_terminator1(self): # test one-character terminator for l in (1,2,3): - self.line_terminator_check('\n', l) + self.line_terminator_check(b'\n', l) def test_line_terminator2(self): # test two-character terminator for l in (1,2,3): - self.line_terminator_check('\r\n', l) + self.line_terminator_check(b'\r\n', l) def test_line_terminator3(self): # test three-character terminator for l in (1,2,3): - self.line_terminator_check('qqq', l) + self.line_terminator_check(b'qqq', l) def numeric_terminator_check(self, termlen): # Try reading a fixed number of bytes @@ -190,7 +190,7 @@ class TestAsynchat(unittest.TestCase): # checks that empty lines are handled correctly s, event = start_echo_server() c = echo_client(b'\n', s.port) - c.push("hello world\n\nI'm not dead yet!\n") + c.push(b"hello world\n\nI'm not dead yet!\n") c.push(SERVER_QUIT) asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) s.join() @@ -201,7 +201,7 @@ class TestAsynchat(unittest.TestCase): def test_close_when_done(self): s, event = start_echo_server() c = echo_client(b'\n', s.port) - c.push("hello world\nI'm not dead yet!\n") + c.push(b"hello world\nI'm not dead yet!\n") c.push(SERVER_QUIT) c.close_when_done() asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01) diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py index eb52687866d..716368b84f6 100644 --- a/Lib/test/test_asyncore.py +++ b/Lib/test/test_asyncore.py @@ -28,6 +28,9 @@ class dummychannel: def __init__(self): self.socket = dummysocket() + def close(self): + self.socket.close() + class exitingdummy: def __init__(self): pass @@ -382,8 +385,8 @@ if hasattr(asyncore, 'file_wrapper'): fd = os.open(TESTFN, os.O_RDONLY) w = asyncore.file_wrapper(fd) - self.assertEqual(w.fd, fd) - self.assertEqual(w.fileno(), fd) + self.assertNotEqual(w.fd, fd) + self.assertNotEqual(w.fileno(), fd) self.assertEqual(w.recv(13), b"It's not dead") self.assertEqual(w.read(6), b", it's") w.close() diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py index 3f3ba6b2b70..6a94658a70a 100644 --- a/Lib/test/test_smtplib.py +++ b/Lib/test/test_smtplib.py @@ -14,6 +14,14 @@ from test import support HOST = support.HOST +if sys.platform == 'darwin': + # select.poll returns a select.POLLHUP at the end of the tests + # on darwin, so just ignore it + def handle_expt(self): + pass + smtpd.SMTPChannel.handle_expt = handle_expt + + def server(evt, buf, serv): serv.listen(5) evt.set()