693 lines
22 KiB
Python
693 lines
22 KiB
Python
"""Event loop and event loop policy."""
|
|
|
|
__all__ = ['AbstractEventLoopPolicy',
|
|
'AbstractEventLoop', 'AbstractServer',
|
|
'Handle', 'TimerHandle',
|
|
'get_event_loop_policy', 'set_event_loop_policy',
|
|
'get_event_loop', 'set_event_loop', 'new_event_loop',
|
|
'get_child_watcher', 'set_child_watcher',
|
|
'_set_running_loop', '_get_running_loop',
|
|
]
|
|
|
|
import functools
|
|
import inspect
|
|
import os
|
|
import reprlib
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import traceback
|
|
|
|
|
|
def _get_function_source(func):
|
|
func = inspect.unwrap(func)
|
|
if inspect.isfunction(func):
|
|
code = func.__code__
|
|
return (code.co_filename, code.co_firstlineno)
|
|
if isinstance(func, functools.partial):
|
|
return _get_function_source(func.func)
|
|
if isinstance(func, functools.partialmethod):
|
|
return _get_function_source(func.func)
|
|
return None
|
|
|
|
|
|
def _format_args_and_kwargs(args, kwargs):
|
|
"""Format function arguments and keyword arguments.
|
|
|
|
Special case for a single parameter: ('hello',) is formatted as ('hello').
|
|
"""
|
|
# use reprlib to limit the length of the output
|
|
items = []
|
|
if args:
|
|
items.extend(reprlib.repr(arg) for arg in args)
|
|
if kwargs:
|
|
items.extend('{}={}'.format(k, reprlib.repr(v))
|
|
for k, v in kwargs.items())
|
|
return '(' + ', '.join(items) + ')'
|
|
|
|
|
|
def _format_callback(func, args, kwargs, suffix=''):
|
|
if isinstance(func, functools.partial):
|
|
suffix = _format_args_and_kwargs(args, kwargs) + suffix
|
|
return _format_callback(func.func, func.args, func.keywords, suffix)
|
|
|
|
if hasattr(func, '__qualname__'):
|
|
func_repr = getattr(func, '__qualname__')
|
|
elif hasattr(func, '__name__'):
|
|
func_repr = getattr(func, '__name__')
|
|
else:
|
|
func_repr = repr(func)
|
|
|
|
func_repr += _format_args_and_kwargs(args, kwargs)
|
|
if suffix:
|
|
func_repr += suffix
|
|
return func_repr
|
|
|
|
def _format_callback_source(func, args):
|
|
func_repr = _format_callback(func, args, None)
|
|
source = _get_function_source(func)
|
|
if source:
|
|
func_repr += ' at %s:%s' % source
|
|
return func_repr
|
|
|
|
|
|
class Handle:
|
|
"""Object returned by callback registration methods."""
|
|
|
|
__slots__ = ('_callback', '_args', '_cancelled', '_loop',
|
|
'_source_traceback', '_repr', '__weakref__')
|
|
|
|
def __init__(self, callback, args, loop):
|
|
self._loop = loop
|
|
self._callback = callback
|
|
self._args = args
|
|
self._cancelled = False
|
|
self._repr = None
|
|
if self._loop.get_debug():
|
|
self._source_traceback = traceback.extract_stack(sys._getframe(1))
|
|
else:
|
|
self._source_traceback = None
|
|
|
|
def _repr_info(self):
|
|
info = [self.__class__.__name__]
|
|
if self._cancelled:
|
|
info.append('cancelled')
|
|
if self._callback is not None:
|
|
info.append(_format_callback_source(self._callback, self._args))
|
|
if self._source_traceback:
|
|
frame = self._source_traceback[-1]
|
|
info.append('created at %s:%s' % (frame[0], frame[1]))
|
|
return info
|
|
|
|
def __repr__(self):
|
|
if self._repr is not None:
|
|
return self._repr
|
|
info = self._repr_info()
|
|
return '<%s>' % ' '.join(info)
|
|
|
|
def cancel(self):
|
|
if not self._cancelled:
|
|
self._cancelled = True
|
|
if self._loop.get_debug():
|
|
# Keep a representation in debug mode to keep callback and
|
|
# parameters. For example, to log the warning
|
|
# "Executing <Handle...> took 2.5 second"
|
|
self._repr = repr(self)
|
|
self._callback = None
|
|
self._args = None
|
|
|
|
def _run(self):
|
|
try:
|
|
self._callback(*self._args)
|
|
except Exception as exc:
|
|
cb = _format_callback_source(self._callback, self._args)
|
|
msg = 'Exception in callback {}'.format(cb)
|
|
context = {
|
|
'message': msg,
|
|
'exception': exc,
|
|
'handle': self,
|
|
}
|
|
if self._source_traceback:
|
|
context['source_traceback'] = self._source_traceback
|
|
self._loop.call_exception_handler(context)
|
|
self = None # Needed to break cycles when an exception occurs.
|
|
|
|
|
|
class TimerHandle(Handle):
|
|
"""Object returned by timed callback registration methods."""
|
|
|
|
__slots__ = ['_scheduled', '_when']
|
|
|
|
def __init__(self, when, callback, args, loop):
|
|
assert when is not None
|
|
super().__init__(callback, args, loop)
|
|
if self._source_traceback:
|
|
del self._source_traceback[-1]
|
|
self._when = when
|
|
self._scheduled = False
|
|
|
|
def _repr_info(self):
|
|
info = super()._repr_info()
|
|
pos = 2 if self._cancelled else 1
|
|
info.insert(pos, 'when=%s' % self._when)
|
|
return info
|
|
|
|
def __hash__(self):
|
|
return hash(self._when)
|
|
|
|
def __lt__(self, other):
|
|
return self._when < other._when
|
|
|
|
def __le__(self, other):
|
|
if self._when < other._when:
|
|
return True
|
|
return self.__eq__(other)
|
|
|
|
def __gt__(self, other):
|
|
return self._when > other._when
|
|
|
|
def __ge__(self, other):
|
|
if self._when > other._when:
|
|
return True
|
|
return self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if isinstance(other, TimerHandle):
|
|
return (self._when == other._when and
|
|
self._callback == other._callback and
|
|
self._args == other._args and
|
|
self._cancelled == other._cancelled)
|
|
return NotImplemented
|
|
|
|
def __ne__(self, other):
|
|
equal = self.__eq__(other)
|
|
return NotImplemented if equal is NotImplemented else not equal
|
|
|
|
def cancel(self):
|
|
if not self._cancelled:
|
|
self._loop._timer_handle_cancelled(self)
|
|
super().cancel()
|
|
|
|
|
|
class AbstractServer:
|
|
"""Abstract server returned by create_server()."""
|
|
|
|
def close(self):
|
|
"""Stop serving. This leaves existing connections open."""
|
|
return NotImplemented
|
|
|
|
def wait_closed(self):
|
|
"""Coroutine to wait until service is closed."""
|
|
return NotImplemented
|
|
|
|
|
|
class AbstractEventLoop:
|
|
"""Abstract event loop."""
|
|
|
|
# Running and stopping the event loop.
|
|
|
|
def run_forever(self):
|
|
"""Run the event loop until stop() is called."""
|
|
raise NotImplementedError
|
|
|
|
def run_until_complete(self, future):
|
|
"""Run the event loop until a Future is done.
|
|
|
|
Return the Future's result, or raise its exception.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def stop(self):
|
|
"""Stop the event loop as soon as reasonable.
|
|
|
|
Exactly how soon that is may depend on the implementation, but
|
|
no more I/O callbacks should be scheduled.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def is_running(self):
|
|
"""Return whether the event loop is currently running."""
|
|
raise NotImplementedError
|
|
|
|
def is_closed(self):
|
|
"""Returns True if the event loop was closed."""
|
|
raise NotImplementedError
|
|
|
|
def close(self):
|
|
"""Close the loop.
|
|
|
|
The loop should not be running.
|
|
|
|
This is idempotent and irreversible.
|
|
|
|
No other methods should be called after this one.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def shutdown_asyncgens(self):
|
|
"""Shutdown all active asynchronous generators."""
|
|
raise NotImplementedError
|
|
|
|
# Methods scheduling callbacks. All these return Handles.
|
|
|
|
def _timer_handle_cancelled(self, handle):
|
|
"""Notification that a TimerHandle has been cancelled."""
|
|
raise NotImplementedError
|
|
|
|
def call_soon(self, callback, *args):
|
|
return self.call_later(0, callback, *args)
|
|
|
|
def call_later(self, delay, callback, *args):
|
|
raise NotImplementedError
|
|
|
|
def call_at(self, when, callback, *args):
|
|
raise NotImplementedError
|
|
|
|
def time(self):
|
|
raise NotImplementedError
|
|
|
|
def create_future(self):
|
|
raise NotImplementedError
|
|
|
|
# Method scheduling a coroutine object: create a task.
|
|
|
|
def create_task(self, coro):
|
|
raise NotImplementedError
|
|
|
|
# Methods for interacting with threads.
|
|
|
|
def call_soon_threadsafe(self, callback, *args):
|
|
raise NotImplementedError
|
|
|
|
def run_in_executor(self, executor, func, *args):
|
|
raise NotImplementedError
|
|
|
|
def set_default_executor(self, executor):
|
|
raise NotImplementedError
|
|
|
|
# Network I/O methods returning Futures.
|
|
|
|
def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
|
|
raise NotImplementedError
|
|
|
|
def getnameinfo(self, sockaddr, flags=0):
|
|
raise NotImplementedError
|
|
|
|
def create_connection(self, protocol_factory, host=None, port=None, *,
|
|
ssl=None, family=0, proto=0, flags=0, sock=None,
|
|
local_addr=None, server_hostname=None):
|
|
raise NotImplementedError
|
|
|
|
def create_server(self, protocol_factory, host=None, port=None, *,
|
|
family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
|
|
sock=None, backlog=100, ssl=None, reuse_address=None,
|
|
reuse_port=None):
|
|
"""A coroutine which creates a TCP server bound to host and port.
|
|
|
|
The return value is a Server object which can be used to stop
|
|
the service.
|
|
|
|
If host is an empty string or None all interfaces are assumed
|
|
and a list of multiple sockets will be returned (most likely
|
|
one for IPv4 and another one for IPv6). The host parameter can also be a
|
|
sequence (e.g. list) of hosts to bind to.
|
|
|
|
family can be set to either AF_INET or AF_INET6 to force the
|
|
socket to use IPv4 or IPv6. If not set it will be determined
|
|
from host (defaults to AF_UNSPEC).
|
|
|
|
flags is a bitmask for getaddrinfo().
|
|
|
|
sock can optionally be specified in order to use a preexisting
|
|
socket object.
|
|
|
|
backlog is the maximum number of queued connections passed to
|
|
listen() (defaults to 100).
|
|
|
|
ssl can be set to an SSLContext to enable SSL over the
|
|
accepted connections.
|
|
|
|
reuse_address tells the kernel to reuse a local socket in
|
|
TIME_WAIT state, without waiting for its natural timeout to
|
|
expire. If not specified will automatically be set to True on
|
|
UNIX.
|
|
|
|
reuse_port tells the kernel to allow this endpoint to be bound to
|
|
the same port as other existing endpoints are bound to, so long as
|
|
they all set this flag when being created. This option is not
|
|
supported on Windows.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def create_unix_connection(self, protocol_factory, path, *,
|
|
ssl=None, sock=None,
|
|
server_hostname=None):
|
|
raise NotImplementedError
|
|
|
|
def create_unix_server(self, protocol_factory, path, *,
|
|
sock=None, backlog=100, ssl=None):
|
|
"""A coroutine which creates a UNIX Domain Socket server.
|
|
|
|
The return value is a Server object, which can be used to stop
|
|
the service.
|
|
|
|
path is a str, representing a file systsem path to bind the
|
|
server socket to.
|
|
|
|
sock can optionally be specified in order to use a preexisting
|
|
socket object.
|
|
|
|
backlog is the maximum number of queued connections passed to
|
|
listen() (defaults to 100).
|
|
|
|
ssl can be set to an SSLContext to enable SSL over the
|
|
accepted connections.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
def create_datagram_endpoint(self, protocol_factory,
|
|
local_addr=None, remote_addr=None, *,
|
|
family=0, proto=0, flags=0,
|
|
reuse_address=None, reuse_port=None,
|
|
allow_broadcast=None, sock=None):
|
|
"""A coroutine which creates a datagram endpoint.
|
|
|
|
This method will try to establish the endpoint in the background.
|
|
When successful, the coroutine returns a (transport, protocol) pair.
|
|
|
|
protocol_factory must be a callable returning a protocol instance.
|
|
|
|
socket family AF_INET or socket.AF_INET6 depending on host (or
|
|
family if specified), socket type SOCK_DGRAM.
|
|
|
|
reuse_address tells the kernel to reuse a local socket in
|
|
TIME_WAIT state, without waiting for its natural timeout to
|
|
expire. If not specified it will automatically be set to True on
|
|
UNIX.
|
|
|
|
reuse_port tells the kernel to allow this endpoint to be bound to
|
|
the same port as other existing endpoints are bound to, so long as
|
|
they all set this flag when being created. This option is not
|
|
supported on Windows and some UNIX's. If the
|
|
:py:data:`~socket.SO_REUSEPORT` constant is not defined then this
|
|
capability is unsupported.
|
|
|
|
allow_broadcast tells the kernel to allow this endpoint to send
|
|
messages to the broadcast address.
|
|
|
|
sock can optionally be specified in order to use a preexisting
|
|
socket object.
|
|
"""
|
|
raise NotImplementedError
|
|
|
|
# Pipes and subprocesses.
|
|
|
|
def connect_read_pipe(self, protocol_factory, pipe):
|
|
"""Register read pipe in event loop. Set the pipe to non-blocking mode.
|
|
|
|
protocol_factory should instantiate object with Protocol interface.
|
|
pipe is a file-like object.
|
|
Return pair (transport, protocol), where transport supports the
|
|
ReadTransport interface."""
|
|
# The reason to accept file-like object instead of just file descriptor
|
|
# is: we need to own pipe and close it at transport finishing
|
|
# Can got complicated errors if pass f.fileno(),
|
|
# close fd in pipe transport then close f and vise versa.
|
|
raise NotImplementedError
|
|
|
|
def connect_write_pipe(self, protocol_factory, pipe):
|
|
"""Register write pipe in event loop.
|
|
|
|
protocol_factory should instantiate object with BaseProtocol interface.
|
|
Pipe is file-like object already switched to nonblocking.
|
|
Return pair (transport, protocol), where transport support
|
|
WriteTransport interface."""
|
|
# The reason to accept file-like object instead of just file descriptor
|
|
# is: we need to own pipe and close it at transport finishing
|
|
# Can got complicated errors if pass f.fileno(),
|
|
# close fd in pipe transport then close f and vise versa.
|
|
raise NotImplementedError
|
|
|
|
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
**kwargs):
|
|
raise NotImplementedError
|
|
|
|
def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
**kwargs):
|
|
raise NotImplementedError
|
|
|
|
# Ready-based callback registration methods.
|
|
# The add_*() methods return None.
|
|
# The remove_*() methods return True if something was removed,
|
|
# False if there was nothing to delete.
|
|
|
|
def add_reader(self, fd, callback, *args):
|
|
raise NotImplementedError
|
|
|
|
def remove_reader(self, fd):
|
|
raise NotImplementedError
|
|
|
|
def add_writer(self, fd, callback, *args):
|
|
raise NotImplementedError
|
|
|
|
def remove_writer(self, fd):
|
|
raise NotImplementedError
|
|
|
|
# Completion based I/O methods returning Futures.
|
|
|
|
def sock_recv(self, sock, nbytes):
|
|
raise NotImplementedError
|
|
|
|
def sock_sendall(self, sock, data):
|
|
raise NotImplementedError
|
|
|
|
def sock_connect(self, sock, address):
|
|
raise NotImplementedError
|
|
|
|
def sock_accept(self, sock):
|
|
raise NotImplementedError
|
|
|
|
# Signal handling.
|
|
|
|
def add_signal_handler(self, sig, callback, *args):
|
|
raise NotImplementedError
|
|
|
|
def remove_signal_handler(self, sig):
|
|
raise NotImplementedError
|
|
|
|
# Task factory.
|
|
|
|
def set_task_factory(self, factory):
|
|
raise NotImplementedError
|
|
|
|
def get_task_factory(self):
|
|
raise NotImplementedError
|
|
|
|
# Error handlers.
|
|
|
|
def get_exception_handler(self):
|
|
raise NotImplementedError
|
|
|
|
def set_exception_handler(self, handler):
|
|
raise NotImplementedError
|
|
|
|
def default_exception_handler(self, context):
|
|
raise NotImplementedError
|
|
|
|
def call_exception_handler(self, context):
|
|
raise NotImplementedError
|
|
|
|
# Debug flag management.
|
|
|
|
def get_debug(self):
|
|
raise NotImplementedError
|
|
|
|
def set_debug(self, enabled):
|
|
raise NotImplementedError
|
|
|
|
|
|
class AbstractEventLoopPolicy:
|
|
"""Abstract policy for accessing the event loop."""
|
|
|
|
def get_event_loop(self):
|
|
"""Get the event loop for the current context.
|
|
|
|
Returns an event loop object implementing the BaseEventLoop interface,
|
|
or raises an exception in case no event loop has been set for the
|
|
current context and the current policy does not specify to create one.
|
|
|
|
It should never return None."""
|
|
raise NotImplementedError
|
|
|
|
def set_event_loop(self, loop):
|
|
"""Set the event loop for the current context to loop."""
|
|
raise NotImplementedError
|
|
|
|
def new_event_loop(self):
|
|
"""Create and return a new event loop object according to this
|
|
policy's rules. If there's need to set this loop as the event loop for
|
|
the current context, set_event_loop must be called explicitly."""
|
|
raise NotImplementedError
|
|
|
|
# Child processes handling (Unix only).
|
|
|
|
def get_child_watcher(self):
|
|
"Get the watcher for child processes."
|
|
raise NotImplementedError
|
|
|
|
def set_child_watcher(self, watcher):
|
|
"""Set the watcher for child processes."""
|
|
raise NotImplementedError
|
|
|
|
|
|
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
|
|
"""Default policy implementation for accessing the event loop.
|
|
|
|
In this policy, each thread has its own event loop. However, we
|
|
only automatically create an event loop by default for the main
|
|
thread; other threads by default have no event loop.
|
|
|
|
Other policies may have different rules (e.g. a single global
|
|
event loop, or automatically creating an event loop per thread, or
|
|
using some other notion of context to which an event loop is
|
|
associated).
|
|
"""
|
|
|
|
_loop_factory = None
|
|
|
|
class _Local(threading.local):
|
|
_loop = None
|
|
_set_called = False
|
|
|
|
def __init__(self):
|
|
self._local = self._Local()
|
|
|
|
def get_event_loop(self):
|
|
"""Get the event loop.
|
|
|
|
This may be None or an instance of EventLoop.
|
|
"""
|
|
if (self._local._loop is None and
|
|
not self._local._set_called and
|
|
isinstance(threading.current_thread(), threading._MainThread)):
|
|
self.set_event_loop(self.new_event_loop())
|
|
if self._local._loop is None:
|
|
raise RuntimeError('There is no current event loop in thread %r.'
|
|
% threading.current_thread().name)
|
|
return self._local._loop
|
|
|
|
def set_event_loop(self, loop):
|
|
"""Set the event loop."""
|
|
self._local._set_called = True
|
|
assert loop is None or isinstance(loop, AbstractEventLoop)
|
|
self._local._loop = loop
|
|
|
|
def new_event_loop(self):
|
|
"""Create a new event loop.
|
|
|
|
You must call set_event_loop() to make this the current event
|
|
loop.
|
|
"""
|
|
return self._loop_factory()
|
|
|
|
|
|
# Event loop policy. The policy itself is always global, even if the
|
|
# policy's rules say that there is an event loop per thread (or other
|
|
# notion of context). The default policy is installed by the first
|
|
# call to get_event_loop_policy().
|
|
_event_loop_policy = None
|
|
|
|
# Lock for protecting the on-the-fly creation of the event loop policy.
|
|
_lock = threading.Lock()
|
|
|
|
|
|
# A TLS for the running event loop, used by _get_running_loop.
|
|
class _RunningLoop(threading.local):
|
|
loop_pid = (None, None)
|
|
|
|
|
|
_running_loop = _RunningLoop()
|
|
|
|
|
|
def _get_running_loop():
|
|
"""Return the running event loop or None.
|
|
|
|
This is a low-level function intended to be used by event loops.
|
|
This function is thread-specific.
|
|
"""
|
|
running_loop, pid = _running_loop.loop_pid
|
|
if running_loop is not None and pid == os.getpid():
|
|
return running_loop
|
|
|
|
|
|
def _set_running_loop(loop):
|
|
"""Set the running event loop.
|
|
|
|
This is a low-level function intended to be used by event loops.
|
|
This function is thread-specific.
|
|
"""
|
|
_running_loop.loop_pid = (loop, os.getpid())
|
|
|
|
|
|
def _init_event_loop_policy():
|
|
global _event_loop_policy
|
|
with _lock:
|
|
if _event_loop_policy is None: # pragma: no branch
|
|
from . import DefaultEventLoopPolicy
|
|
_event_loop_policy = DefaultEventLoopPolicy()
|
|
|
|
|
|
def get_event_loop_policy():
|
|
"""Get the current event loop policy."""
|
|
if _event_loop_policy is None:
|
|
_init_event_loop_policy()
|
|
return _event_loop_policy
|
|
|
|
|
|
def set_event_loop_policy(policy):
|
|
"""Set the current event loop policy.
|
|
|
|
If policy is None, the default policy is restored."""
|
|
global _event_loop_policy
|
|
assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
|
|
_event_loop_policy = policy
|
|
|
|
|
|
def get_event_loop():
|
|
"""Return an asyncio event loop.
|
|
|
|
When called from a coroutine or a callback (e.g. scheduled with call_soon
|
|
or similar API), this function will always return the running event loop.
|
|
|
|
If there is no running event loop set, the function will return
|
|
the result of `get_event_loop_policy().get_event_loop()` call.
|
|
"""
|
|
current_loop = _get_running_loop()
|
|
if current_loop is not None:
|
|
return current_loop
|
|
return get_event_loop_policy().get_event_loop()
|
|
|
|
|
|
def set_event_loop(loop):
|
|
"""Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
|
|
get_event_loop_policy().set_event_loop(loop)
|
|
|
|
|
|
def new_event_loop():
|
|
"""Equivalent to calling get_event_loop_policy().new_event_loop()."""
|
|
return get_event_loop_policy().new_event_loop()
|
|
|
|
|
|
def get_child_watcher():
|
|
"""Equivalent to calling get_event_loop_policy().get_child_watcher()."""
|
|
return get_event_loop_policy().get_child_watcher()
|
|
|
|
|
|
def set_child_watcher(watcher):
|
|
"""Equivalent to calling
|
|
get_event_loop_policy().set_child_watcher(watcher)."""
|
|
return get_event_loop_policy().set_child_watcher(watcher)
|