Move ReadTransport SelectorSocketTransport override to SelectorTransport parent class

This commit is contained in:
itay azolay 2020-01-12 18:42:34 +02:00
parent c4b3444f9b
commit 9da5fce8e2
1 changed files with 22 additions and 26 deletions

View File

@ -624,6 +624,8 @@ class _SelectorTransport(transports._FlowControlMixin,
self._buffer = self._buffer_factory() self._buffer = self._buffer_factory()
self._conn_lost = 0 # Set when call to connection_lost scheduled. self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called. self._closing = False # Set when close() called.
self._paused = False # Set when pause_reading() called
if self._server is not None: if self._server is not None:
self._server._attach() self._server._attach()
loop._transports[self._sock_fd] = self loop._transports[self._sock_fd] = self
@ -669,6 +671,25 @@ class _SelectorTransport(transports._FlowControlMixin,
def is_closing(self): def is_closing(self):
return self._closing return self._closing
def is_reading(self):
return not self.is_closing() and not self._paused
def pause_reading(self):
if not self.is_reading():
return
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
def resume_reading(self):
if self._closing or not self._paused:
return
self._paused = False
self._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def close(self): def close(self):
if self._closing: if self._closing:
return return
@ -728,7 +749,7 @@ class _SelectorTransport(transports._FlowControlMixin,
return len(self._buffer) return len(self._buffer)
def _add_reader(self, fd, callback, *args): def _add_reader(self, fd, callback, *args):
if self._closing: if not self.is_reading():
return return
self._loop._add_reader(fd, callback, *args) self._loop._add_reader(fd, callback, *args)
@ -744,7 +765,6 @@ class _SelectorSocketTransport(_SelectorTransport):
self._read_ready_cb = None self._read_ready_cb = None
super().__init__(loop, sock, protocol, extra, server) super().__init__(loop, sock, protocol, extra, server)
self._eof = False self._eof = False
self._paused = False
self._empty_waiter = None self._empty_waiter = None
# Disable the Nagle algorithm -- small writes will be # Disable the Nagle algorithm -- small writes will be
@ -769,30 +789,6 @@ class _SelectorSocketTransport(_SelectorTransport):
super().set_protocol(protocol) super().set_protocol(protocol)
def is_reading(self):
return not self._paused and not self._closing
def pause_reading(self):
if not self.is_reading():
return
self._paused = True
self._loop._remove_reader(self._sock_fd)
if self._loop.get_debug():
logger.debug("%r pauses reading", self)
def resume_reading(self):
if self._closing or not self._paused:
return
self._paused = False
self._add_reader(self._sock_fd, self._read_ready)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
def _add_reader(self, fd, callback, *args):
if not self.is_reading():
return
self._loop._add_reader(fd, callback, *args)
def _read_ready(self): def _read_ready(self):
self._read_ready_cb() self._read_ready_cb()