mirror of https://github.com/python/cpython
bpo-34622: Extract asyncio exceptions into a separate module (GH-9141)
This commit is contained in:
parent
7c7605ff11
commit
0baa72f4b2
|
@ -8,6 +8,7 @@ import sys
|
||||||
from .base_events import *
|
from .base_events import *
|
||||||
from .coroutines import *
|
from .coroutines import *
|
||||||
from .events import *
|
from .events import *
|
||||||
|
from .exceptions import *
|
||||||
from .futures import *
|
from .futures import *
|
||||||
from .locks import *
|
from .locks import *
|
||||||
from .protocols import *
|
from .protocols import *
|
||||||
|
@ -25,6 +26,7 @@ from .tasks import _all_tasks_compat # NoQA
|
||||||
__all__ = (base_events.__all__ +
|
__all__ = (base_events.__all__ +
|
||||||
coroutines.__all__ +
|
coroutines.__all__ +
|
||||||
events.__all__ +
|
events.__all__ +
|
||||||
|
exceptions.__all__ +
|
||||||
futures.__all__ +
|
futures.__all__ +
|
||||||
locks.__all__ +
|
locks.__all__ +
|
||||||
protocols.__all__ +
|
protocols.__all__ +
|
||||||
|
|
|
@ -37,6 +37,7 @@ except ImportError: # pragma: no cover
|
||||||
from . import constants
|
from . import constants
|
||||||
from . import coroutines
|
from . import coroutines
|
||||||
from . import events
|
from . import events
|
||||||
|
from . import exceptions
|
||||||
from . import futures
|
from . import futures
|
||||||
from . import protocols
|
from . import protocols
|
||||||
from . import sslproto
|
from . import sslproto
|
||||||
|
@ -327,7 +328,7 @@ class Server(events.AbstractServer):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._serving_forever_fut
|
await self._serving_forever_fut
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
try:
|
try:
|
||||||
self.close()
|
self.close()
|
||||||
await self.wait_closed()
|
await self.wait_closed()
|
||||||
|
@ -800,7 +801,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
try:
|
try:
|
||||||
return await self._sock_sendfile_native(sock, file,
|
return await self._sock_sendfile_native(sock, file,
|
||||||
offset, count)
|
offset, count)
|
||||||
except events.SendfileNotAvailableError as exc:
|
except exceptions.SendfileNotAvailableError as exc:
|
||||||
if not fallback:
|
if not fallback:
|
||||||
raise
|
raise
|
||||||
return await self._sock_sendfile_fallback(sock, file,
|
return await self._sock_sendfile_fallback(sock, file,
|
||||||
|
@ -809,7 +810,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
async def _sock_sendfile_native(self, sock, file, offset, count):
|
async def _sock_sendfile_native(self, sock, file, offset, count):
|
||||||
# NB: sendfile syscall is not supported for SSL sockets and
|
# NB: sendfile syscall is not supported for SSL sockets and
|
||||||
# non-mmap files even if sendfile is supported by OS
|
# non-mmap files even if sendfile is supported by OS
|
||||||
raise events.SendfileNotAvailableError(
|
raise exceptions.SendfileNotAvailableError(
|
||||||
f"syscall sendfile is not available for socket {sock!r} "
|
f"syscall sendfile is not available for socket {sock!r} "
|
||||||
"and file {file!r} combination")
|
"and file {file!r} combination")
|
||||||
|
|
||||||
|
@ -1053,7 +1054,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
try:
|
try:
|
||||||
return await self._sendfile_native(transport, file,
|
return await self._sendfile_native(transport, file,
|
||||||
offset, count)
|
offset, count)
|
||||||
except events.SendfileNotAvailableError as exc:
|
except exceptions.SendfileNotAvailableError as exc:
|
||||||
if not fallback:
|
if not fallback:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -1066,7 +1067,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
||||||
offset, count)
|
offset, count)
|
||||||
|
|
||||||
async def _sendfile_native(self, transp, file, offset, count):
|
async def _sendfile_native(self, transp, file, offset, count):
|
||||||
raise events.SendfileNotAvailableError(
|
raise exceptions.SendfileNotAvailableError(
|
||||||
"sendfile syscall is not supported")
|
"sendfile syscall is not supported")
|
||||||
|
|
||||||
async def _sendfile_fallback(self, transp, file, offset, count):
|
async def _sendfile_fallback(self, transp, file, offset, count):
|
||||||
|
|
|
@ -1,15 +1,9 @@
|
||||||
__all__ = ()
|
__all__ = ()
|
||||||
|
|
||||||
import concurrent.futures
|
|
||||||
import reprlib
|
import reprlib
|
||||||
|
|
||||||
from . import format_helpers
|
from . import format_helpers
|
||||||
|
|
||||||
CancelledError = concurrent.futures.CancelledError
|
|
||||||
TimeoutError = concurrent.futures.TimeoutError
|
|
||||||
InvalidStateError = concurrent.futures.InvalidStateError
|
|
||||||
|
|
||||||
|
|
||||||
# States for Future.
|
# States for Future.
|
||||||
_PENDING = 'PENDING'
|
_PENDING = 'PENDING'
|
||||||
_CANCELLED = 'CANCELLED'
|
_CANCELLED = 'CANCELLED'
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
__all__ = (
|
__all__ = (
|
||||||
'AbstractEventLoopPolicy',
|
'AbstractEventLoopPolicy',
|
||||||
'AbstractEventLoop', 'AbstractServer',
|
'AbstractEventLoop', 'AbstractServer',
|
||||||
'Handle', 'TimerHandle', 'SendfileNotAvailableError',
|
'Handle', 'TimerHandle',
|
||||||
'get_event_loop_policy', 'set_event_loop_policy',
|
'get_event_loop_policy', 'set_event_loop_policy',
|
||||||
'get_event_loop', 'set_event_loop', 'new_event_loop',
|
'get_event_loop', 'set_event_loop', 'new_event_loop',
|
||||||
'get_child_watcher', 'set_child_watcher',
|
'get_child_watcher', 'set_child_watcher',
|
||||||
|
@ -19,14 +19,7 @@ import sys
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
from . import format_helpers
|
from . import format_helpers
|
||||||
|
from . import exceptions
|
||||||
|
|
||||||
class SendfileNotAvailableError(RuntimeError):
|
|
||||||
"""Sendfile syscall is not available.
|
|
||||||
|
|
||||||
Raised if OS does not support sendfile syscall for given socket or
|
|
||||||
file type.
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class Handle:
|
class Handle:
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
"""asyncio exceptions."""
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ('CancelledError', 'InvalidStateError', 'TimeoutError',
|
||||||
|
'IncompleteReadError', 'LimitOverrunError',
|
||||||
|
'SendfileNotAvailableError')
|
||||||
|
|
||||||
|
import concurrent.futures
|
||||||
|
from . import base_futures
|
||||||
|
|
||||||
|
|
||||||
|
class CancelledError(concurrent.futures.CancelledError):
|
||||||
|
"""The Future or Task was cancelled."""
|
||||||
|
|
||||||
|
|
||||||
|
class TimeoutError(concurrent.futures.TimeoutError):
|
||||||
|
"""The operation exceeded the given deadline."""
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidStateError(concurrent.futures.InvalidStateError):
|
||||||
|
"""The operation is not allowed in this state."""
|
||||||
|
|
||||||
|
|
||||||
|
class SendfileNotAvailableError(RuntimeError):
|
||||||
|
"""Sendfile syscall is not available.
|
||||||
|
|
||||||
|
Raised if OS does not support sendfile syscall for given socket or
|
||||||
|
file type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class IncompleteReadError(EOFError):
|
||||||
|
"""
|
||||||
|
Incomplete read error. Attributes:
|
||||||
|
|
||||||
|
- partial: read bytes string before the end of stream was reached
|
||||||
|
- expected: total number of expected bytes (or None if unknown)
|
||||||
|
"""
|
||||||
|
def __init__(self, partial, expected):
|
||||||
|
super().__init__(f'{len(partial)} bytes read on a total of '
|
||||||
|
f'{expected!r} expected bytes')
|
||||||
|
self.partial = partial
|
||||||
|
self.expected = expected
|
||||||
|
|
||||||
|
def __reduce__(self):
|
||||||
|
return type(self), (self.partial, self.expected)
|
||||||
|
|
||||||
|
|
||||||
|
class LimitOverrunError(Exception):
|
||||||
|
"""Reached the buffer limit while looking for a separator.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
- consumed: total number of to be consumed bytes.
|
||||||
|
"""
|
||||||
|
def __init__(self, message, consumed):
|
||||||
|
super().__init__(message)
|
||||||
|
self.consumed = consumed
|
||||||
|
|
||||||
|
def __reduce__(self):
|
||||||
|
return type(self), (self.args[0], self.consumed)
|
|
@ -1,7 +1,6 @@
|
||||||
"""A Future class similar to the one in PEP 3148."""
|
"""A Future class similar to the one in PEP 3148."""
|
||||||
|
|
||||||
__all__ = (
|
__all__ = (
|
||||||
'CancelledError', 'TimeoutError', 'InvalidStateError',
|
|
||||||
'Future', 'wrap_future', 'isfuture',
|
'Future', 'wrap_future', 'isfuture',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,12 +11,10 @@ import sys
|
||||||
|
|
||||||
from . import base_futures
|
from . import base_futures
|
||||||
from . import events
|
from . import events
|
||||||
|
from . import exceptions
|
||||||
from . import format_helpers
|
from . import format_helpers
|
||||||
|
|
||||||
|
|
||||||
CancelledError = base_futures.CancelledError
|
|
||||||
InvalidStateError = base_futures.InvalidStateError
|
|
||||||
TimeoutError = base_futures.TimeoutError
|
|
||||||
isfuture = base_futures.isfuture
|
isfuture = base_futures.isfuture
|
||||||
|
|
||||||
|
|
||||||
|
@ -170,9 +167,9 @@ class Future:
|
||||||
the future is done and has an exception set, this exception is raised.
|
the future is done and has an exception set, this exception is raised.
|
||||||
"""
|
"""
|
||||||
if self._state == _CANCELLED:
|
if self._state == _CANCELLED:
|
||||||
raise CancelledError
|
raise exceptions.CancelledError
|
||||||
if self._state != _FINISHED:
|
if self._state != _FINISHED:
|
||||||
raise InvalidStateError('Result is not ready.')
|
raise exceptions.InvalidStateError('Result is not ready.')
|
||||||
self.__log_traceback = False
|
self.__log_traceback = False
|
||||||
if self._exception is not None:
|
if self._exception is not None:
|
||||||
raise self._exception
|
raise self._exception
|
||||||
|
@ -187,9 +184,9 @@ class Future:
|
||||||
InvalidStateError.
|
InvalidStateError.
|
||||||
"""
|
"""
|
||||||
if self._state == _CANCELLED:
|
if self._state == _CANCELLED:
|
||||||
raise CancelledError
|
raise exceptions.CancelledError
|
||||||
if self._state != _FINISHED:
|
if self._state != _FINISHED:
|
||||||
raise InvalidStateError('Exception is not set.')
|
raise exceptions.InvalidStateError('Exception is not set.')
|
||||||
self.__log_traceback = False
|
self.__log_traceback = False
|
||||||
return self._exception
|
return self._exception
|
||||||
|
|
||||||
|
@ -231,7 +228,7 @@ class Future:
|
||||||
InvalidStateError.
|
InvalidStateError.
|
||||||
"""
|
"""
|
||||||
if self._state != _PENDING:
|
if self._state != _PENDING:
|
||||||
raise InvalidStateError('{}: {!r}'.format(self._state, self))
|
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
|
||||||
self._result = result
|
self._result = result
|
||||||
self._state = _FINISHED
|
self._state = _FINISHED
|
||||||
self.__schedule_callbacks()
|
self.__schedule_callbacks()
|
||||||
|
@ -243,7 +240,7 @@ class Future:
|
||||||
InvalidStateError.
|
InvalidStateError.
|
||||||
"""
|
"""
|
||||||
if self._state != _PENDING:
|
if self._state != _PENDING:
|
||||||
raise InvalidStateError('{}: {!r}'.format(self._state, self))
|
raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
|
||||||
if isinstance(exception, type):
|
if isinstance(exception, type):
|
||||||
exception = exception()
|
exception = exception()
|
||||||
if type(exception) is StopIteration:
|
if type(exception) is StopIteration:
|
||||||
|
@ -288,6 +285,18 @@ def _set_result_unless_cancelled(fut, result):
|
||||||
fut.set_result(result)
|
fut.set_result(result)
|
||||||
|
|
||||||
|
|
||||||
|
def _convert_future_exc(exc):
|
||||||
|
exc_class = type(exc)
|
||||||
|
if exc_class is concurrent.futures.CancelledError:
|
||||||
|
return exceptions.CancelledError(*exc.args)
|
||||||
|
elif exc_class is concurrent.futures.TimeoutError:
|
||||||
|
return exceptions.TimeoutError(*exc.args)
|
||||||
|
elif exc_class is concurrent.futures.InvalidStateError:
|
||||||
|
return exceptions.InvalidStateError(*exc.args)
|
||||||
|
else:
|
||||||
|
return exc
|
||||||
|
|
||||||
|
|
||||||
def _set_concurrent_future_state(concurrent, source):
|
def _set_concurrent_future_state(concurrent, source):
|
||||||
"""Copy state from a future to a concurrent.futures.Future."""
|
"""Copy state from a future to a concurrent.futures.Future."""
|
||||||
assert source.done()
|
assert source.done()
|
||||||
|
@ -297,7 +306,7 @@ def _set_concurrent_future_state(concurrent, source):
|
||||||
return
|
return
|
||||||
exception = source.exception()
|
exception = source.exception()
|
||||||
if exception is not None:
|
if exception is not None:
|
||||||
concurrent.set_exception(exception)
|
concurrent.set_exception(_convert_future_exc(exception))
|
||||||
else:
|
else:
|
||||||
result = source.result()
|
result = source.result()
|
||||||
concurrent.set_result(result)
|
concurrent.set_result(result)
|
||||||
|
@ -317,7 +326,7 @@ def _copy_future_state(source, dest):
|
||||||
else:
|
else:
|
||||||
exception = source.exception()
|
exception = source.exception()
|
||||||
if exception is not None:
|
if exception is not None:
|
||||||
dest.set_exception(exception)
|
dest.set_exception(_convert_future_exc(exception))
|
||||||
else:
|
else:
|
||||||
result = source.result()
|
result = source.result()
|
||||||
dest.set_result(result)
|
dest.set_result(result)
|
||||||
|
|
|
@ -7,6 +7,7 @@ import warnings
|
||||||
|
|
||||||
from . import events
|
from . import events
|
||||||
from . import futures
|
from . import futures
|
||||||
|
from . import exceptions
|
||||||
from .coroutines import coroutine
|
from .coroutines import coroutine
|
||||||
|
|
||||||
|
|
||||||
|
@ -192,7 +193,7 @@ class Lock(_ContextManagerMixin):
|
||||||
await fut
|
await fut
|
||||||
finally:
|
finally:
|
||||||
self._waiters.remove(fut)
|
self._waiters.remove(fut)
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
if not self._locked:
|
if not self._locked:
|
||||||
self._wake_up_first()
|
self._wake_up_first()
|
||||||
raise
|
raise
|
||||||
|
@ -363,11 +364,11 @@ class Condition(_ContextManagerMixin):
|
||||||
try:
|
try:
|
||||||
await self.acquire()
|
await self.acquire()
|
||||||
break
|
break
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
cancelled = True
|
cancelled = True
|
||||||
|
|
||||||
if cancelled:
|
if cancelled:
|
||||||
raise futures.CancelledError
|
raise exceptions.CancelledError
|
||||||
|
|
||||||
async def wait_for(self, predicate):
|
async def wait_for(self, predicate):
|
||||||
"""Wait until a predicate becomes true.
|
"""Wait until a predicate becomes true.
|
||||||
|
|
|
@ -15,6 +15,7 @@ from . import base_events
|
||||||
from . import constants
|
from . import constants
|
||||||
from . import events
|
from . import events
|
||||||
from . import futures
|
from . import futures
|
||||||
|
from . import exceptions
|
||||||
from . import protocols
|
from . import protocols
|
||||||
from . import sslproto
|
from . import sslproto
|
||||||
from . import transports
|
from . import transports
|
||||||
|
@ -282,7 +283,7 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
|
||||||
self._force_close(exc)
|
self._force_close(exc)
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
self._fatal_error(exc, 'Fatal read error on pipe transport')
|
self._fatal_error(exc, 'Fatal read error on pipe transport')
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
if not self._closing:
|
if not self._closing:
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
|
@ -555,11 +556,11 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
||||||
try:
|
try:
|
||||||
fileno = file.fileno()
|
fileno = file.fileno()
|
||||||
except (AttributeError, io.UnsupportedOperation) as err:
|
except (AttributeError, io.UnsupportedOperation) as err:
|
||||||
raise events.SendfileNotAvailableError("not a regular file")
|
raise exceptions.SendfileNotAvailableError("not a regular file")
|
||||||
try:
|
try:
|
||||||
fsize = os.fstat(fileno).st_size
|
fsize = os.fstat(fileno).st_size
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
raise events.SendfileNotAvailableError("not a regular file")
|
raise exceptions.SendfileNotAvailableError("not a regular file")
|
||||||
blocksize = count if count else fsize
|
blocksize = count if count else fsize
|
||||||
if not blocksize:
|
if not blocksize:
|
||||||
return 0 # empty file
|
return 0 # empty file
|
||||||
|
@ -615,7 +616,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
||||||
if f is not None:
|
if f is not None:
|
||||||
f.result() # may raise
|
f.result() # may raise
|
||||||
f = self._proactor.recv(self._ssock, 4096)
|
f = self._proactor.recv(self._ssock, 4096)
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
# _close_self_pipe() has been called, stop waiting for data
|
# _close_self_pipe() has been called, stop waiting for data
|
||||||
return
|
return
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
@ -666,7 +667,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
||||||
elif self._debug:
|
elif self._debug:
|
||||||
logger.debug("Accept failed on socket %r",
|
logger.debug("Accept failed on socket %r",
|
||||||
sock, exc_info=True)
|
sock, exc_info=True)
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
sock.close()
|
sock.close()
|
||||||
else:
|
else:
|
||||||
self._accept_futures[sock.fileno()] = f
|
self._accept_futures[sock.fileno()] = f
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
__all__ = (
|
__all__ = (
|
||||||
'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
|
'StreamReader', 'StreamWriter', 'StreamReaderProtocol',
|
||||||
'open_connection', 'start_server',
|
'open_connection', 'start_server')
|
||||||
'IncompleteReadError', 'LimitOverrunError',
|
|
||||||
)
|
|
||||||
|
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
|
@ -11,6 +9,7 @@ if hasattr(socket, 'AF_UNIX'):
|
||||||
|
|
||||||
from . import coroutines
|
from . import coroutines
|
||||||
from . import events
|
from . import events
|
||||||
|
from . import exceptions
|
||||||
from . import protocols
|
from . import protocols
|
||||||
from .log import logger
|
from .log import logger
|
||||||
from .tasks import sleep
|
from .tasks import sleep
|
||||||
|
@ -19,37 +18,6 @@ from .tasks import sleep
|
||||||
_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
|
_DEFAULT_LIMIT = 2 ** 16 # 64 KiB
|
||||||
|
|
||||||
|
|
||||||
class IncompleteReadError(EOFError):
|
|
||||||
"""
|
|
||||||
Incomplete read error. Attributes:
|
|
||||||
|
|
||||||
- partial: read bytes string before the end of stream was reached
|
|
||||||
- expected: total number of expected bytes (or None if unknown)
|
|
||||||
"""
|
|
||||||
def __init__(self, partial, expected):
|
|
||||||
super().__init__(f'{len(partial)} bytes read on a total of '
|
|
||||||
f'{expected!r} expected bytes')
|
|
||||||
self.partial = partial
|
|
||||||
self.expected = expected
|
|
||||||
|
|
||||||
def __reduce__(self):
|
|
||||||
return type(self), (self.partial, self.expected)
|
|
||||||
|
|
||||||
|
|
||||||
class LimitOverrunError(Exception):
|
|
||||||
"""Reached the buffer limit while looking for a separator.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
- consumed: total number of to be consumed bytes.
|
|
||||||
"""
|
|
||||||
def __init__(self, message, consumed):
|
|
||||||
super().__init__(message)
|
|
||||||
self.consumed = consumed
|
|
||||||
|
|
||||||
def __reduce__(self):
|
|
||||||
return type(self), (self.args[0], self.consumed)
|
|
||||||
|
|
||||||
|
|
||||||
async def open_connection(host=None, port=None, *,
|
async def open_connection(host=None, port=None, *,
|
||||||
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
loop=None, limit=_DEFAULT_LIMIT, **kwds):
|
||||||
"""A wrapper for create_connection() returning a (reader, writer) pair.
|
"""A wrapper for create_connection() returning a (reader, writer) pair.
|
||||||
|
@ -494,9 +462,9 @@ class StreamReader:
|
||||||
seplen = len(sep)
|
seplen = len(sep)
|
||||||
try:
|
try:
|
||||||
line = await self.readuntil(sep)
|
line = await self.readuntil(sep)
|
||||||
except IncompleteReadError as e:
|
except exceptions.IncompleteReadError as e:
|
||||||
return e.partial
|
return e.partial
|
||||||
except LimitOverrunError as e:
|
except exceptions.LimitOverrunError as e:
|
||||||
if self._buffer.startswith(sep, e.consumed):
|
if self._buffer.startswith(sep, e.consumed):
|
||||||
del self._buffer[:e.consumed + seplen]
|
del self._buffer[:e.consumed + seplen]
|
||||||
else:
|
else:
|
||||||
|
@ -571,7 +539,7 @@ class StreamReader:
|
||||||
# see upper comment for explanation.
|
# see upper comment for explanation.
|
||||||
offset = buflen + 1 - seplen
|
offset = buflen + 1 - seplen
|
||||||
if offset > self._limit:
|
if offset > self._limit:
|
||||||
raise LimitOverrunError(
|
raise exceptions.LimitOverrunError(
|
||||||
'Separator is not found, and chunk exceed the limit',
|
'Separator is not found, and chunk exceed the limit',
|
||||||
offset)
|
offset)
|
||||||
|
|
||||||
|
@ -582,13 +550,13 @@ class StreamReader:
|
||||||
if self._eof:
|
if self._eof:
|
||||||
chunk = bytes(self._buffer)
|
chunk = bytes(self._buffer)
|
||||||
self._buffer.clear()
|
self._buffer.clear()
|
||||||
raise IncompleteReadError(chunk, None)
|
raise exceptions.IncompleteReadError(chunk, None)
|
||||||
|
|
||||||
# _wait_for_data() will resume reading if stream was paused.
|
# _wait_for_data() will resume reading if stream was paused.
|
||||||
await self._wait_for_data('readuntil')
|
await self._wait_for_data('readuntil')
|
||||||
|
|
||||||
if isep > self._limit:
|
if isep > self._limit:
|
||||||
raise LimitOverrunError(
|
raise exceptions.LimitOverrunError(
|
||||||
'Separator is found, but chunk is longer than limit', isep)
|
'Separator is found, but chunk is longer than limit', isep)
|
||||||
|
|
||||||
chunk = self._buffer[:isep + seplen]
|
chunk = self._buffer[:isep + seplen]
|
||||||
|
@ -674,7 +642,7 @@ class StreamReader:
|
||||||
if self._eof:
|
if self._eof:
|
||||||
incomplete = bytes(self._buffer)
|
incomplete = bytes(self._buffer)
|
||||||
self._buffer.clear()
|
self._buffer.clear()
|
||||||
raise IncompleteReadError(incomplete, n)
|
raise exceptions.IncompleteReadError(incomplete, n)
|
||||||
|
|
||||||
await self._wait_for_data('readexactly')
|
await self._wait_for_data('readexactly')
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,7 @@ import weakref
|
||||||
from . import base_tasks
|
from . import base_tasks
|
||||||
from . import coroutines
|
from . import coroutines
|
||||||
from . import events
|
from . import events
|
||||||
|
from . import exceptions
|
||||||
from . import futures
|
from . import futures
|
||||||
from .coroutines import coroutine
|
from .coroutines import coroutine
|
||||||
|
|
||||||
|
@ -228,11 +229,11 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
|
||||||
|
|
||||||
def __step(self, exc=None):
|
def __step(self, exc=None):
|
||||||
if self.done():
|
if self.done():
|
||||||
raise futures.InvalidStateError(
|
raise exceptions.InvalidStateError(
|
||||||
f'_step(): already done: {self!r}, {exc!r}')
|
f'_step(): already done: {self!r}, {exc!r}')
|
||||||
if self._must_cancel:
|
if self._must_cancel:
|
||||||
if not isinstance(exc, futures.CancelledError):
|
if not isinstance(exc, exceptions.CancelledError):
|
||||||
exc = futures.CancelledError()
|
exc = exceptions.CancelledError()
|
||||||
self._must_cancel = False
|
self._must_cancel = False
|
||||||
coro = self._coro
|
coro = self._coro
|
||||||
self._fut_waiter = None
|
self._fut_waiter = None
|
||||||
|
@ -250,10 +251,10 @@ class Task(futures._PyFuture): # Inherit Python Task implementation
|
||||||
if self._must_cancel:
|
if self._must_cancel:
|
||||||
# Task is cancelled right before coro stops.
|
# Task is cancelled right before coro stops.
|
||||||
self._must_cancel = False
|
self._must_cancel = False
|
||||||
super().set_exception(futures.CancelledError())
|
super().set_exception(exceptions.CancelledError())
|
||||||
else:
|
else:
|
||||||
super().set_result(exc.value)
|
super().set_result(exc.value)
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
super().cancel() # I.e., Future.cancel(self).
|
super().cancel() # I.e., Future.cancel(self).
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
super().set_exception(exc)
|
super().set_exception(exc)
|
||||||
|
@ -419,7 +420,7 @@ async def wait_for(fut, timeout, *, loop=None):
|
||||||
return fut.result()
|
return fut.result()
|
||||||
|
|
||||||
fut.cancel()
|
fut.cancel()
|
||||||
raise futures.TimeoutError()
|
raise exceptions.TimeoutError()
|
||||||
|
|
||||||
waiter = loop.create_future()
|
waiter = loop.create_future()
|
||||||
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
|
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
|
||||||
|
@ -432,7 +433,7 @@ async def wait_for(fut, timeout, *, loop=None):
|
||||||
# wait until the future completes or the timeout
|
# wait until the future completes or the timeout
|
||||||
try:
|
try:
|
||||||
await waiter
|
await waiter
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
fut.remove_done_callback(cb)
|
fut.remove_done_callback(cb)
|
||||||
fut.cancel()
|
fut.cancel()
|
||||||
raise
|
raise
|
||||||
|
@ -445,7 +446,7 @@ async def wait_for(fut, timeout, *, loop=None):
|
||||||
# after wait_for() returns.
|
# after wait_for() returns.
|
||||||
# See https://bugs.python.org/issue32751
|
# See https://bugs.python.org/issue32751
|
||||||
await _cancel_and_wait(fut, loop=loop)
|
await _cancel_and_wait(fut, loop=loop)
|
||||||
raise futures.TimeoutError()
|
raise exceptions.TimeoutError()
|
||||||
finally:
|
finally:
|
||||||
timeout_handle.cancel()
|
timeout_handle.cancel()
|
||||||
|
|
||||||
|
@ -554,7 +555,7 @@ def as_completed(fs, *, loop=None, timeout=None):
|
||||||
f = await done.get()
|
f = await done.get()
|
||||||
if f is None:
|
if f is None:
|
||||||
# Dummy value from _on_timeout().
|
# Dummy value from _on_timeout().
|
||||||
raise futures.TimeoutError
|
raise exceptions.TimeoutError
|
||||||
return f.result() # May raise f.exception().
|
return f.result() # May raise f.exception().
|
||||||
|
|
||||||
for f in todo:
|
for f in todo:
|
||||||
|
@ -701,7 +702,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
|
||||||
# Check if 'fut' is cancelled first, as
|
# Check if 'fut' is cancelled first, as
|
||||||
# 'fut.exception()' will *raise* a CancelledError
|
# 'fut.exception()' will *raise* a CancelledError
|
||||||
# instead of returning it.
|
# instead of returning it.
|
||||||
exc = futures.CancelledError()
|
exc = exceptions.CancelledError()
|
||||||
outer.set_exception(exc)
|
outer.set_exception(exc)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
|
@ -720,7 +721,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
|
||||||
# Check if 'fut' is cancelled first, as
|
# Check if 'fut' is cancelled first, as
|
||||||
# 'fut.exception()' will *raise* a CancelledError
|
# 'fut.exception()' will *raise* a CancelledError
|
||||||
# instead of returning it.
|
# instead of returning it.
|
||||||
res = futures.CancelledError()
|
res = exceptions.CancelledError()
|
||||||
else:
|
else:
|
||||||
res = fut.exception()
|
res = fut.exception()
|
||||||
if res is None:
|
if res is None:
|
||||||
|
@ -731,7 +732,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
|
||||||
# If gather is being cancelled we must propagate the
|
# If gather is being cancelled we must propagate the
|
||||||
# cancellation regardless of *return_exceptions* argument.
|
# cancellation regardless of *return_exceptions* argument.
|
||||||
# See issue 32684.
|
# See issue 32684.
|
||||||
outer.set_exception(futures.CancelledError())
|
outer.set_exception(exceptions.CancelledError())
|
||||||
else:
|
else:
|
||||||
outer.set_result(results)
|
outer.set_result(results)
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ from . import base_subprocess
|
||||||
from . import constants
|
from . import constants
|
||||||
from . import coroutines
|
from . import coroutines
|
||||||
from . import events
|
from . import events
|
||||||
|
from . import exceptions
|
||||||
from . import futures
|
from . import futures
|
||||||
from . import selector_events
|
from . import selector_events
|
||||||
from . import tasks
|
from . import tasks
|
||||||
|
@ -319,16 +320,16 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
||||||
try:
|
try:
|
||||||
os.sendfile
|
os.sendfile
|
||||||
except AttributeError as exc:
|
except AttributeError as exc:
|
||||||
raise events.SendfileNotAvailableError(
|
raise exceptions.SendfileNotAvailableError(
|
||||||
"os.sendfile() is not available")
|
"os.sendfile() is not available")
|
||||||
try:
|
try:
|
||||||
fileno = file.fileno()
|
fileno = file.fileno()
|
||||||
except (AttributeError, io.UnsupportedOperation) as err:
|
except (AttributeError, io.UnsupportedOperation) as err:
|
||||||
raise events.SendfileNotAvailableError("not a regular file")
|
raise exceptions.SendfileNotAvailableError("not a regular file")
|
||||||
try:
|
try:
|
||||||
fsize = os.fstat(fileno).st_size
|
fsize = os.fstat(fileno).st_size
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
raise events.SendfileNotAvailableError("not a regular file")
|
raise exceptions.SendfileNotAvailableError("not a regular file")
|
||||||
blocksize = count if count else fsize
|
blocksize = count if count else fsize
|
||||||
if not blocksize:
|
if not blocksize:
|
||||||
return 0 # empty file
|
return 0 # empty file
|
||||||
|
@ -382,7 +383,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
||||||
# one being 'file' is not a regular mmap(2)-like
|
# one being 'file' is not a regular mmap(2)-like
|
||||||
# file, in which case we'll fall back on using
|
# file, in which case we'll fall back on using
|
||||||
# plain send().
|
# plain send().
|
||||||
err = events.SendfileNotAvailableError(
|
err = exceptions.SendfileNotAvailableError(
|
||||||
"os.sendfile call failed")
|
"os.sendfile call failed")
|
||||||
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
|
self._sock_sendfile_update_filepos(fileno, offset, total_sent)
|
||||||
fut.set_exception(err)
|
fut.set_exception(err)
|
||||||
|
|
|
@ -12,6 +12,7 @@ import weakref
|
||||||
from . import events
|
from . import events
|
||||||
from . import base_subprocess
|
from . import base_subprocess
|
||||||
from . import futures
|
from . import futures
|
||||||
|
from . import exceptions
|
||||||
from . import proactor_events
|
from . import proactor_events
|
||||||
from . import selector_events
|
from . import selector_events
|
||||||
from . import tasks
|
from . import tasks
|
||||||
|
@ -351,7 +352,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
||||||
elif self._debug:
|
elif self._debug:
|
||||||
logger.warning("Accept pipe failed on pipe %r",
|
logger.warning("Accept pipe failed on pipe %r",
|
||||||
pipe, exc_info=True)
|
pipe, exc_info=True)
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
if pipe:
|
if pipe:
|
||||||
pipe.close()
|
pipe.close()
|
||||||
else:
|
else:
|
||||||
|
@ -497,7 +498,7 @@ class IocpProactor:
|
||||||
# Coroutine closing the accept socket if the future is cancelled
|
# Coroutine closing the accept socket if the future is cancelled
|
||||||
try:
|
try:
|
||||||
await future
|
await future
|
||||||
except futures.CancelledError:
|
except exceptions.CancelledError:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
|
@ -1946,7 +1946,7 @@ class BaseLoopSockSendfileTests(test_utils.TestCase):
|
||||||
def test__sock_sendfile_native_failure(self):
|
def test__sock_sendfile_native_failure(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
|
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"sendfile is not available"):
|
"sendfile is not available"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
|
self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -1957,7 +1957,7 @@ class BaseLoopSockSendfileTests(test_utils.TestCase):
|
||||||
def test_sock_sendfile_no_fallback(self):
|
def test_sock_sendfile_no_fallback(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
|
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"sendfile is not available"):
|
"sendfile is not available"):
|
||||||
self.run_loop(self.loop.sock_sendfile(sock, self.file,
|
self.run_loop(self.loop.sock_sendfile(sock, self.file,
|
||||||
fallback=False))
|
fallback=False))
|
||||||
|
|
|
@ -2393,7 +2393,7 @@ class SendfileMixin(SendfileBase):
|
||||||
|
|
||||||
self.loop._sendfile_native = sendfile_native
|
self.loop._sendfile_native = sendfile_native
|
||||||
|
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not supported"):
|
"not supported"):
|
||||||
self.run_loop(
|
self.run_loop(
|
||||||
self.loop.sendfile(cli_proto.transport, self.file,
|
self.loop.sendfile(cli_proto.transport, self.file,
|
||||||
|
|
|
@ -951,7 +951,7 @@ class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
def test_sock_sendfile_not_a_file(self):
|
def test_sock_sendfile_not_a_file(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
f = object()
|
f = object()
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not a regular file"):
|
"not a regular file"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -960,7 +960,7 @@ class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
def test_sock_sendfile_iobuffer(self):
|
def test_sock_sendfile_iobuffer(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
f = io.BytesIO()
|
f = io.BytesIO()
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not a regular file"):
|
"not a regular file"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -970,7 +970,7 @@ class ProactorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
f = mock.Mock()
|
f = mock.Mock()
|
||||||
f.fileno.return_value = -1
|
f.fileno.return_value = -1
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not a regular file"):
|
"not a regular file"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
||||||
0, None))
|
0, None))
|
||||||
|
|
|
@ -521,7 +521,7 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
def test_sock_sendfile_not_available(self):
|
def test_sock_sendfile_not_available(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
with mock.patch('asyncio.unix_events.os', spec=[]):
|
with mock.patch('asyncio.unix_events.os', spec=[]):
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"os[.]sendfile[(][)] is not available"):
|
"os[.]sendfile[(][)] is not available"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
|
self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -530,7 +530,7 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
def test_sock_sendfile_not_a_file(self):
|
def test_sock_sendfile_not_a_file(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
f = object()
|
f = object()
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not a regular file"):
|
"not a regular file"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -539,7 +539,7 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
def test_sock_sendfile_iobuffer(self):
|
def test_sock_sendfile_iobuffer(self):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
f = io.BytesIO()
|
f = io.BytesIO()
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not a regular file"):
|
"not a regular file"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -549,7 +549,7 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
sock, proto = self.prepare()
|
sock, proto = self.prepare()
|
||||||
f = mock.Mock()
|
f = mock.Mock()
|
||||||
f.fileno.return_value = -1
|
f.fileno.return_value = -1
|
||||||
with self.assertRaisesRegex(events.SendfileNotAvailableError,
|
with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
|
||||||
"not a regular file"):
|
"not a regular file"):
|
||||||
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
self.run_loop(self.loop._sock_sendfile_native(sock, f,
|
||||||
0, None))
|
0, None))
|
||||||
|
@ -605,7 +605,7 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
with self.assertRaises(KeyError):
|
with self.assertRaises(KeyError):
|
||||||
self.loop._selector.get_key(sock)
|
self.loop._selector.get_key(sock)
|
||||||
exc = fut.exception()
|
exc = fut.exception()
|
||||||
self.assertIsInstance(exc, events.SendfileNotAvailableError)
|
self.assertIsInstance(exc, asyncio.SendfileNotAvailableError)
|
||||||
self.assertEqual(0, self.file.tell())
|
self.assertEqual(0, self.file.tell())
|
||||||
|
|
||||||
def test_sock_sendfile_os_error_next_call(self):
|
def test_sock_sendfile_os_error_next_call(self):
|
||||||
|
@ -630,7 +630,7 @@ class SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
|
||||||
|
|
||||||
fileno = self.file.fileno()
|
fileno = self.file.fileno()
|
||||||
fut = self.loop.create_future()
|
fut = self.loop.create_future()
|
||||||
err = events.SendfileNotAvailableError()
|
err = asyncio.SendfileNotAvailableError()
|
||||||
with mock.patch('os.sendfile', side_effect=err):
|
with mock.patch('os.sendfile', side_effect=err):
|
||||||
self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
|
self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
|
||||||
sock, fileno,
|
sock, fileno,
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
Create a dedicated ``asyncio.CancelledError``, ``asyncio.InvalidStateError``
|
||||||
|
and ``asyncio.TimeoutError`` exception classes. Inherit them from
|
||||||
|
corresponding exceptions from ``concurrent.futures`` package. Extract
|
||||||
|
``asyncio`` exceptions into a separate file.
|
|
@ -3306,6 +3306,8 @@ module_init(void)
|
||||||
|
|
||||||
WITH_MOD("asyncio.base_futures")
|
WITH_MOD("asyncio.base_futures")
|
||||||
GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info")
|
GET_MOD_ATTR(asyncio_future_repr_info_func, "_future_repr_info")
|
||||||
|
|
||||||
|
WITH_MOD("asyncio.exceptions")
|
||||||
GET_MOD_ATTR(asyncio_InvalidStateError, "InvalidStateError")
|
GET_MOD_ATTR(asyncio_InvalidStateError, "InvalidStateError")
|
||||||
GET_MOD_ATTR(asyncio_CancelledError, "CancelledError")
|
GET_MOD_ATTR(asyncio_CancelledError, "CancelledError")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue