diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index b2ac632fa4c..d72f9274a83 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -15,7 +15,8 @@ from . import transports from .log import logger -class _ProactorBasePipeTransport(transports.BaseTransport): +class _ProactorBasePipeTransport(transports._FlowControlMixin, + transports.BaseTransport): """Base class for pipe and socket transports.""" def __init__(self, loop, sock, protocol, waiter=None, @@ -33,8 +34,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport): self._conn_lost = 0 self._closing = False # Set when close() called. self._eof_written = False - self._protocol_paused = False - self.set_write_buffer_limits() if self._server is not None: self._server.attach(self) self._loop.call_soon(self._protocol.connection_made, self) @@ -94,56 +93,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport): server.detach(self) self._server = None - # XXX The next four methods are nearly identical to corresponding - # ones in _SelectorTransport. Maybe refactor buffer management to - # share the implementations? (Also these are really only needed - # by _ProactorWritePipeTransport but since _buffer is defined on - # the base class I am putting it here for now.) - - def _maybe_pause_protocol(self): - size = self.get_write_buffer_size() - if size <= self._high_water: - return - if not self._protocol_paused: - self._protocol_paused = True - try: - self._protocol.pause_writing() - except Exception as exc: - self._loop.call_exception_handler({ - 'message': 'protocol.pause_writing() failed', - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - - def _maybe_resume_protocol(self): - if (self._protocol_paused and - self.get_write_buffer_size() <= self._low_water): - self._protocol_paused = False - try: - self._protocol.resume_writing() - except Exception as exc: - self._loop.call_exception_handler({ - 'message': 'protocol.resume_writing() failed', - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - - def set_write_buffer_limits(self, high=None, low=None): - if high is None: - if low is None: - high = 64*1024 - else: - high = 4*low - if low is None: - low = high // 4 - if not high >= low >= 0: - raise ValueError('high (%r) must be >= low (%r) must be >= 0' % - (high, low)) - self._high_water = high - self._low_water = low - def get_write_buffer_size(self): size = self._pending_write if self._buffer is not None: diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index fb86f8245b1..869d66e0c8f 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -338,77 +338,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop): sock.close() -class _FlowControlMixin(transports.Transport): - """All the logic for (write) flow control in a mix-in base class. - - The subclass must implement get_write_buffer_size(). It must call - _maybe_pause_protocol() whenever the write buffer size increases, - and _maybe_resume_protocol() whenever it decreases. It may also - override set_write_buffer_limits() (e.g. to specify different - defaults). - - The subclass constructor must call super().__init__(extra). This - will call set_write_buffer_limits(). - - The user may call set_write_buffer_limits() and - get_write_buffer_size(), and their protocol's pause_writing() and - resume_writing() may be called. - """ - - def __init__(self, extra=None): - super().__init__(extra) - self._protocol_paused = False - self.set_write_buffer_limits() - - def _maybe_pause_protocol(self): - size = self.get_write_buffer_size() - if size <= self._high_water: - return - if not self._protocol_paused: - self._protocol_paused = True - try: - self._protocol.pause_writing() - except Exception as exc: - self._loop.call_exception_handler({ - 'message': 'protocol.pause_writing() failed', - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - - def _maybe_resume_protocol(self): - if (self._protocol_paused and - self.get_write_buffer_size() <= self._low_water): - self._protocol_paused = False - try: - self._protocol.resume_writing() - except Exception as exc: - self._loop.call_exception_handler({ - 'message': 'protocol.resume_writing() failed', - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - - def set_write_buffer_limits(self, high=None, low=None): - if high is None: - if low is None: - high = 64*1024 - else: - high = 4*low - if low is None: - low = high // 4 - if not high >= low >= 0: - raise ValueError('high (%r) must be >= low (%r) must be >= 0' % - (high, low)) - self._high_water = high - self._low_water = low - - def get_write_buffer_size(self): - raise NotImplementedError - - -class _SelectorTransport(_FlowControlMixin, transports.Transport): +class _SelectorTransport(transports._FlowControlMixin, + transports.Transport): max_size = 256 * 1024 # Buffer size passed to recv(). diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py index 67ae7fda903..5b975aa7315 100644 --- a/Lib/asyncio/transports.py +++ b/Lib/asyncio/transports.py @@ -219,3 +219,73 @@ class SubprocessTransport(BaseTransport): http://docs.python.org/3/library/subprocess#subprocess.Popen.kill """ raise NotImplementedError + + +class _FlowControlMixin(Transport): + """All the logic for (write) flow control in a mix-in base class. + + The subclass must implement get_write_buffer_size(). It must call + _maybe_pause_protocol() whenever the write buffer size increases, + and _maybe_resume_protocol() whenever it decreases. It may also + override set_write_buffer_limits() (e.g. to specify different + defaults). + + The subclass constructor must call super().__init__(extra). This + will call set_write_buffer_limits(). + + The user may call set_write_buffer_limits() and + get_write_buffer_size(), and their protocol's pause_writing() and + resume_writing() may be called. + """ + + def __init__(self, extra=None): + super().__init__(extra) + self._protocol_paused = False + self.set_write_buffer_limits() + + def _maybe_pause_protocol(self): + size = self.get_write_buffer_size() + if size <= self._high_water: + return + if not self._protocol_paused: + self._protocol_paused = True + try: + self._protocol.pause_writing() + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + def _maybe_resume_protocol(self): + if (self._protocol_paused and + self.get_write_buffer_size() <= self._low_water): + self._protocol_paused = False + try: + self._protocol.resume_writing() + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + def set_write_buffer_limits(self, high=None, low=None): + if high is None: + if low is None: + high = 64*1024 + else: + high = 4*low + if low is None: + low = high // 4 + if not high >= low >= 0: + raise ValueError('high (%r) must be >= low (%r) must be >= 0' % + (high, low)) + self._high_water = high + self._low_water = low + + def get_write_buffer_size(self): + raise NotImplementedError diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 9a40c04d1ea..748452c5886 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -317,7 +317,7 @@ class _UnixReadPipeTransport(transports.ReadTransport): self._loop = None -class _UnixWritePipeTransport(selector_events._FlowControlMixin, +class _UnixWritePipeTransport(transports._FlowControlMixin, transports.WriteTransport): def __init__(self, loop, pipe, protocol, waiter=None, extra=None):