diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 6413ce0de1b..99bea57e66f 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -624,6 +624,8 @@ class _SelectorTransport(transports._FlowControlMixin, self._buffer = self._buffer_factory() self._conn_lost = 0 # Set when call to connection_lost scheduled. self._closing = False # Set when close() called. + self._paused = False # Set when pause_reading() called + if self._server is not None: self._server._attach() loop._transports[self._sock_fd] = self @@ -669,6 +671,25 @@ class _SelectorTransport(transports._FlowControlMixin, def is_closing(self): return self._closing + def is_reading(self): + return not self.is_closing() and not self._paused + + def pause_reading(self): + if not self.is_reading(): + return + self._paused = True + self._loop._remove_reader(self._sock_fd) + if self._loop.get_debug(): + logger.debug("%r pauses reading", self) + + def resume_reading(self): + if self._closing or not self._paused: + return + self._paused = False + self._add_reader(self._sock_fd, self._read_ready) + if self._loop.get_debug(): + logger.debug("%r resumes reading", self) + def close(self): if self._closing: return @@ -728,7 +749,7 @@ class _SelectorTransport(transports._FlowControlMixin, return len(self._buffer) def _add_reader(self, fd, callback, *args): - if self._closing: + if not self.is_reading(): return self._loop._add_reader(fd, callback, *args) @@ -744,7 +765,6 @@ class _SelectorSocketTransport(_SelectorTransport): self._read_ready_cb = None super().__init__(loop, sock, protocol, extra, server) self._eof = False - self._paused = False self._empty_waiter = None # Disable the Nagle algorithm -- small writes will be @@ -769,30 +789,6 @@ class _SelectorSocketTransport(_SelectorTransport): super().set_protocol(protocol) - def is_reading(self): - return not self._paused and not self._closing - - def pause_reading(self): - if not self.is_reading(): - return - self._paused = True - self._loop._remove_reader(self._sock_fd) - if self._loop.get_debug(): - logger.debug("%r pauses reading", self) - - def resume_reading(self): - if self._closing or not self._paused: - return - self._paused = False - self._add_reader(self._sock_fd, self._read_ready) - if self._loop.get_debug(): - logger.debug("%r resumes reading", self) - - def _add_reader(self, fd, callback, *args): - if not self.is_reading(): - return - self._loop._add_reader(fd, callback, *args) - def _read_ready(self): self._read_ready_cb()