asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin
This commit is contained in:
parent
ff827f08ac
commit
c098241342
|
@ -15,7 +15,8 @@ from . import transports
|
|||
from .log import logger
|
||||
|
||||
|
||||
class _ProactorBasePipeTransport(transports.BaseTransport):
|
||||
class _ProactorBasePipeTransport(transports._FlowControlMixin,
|
||||
transports.BaseTransport):
|
||||
"""Base class for pipe and socket transports."""
|
||||
|
||||
def __init__(self, loop, sock, protocol, waiter=None,
|
||||
|
@ -33,8 +34,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
|
|||
self._conn_lost = 0
|
||||
self._closing = False # Set when close() called.
|
||||
self._eof_written = False
|
||||
self._protocol_paused = False
|
||||
self.set_write_buffer_limits()
|
||||
if self._server is not None:
|
||||
self._server.attach(self)
|
||||
self._loop.call_soon(self._protocol.connection_made, self)
|
||||
|
@ -94,56 +93,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
|
|||
server.detach(self)
|
||||
self._server = None
|
||||
|
||||
# XXX The next four methods are nearly identical to corresponding
|
||||
# ones in _SelectorTransport. Maybe refactor buffer management to
|
||||
# share the implementations? (Also these are really only needed
|
||||
# by _ProactorWritePipeTransport but since _buffer is defined on
|
||||
# the base class I am putting it here for now.)
|
||||
|
||||
def _maybe_pause_protocol(self):
|
||||
size = self.get_write_buffer_size()
|
||||
if size <= self._high_water:
|
||||
return
|
||||
if not self._protocol_paused:
|
||||
self._protocol_paused = True
|
||||
try:
|
||||
self._protocol.pause_writing()
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.pause_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def _maybe_resume_protocol(self):
|
||||
if (self._protocol_paused and
|
||||
self.get_write_buffer_size() <= self._low_water):
|
||||
self._protocol_paused = False
|
||||
try:
|
||||
self._protocol.resume_writing()
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.resume_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def set_write_buffer_limits(self, high=None, low=None):
|
||||
if high is None:
|
||||
if low is None:
|
||||
high = 64*1024
|
||||
else:
|
||||
high = 4*low
|
||||
if low is None:
|
||||
low = high // 4
|
||||
if not high >= low >= 0:
|
||||
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
|
||||
(high, low))
|
||||
self._high_water = high
|
||||
self._low_water = low
|
||||
|
||||
def get_write_buffer_size(self):
|
||||
size = self._pending_write
|
||||
if self._buffer is not None:
|
||||
|
|
|
@ -338,77 +338,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
sock.close()
|
||||
|
||||
|
||||
class _FlowControlMixin(transports.Transport):
|
||||
"""All the logic for (write) flow control in a mix-in base class.
|
||||
|
||||
The subclass must implement get_write_buffer_size(). It must call
|
||||
_maybe_pause_protocol() whenever the write buffer size increases,
|
||||
and _maybe_resume_protocol() whenever it decreases. It may also
|
||||
override set_write_buffer_limits() (e.g. to specify different
|
||||
defaults).
|
||||
|
||||
The subclass constructor must call super().__init__(extra). This
|
||||
will call set_write_buffer_limits().
|
||||
|
||||
The user may call set_write_buffer_limits() and
|
||||
get_write_buffer_size(), and their protocol's pause_writing() and
|
||||
resume_writing() may be called.
|
||||
"""
|
||||
|
||||
def __init__(self, extra=None):
|
||||
super().__init__(extra)
|
||||
self._protocol_paused = False
|
||||
self.set_write_buffer_limits()
|
||||
|
||||
def _maybe_pause_protocol(self):
|
||||
size = self.get_write_buffer_size()
|
||||
if size <= self._high_water:
|
||||
return
|
||||
if not self._protocol_paused:
|
||||
self._protocol_paused = True
|
||||
try:
|
||||
self._protocol.pause_writing()
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.pause_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def _maybe_resume_protocol(self):
|
||||
if (self._protocol_paused and
|
||||
self.get_write_buffer_size() <= self._low_water):
|
||||
self._protocol_paused = False
|
||||
try:
|
||||
self._protocol.resume_writing()
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.resume_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def set_write_buffer_limits(self, high=None, low=None):
|
||||
if high is None:
|
||||
if low is None:
|
||||
high = 64*1024
|
||||
else:
|
||||
high = 4*low
|
||||
if low is None:
|
||||
low = high // 4
|
||||
if not high >= low >= 0:
|
||||
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
|
||||
(high, low))
|
||||
self._high_water = high
|
||||
self._low_water = low
|
||||
|
||||
def get_write_buffer_size(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class _SelectorTransport(_FlowControlMixin, transports.Transport):
|
||||
class _SelectorTransport(transports._FlowControlMixin,
|
||||
transports.Transport):
|
||||
|
||||
max_size = 256 * 1024 # Buffer size passed to recv().
|
||||
|
||||
|
|
|
@ -219,3 +219,73 @@ class SubprocessTransport(BaseTransport):
|
|||
http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class _FlowControlMixin(Transport):
|
||||
"""All the logic for (write) flow control in a mix-in base class.
|
||||
|
||||
The subclass must implement get_write_buffer_size(). It must call
|
||||
_maybe_pause_protocol() whenever the write buffer size increases,
|
||||
and _maybe_resume_protocol() whenever it decreases. It may also
|
||||
override set_write_buffer_limits() (e.g. to specify different
|
||||
defaults).
|
||||
|
||||
The subclass constructor must call super().__init__(extra). This
|
||||
will call set_write_buffer_limits().
|
||||
|
||||
The user may call set_write_buffer_limits() and
|
||||
get_write_buffer_size(), and their protocol's pause_writing() and
|
||||
resume_writing() may be called.
|
||||
"""
|
||||
|
||||
def __init__(self, extra=None):
|
||||
super().__init__(extra)
|
||||
self._protocol_paused = False
|
||||
self.set_write_buffer_limits()
|
||||
|
||||
def _maybe_pause_protocol(self):
|
||||
size = self.get_write_buffer_size()
|
||||
if size <= self._high_water:
|
||||
return
|
||||
if not self._protocol_paused:
|
||||
self._protocol_paused = True
|
||||
try:
|
||||
self._protocol.pause_writing()
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.pause_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def _maybe_resume_protocol(self):
|
||||
if (self._protocol_paused and
|
||||
self.get_write_buffer_size() <= self._low_water):
|
||||
self._protocol_paused = False
|
||||
try:
|
||||
self._protocol.resume_writing()
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.resume_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def set_write_buffer_limits(self, high=None, low=None):
|
||||
if high is None:
|
||||
if low is None:
|
||||
high = 64*1024
|
||||
else:
|
||||
high = 4*low
|
||||
if low is None:
|
||||
low = high // 4
|
||||
if not high >= low >= 0:
|
||||
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
|
||||
(high, low))
|
||||
self._high_water = high
|
||||
self._low_water = low
|
||||
|
||||
def get_write_buffer_size(self):
|
||||
raise NotImplementedError
|
||||
|
|
|
@ -317,7 +317,7 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
self._loop = None
|
||||
|
||||
|
||||
class _UnixWritePipeTransport(selector_events._FlowControlMixin,
|
||||
class _UnixWritePipeTransport(transports._FlowControlMixin,
|
||||
transports.WriteTransport):
|
||||
|
||||
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
|
||||
|
|
Loading…
Reference in New Issue