This commit is contained in:
Raymond Hettinger 2015-01-29 22:02:17 -08:00
commit 91496a08d4
27 changed files with 530 additions and 173 deletions

View File

@ -40,6 +40,43 @@ Examples of effects of the debug mode:
<asyncio-logger>`.
Cancellation
------------
Cancellation of tasks is not common in classic programming. In asynchronous
programming, not only it is something common, but you have to prepare your
code to handle it.
Futures and tasks can be cancelled explicitly with their :meth:`Future.cancel`
method. The :func:`wait_for` function cancels the waited task when the timeout
occurs. There are many other cases where a task can be cancelled indirectly.
Don't call :meth:`~Future.set_result` or :meth:`~Future.set_exception` method
of :class:`Future` if the future is cancelled: it would fail with an exception.
For example, write::
if not fut.cancelled():
fut.set_result('done')
Don't schedule directly a call to the :meth:`~Future.set_result` or the
:meth:`~Future.set_exception` method of a future with
:meth:`BaseEventLoop.call_soon`: the future can be cancelled before its method
is called.
If you wait for a future, you should check early if the future was cancelled to
avoid useless operations. Example::
@coroutine
def slow_operation(fut):
if fut.cancelled():
return
# ... slow computation ...
yield from fut
# ...
The :func:`shield` function can also be used to ignore cancellation.
.. _asyncio-multithreading:
Concurrency and multithreading
@ -335,3 +372,14 @@ traceback where the task was created. Example of log in debug mode::
:ref:`Detect coroutine objects never scheduled <asyncio-coroutine-not-scheduled>`.
Close transports
----------------
When a transport is no more needed, call its ``close()`` method to release
resources.
If a transport (or an event loop) is not closed explicitly, a
:exc:`ResourceWarning` warning will be emitted in its destructor. The
:exc:`ResourceWarning` warnings are hidden by default: use the ``-Wd`` command
line option of Python to show them.

View File

@ -641,7 +641,7 @@ Server
Server listening on sockets.
Object created by the :meth:`BaseEventLoop.create_server` method and the
:func:`start_server` function. Don't instanciate the class directly.
:func:`start_server` function. Don't instantiate the class directly.
.. method:: close()

View File

@ -374,6 +374,14 @@ The following callbacks are called on :class:`Protocol` instances:
a connection. However, :meth:`eof_received` is called at most once
and, if called, :meth:`data_received` won't be called after it.
State machine:
start -> :meth:`~BaseProtocol.connection_made`
[-> :meth:`~Protocol.data_received` \*]
[-> :meth:`~Protocol.eof_received` ?]
-> :meth:`~BaseProtocol.connection_lost` -> end
Datagram protocols
------------------

View File

@ -4,6 +4,29 @@
Synchronization primitives
==========================
Locks:
* :class:`Lock`
* :class:`Event`
* :class:`Condition`
* :class:`Semaphore`
* :class:`BoundedSemaphore`
Queues:
* :class:`Queue`
* :class:`PriorityQueue`
* :class:`LifoQueue`
* :class:`JoinableQueue`
asyncio locks and queues API were designed to be close to classes of the
:mod:`threading` module (:class:`~threading.Lock`, :class:`~threading.Event`,
:class:`~threading.Condition`, :class:`~threading.Semaphore`,
:class:`~threading.BoundedSemaphore`) and the :mod:`queue` module
(:class:`~queue.Queue`, :class:`~queue.PriorityQueue`,
:class:`~queue.LifoQueue`), but they have no *timeout* parameter. The
:func:`asyncio.wait_for` function can be used to cancel a task after a timeout.
Locks
-----

View File

@ -485,6 +485,12 @@ Changes in the Python API
Changes in the C API
--------------------
* The undocumented :c:member:`~PyMemoryViewObject.format` member of the
(non-public) :c:type:`PyMemoryViewObject` structure has been removed.
All extensions relying on the relevant parts in ``memoryobject.h``
must be rebuilt.
* The :c:type:`PyMemAllocator` structure was renamed to
:c:type:`PyMemAllocatorEx` and a new ``calloc`` field was added.

View File

@ -45,9 +45,6 @@ typedef struct {
} _PyManagedBufferObject;
/* static storage used for casting between formats */
#define _Py_MEMORYVIEW_MAX_FORMAT 3 /* must be >= 3 */
/* memoryview state flags */
#define _Py_MEMORYVIEW_RELEASED 0x001 /* access to master buffer blocked */
#define _Py_MEMORYVIEW_C 0x002 /* C-contiguous layout */
@ -62,7 +59,6 @@ typedef struct {
int flags; /* state flags */
Py_ssize_t exports; /* number of buffer re-exports */
Py_buffer view; /* private copy of the exporter's view */
char format[_Py_MEMORYVIEW_MAX_FORMAT]; /* used for casting */
PyObject *weakreflist;
Py_ssize_t ob_array[1]; /* shape, strides, suboffsets */
} PyMemoryViewObject;

View File

@ -26,6 +26,7 @@ import threading
import time
import traceback
import sys
import warnings
from . import coroutines
from . import events
@ -333,6 +334,16 @@ class BaseEventLoop(events.AbstractEventLoop):
"""Returns True if the event loop was closed."""
return self._closed
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if not self.is_closed():
warnings.warn("unclosed event loop %r" % self, ResourceWarning)
if not self.is_running():
self.close()
def is_running(self):
"""Returns True if the event loop is running."""
return (self._owner is not None)

View File

@ -1,6 +1,9 @@
import collections
import subprocess
import sys
import warnings
from . import futures
from . import protocols
from . import transports
from .coroutines import coroutine
@ -11,26 +14,32 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
waiter=None, extra=None, **kwargs):
super().__init__(extra)
self._closed = False
self._protocol = protocol
self._loop = loop
self._proc = None
self._pid = None
self._returncode = None
self._exit_waiters = []
self._pending_calls = collections.deque()
self._pipes = {}
self._finished = False
if stdin == subprocess.PIPE:
self._pipes[0] = None
if stdout == subprocess.PIPE:
self._pipes[1] = None
if stderr == subprocess.PIPE:
self._pipes[2] = None
self._pending_calls = collections.deque()
self._finished = False
self._returncode = None
# Create the child process: set the _proc attribute
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._pid = self._proc.pid
self._extra['subprocess'] = self._proc
if self._loop.get_debug():
if isinstance(args, (bytes, str)):
program = args
@ -39,8 +48,13 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
logger.debug('process %r created: pid %s',
program, self._pid)
self._loop.create_task(self._connect_pipes(waiter))
def __repr__(self):
info = [self.__class__.__name__, 'pid=%s' % self._pid]
info = [self.__class__.__name__]
if self._closed:
info.append('closed')
info.append('pid=%s' % self._pid)
if self._returncode is not None:
info.append('returncode=%s' % self._returncode)
@ -70,12 +84,34 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
raise NotImplementedError
def close(self):
if self._closed:
return
self._closed = True
for proto in self._pipes.values():
if proto is None:
continue
proto.pipe.close()
if self._returncode is None:
self.terminate()
if self._proc is not None and self._returncode is None:
if self._loop.get_debug():
logger.warning('Close running child process: kill %r', self)
try:
self._proc.kill()
except ProcessLookupError:
pass
# Don't clear the _proc reference yet: _post_init() may still run
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if not self._closed:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
self.close()
def get_pid(self):
return self._pid
@ -89,58 +125,40 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
else:
return None
def _check_proc(self):
if self._proc is None:
raise ProcessLookupError()
def send_signal(self, signal):
self._check_proc()
self._proc.send_signal(signal)
def terminate(self):
self._check_proc()
self._proc.terminate()
def kill(self):
self._check_proc()
self._proc.kill()
def _kill_wait(self):
"""Close pipes, kill the subprocess and read its return status.
Function called when an exception is raised during the creation
of a subprocess.
"""
if self._loop.get_debug():
logger.warning('Exception during subprocess creation, '
'kill the subprocess %r',
self,
exc_info=True)
proc = self._proc
if proc.stdout:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
if proc.stdin:
proc.stdin.close()
try:
proc.kill()
except ProcessLookupError:
pass
self._returncode = proc.wait()
self.close()
@coroutine
def _post_init(self):
def _connect_pipes(self, waiter):
try:
proc = self._proc
loop = self._loop
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
self._pipes[0] = pipe
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
self._pipes[1] = pipe
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
lambda: ReadSubprocessPipeProto(self, 2),
@ -149,13 +167,16 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
assert self._pending_calls is not None
self._loop.call_soon(self._protocol.connection_made, self)
loop.call_soon(self._protocol.connection_made, self)
for callback, data in self._pending_calls:
self._loop.call_soon(callback, *data)
loop.call_soon(callback, *data)
self._pending_calls = None
except:
self._kill_wait()
raise
except Exception as exc:
if waiter is not None and not waiter.cancelled():
waiter.set_exception(exc)
else:
if waiter is not None and not waiter.cancelled():
waiter.set_result(None)
def _call(self, cb, *data):
if self._pending_calls is not None:
@ -180,6 +201,23 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
self._call(self._protocol.process_exited)
self._try_finish()
# wake up futures waiting for wait()
for waiter in self._exit_waiters:
if not waiter.cancelled():
waiter.set_result(returncode)
self._exit_waiters = None
def _wait(self):
"""Wait until the process exit and return the process return code.
This method is a coroutine."""
if self._returncode is not None:
return self._returncode
waiter = futures.Future(loop=self._loop)
self._exit_waiters.append(waiter)
return (yield from waiter)
def _try_finish(self):
assert not self._finished
if self._returncode is None:
@ -193,9 +231,9 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
try:
self._protocol.connection_lost(exc)
finally:
self._loop = None
self._proc = None
self._protocol = None
self._loop = None
class WriteSubprocessPipeProto(protocols.BaseProtocol):

View File

@ -195,9 +195,9 @@ class Future:
info = self._repr_info()
return '<%s %s>' % (self.__class__.__name__, ' '.join(info))
# On Python 3.3 or older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks to
# the PEP 442.
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if _PY34:
def __del__(self):
if not self._log_traceback:

View File

@ -7,6 +7,8 @@ proactor is only implemented on Windows with IOCP.
__all__ = ['BaseProactorEventLoop']
import socket
import sys
import warnings
from . import base_events
from . import constants
@ -74,6 +76,15 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._read_fut.cancel()
self._read_fut = None
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if self._sock is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
self.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
if self._loop.get_debug():

View File

@ -78,6 +78,11 @@ class Protocol(BaseProtocol):
State machine of calls:
start -> CM [-> DR*] [-> ER?] -> CL -> end
* CM: connection_made()
* DR: data_received()
* ER: eof_received()
* CL: connection_lost()
"""
def data_received(self, data):

View File

@ -10,6 +10,8 @@ import collections
import errno
import functools
import socket
import sys
import warnings
try:
import ssl
except ImportError: # pragma: no cover
@ -22,6 +24,7 @@ from . import futures
from . import selectors
from . import transports
from . import sslproto
from .coroutines import coroutine
from .log import logger
@ -181,16 +184,47 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
else:
raise # The event loop will catch, log and ignore it.
else:
extra = {'peername': addr}
accept = self._accept_connection2(protocol_factory, conn, extra,
sslcontext, server)
self.create_task(accept)
@coroutine
def _accept_connection2(self, protocol_factory, conn, extra,
sslcontext=None, server=None):
protocol = None
transport = None
try:
protocol = protocol_factory()
waiter = futures.Future(loop=self)
if sslcontext:
self._make_ssl_transport(
conn, protocol, sslcontext,
server_side=True, extra={'peername': addr}, server=server)
transport = self._make_ssl_transport(
conn, protocol, sslcontext, waiter=waiter,
server_side=True, extra=extra, server=server)
else:
self._make_socket_transport(
conn, protocol , extra={'peername': addr},
transport = self._make_socket_transport(
conn, protocol, waiter=waiter, extra=extra,
server=server)
try:
yield from waiter
except:
transport.close()
raise
# It's now up to the protocol to handle the connection.
except Exception as exc:
if self.get_debug():
context = {
'message': ('Error on transport creation '
'for incoming connection'),
'exception': exc,
}
if protocol is not None:
context['protocol'] = protocol
if transport is not None:
context['transport'] = transport
self.call_exception_handler(context)
def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
@ -467,7 +501,12 @@ class _SelectorTransport(transports._FlowControlMixin,
_buffer_factory = bytearray # Constructs initial value for self._buffer.
def __init__(self, loop, sock, protocol, extra, server=None):
# Attribute used in the destructor: it must be set even if the constructor
# is not called (see _SelectorSslTransport which may start by raising an
# exception)
_sock = None
def __init__(self, loop, sock, protocol, extra=None, server=None):
super().__init__(extra, loop)
self._extra['socket'] = sock
self._extra['sockname'] = sock.getsockname()
@ -479,6 +518,7 @@ class _SelectorTransport(transports._FlowControlMixin,
self._sock = sock
self._sock_fd = sock.fileno()
self._protocol = protocol
self._protocol_connected = True
self._server = server
self._buffer = self._buffer_factory()
self._conn_lost = 0 # Set when call to connection_lost scheduled.
@ -526,6 +566,15 @@ class _SelectorTransport(transports._FlowControlMixin,
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, None)
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if self._sock is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
self._sock.close()
def _fatal_error(self, exc, message='Fatal error on transport'):
# Should be called from exception handler only.
if isinstance(exc, (BrokenPipeError,
@ -555,6 +604,7 @@ class _SelectorTransport(transports._FlowControlMixin,
def _call_connection_lost(self, exc):
try:
if self._protocol_connected:
self._protocol.connection_lost(exc)
finally:
self._sock.close()
@ -718,6 +768,8 @@ class _SelectorSslTransport(_SelectorTransport):
sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
super().__init__(loop, sslsock, protocol, extra, server)
# the protocol connection is only made after the SSL handshake
self._protocol_connected = False
self._server_hostname = server_hostname
self._waiter = waiter
@ -797,6 +849,7 @@ class _SelectorSslTransport(_SelectorTransport):
self._read_wants_write = False
self._write_wants_read = False
self._loop.add_reader(self._sock_fd, self._read_ready)
self._protocol_connected = True
self._loop.call_soon(self._protocol.connection_made, self)
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(self._wakeup_waiter)
@ -928,8 +981,10 @@ class _SelectorDatagramTransport(_SelectorTransport):
waiter=None, extra=None):
super().__init__(loop, sock, protocol, extra)
self._address = address
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._sock_fd, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)

View File

@ -1,4 +1,6 @@
import collections
import sys
import warnings
try:
import ssl
except ImportError: # pragma: no cover
@ -295,6 +297,7 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
self._loop = loop
self._ssl_protocol = ssl_protocol
self._app_protocol = app_protocol
self._closed = False
def get_extra_info(self, name, default=None):
"""Get optional transport information."""
@ -308,8 +311,18 @@ class _SSLProtocolTransport(transports._FlowControlMixin,
protocol's connection_lost() method will (eventually) called
with None as its argument.
"""
self._closed = True
self._ssl_protocol._start_shutdown()
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if not self._closed:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
self.close()
def pause_reading(self):
"""Pause the receiving end.

View File

@ -25,8 +25,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
super().__init__(loop=loop)
self._limit = limit
self.stdin = self.stdout = self.stderr = None
self.waiter = futures.Future(loop=loop)
self._waiters = collections.deque()
self._transport = None
def __repr__(self):
@ -61,9 +59,6 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader=None,
loop=self._loop)
if not self.waiter.cancelled():
self.waiter.set_result(None)
def pipe_data_received(self, fd, data):
if fd == 1:
reader = self.stdout
@ -94,16 +89,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader.set_exception(exc)
def process_exited(self):
returncode = self._transport.get_returncode()
self._transport.close()
self._transport = None
# wake up futures waiting for wait()
while self._waiters:
waiter = self._waiters.popleft()
if not waiter.cancelled():
waiter.set_result(returncode)
class Process:
def __init__(self, transport, protocol, loop):
@ -124,30 +112,18 @@ class Process:
@coroutine
def wait(self):
"""Wait until the process exit and return the process return code."""
returncode = self._transport.get_returncode()
if returncode is not None:
return returncode
"""Wait until the process exit and return the process return code.
waiter = futures.Future(loop=self._loop)
self._protocol._waiters.append(waiter)
yield from waiter
return waiter.result()
def _check_alive(self):
if self._transport.get_returncode() is not None:
raise ProcessLookupError()
This method is a coroutine."""
return (yield from self._transport._wait())
def send_signal(self, signal):
self._check_alive()
self._transport.send_signal(signal)
def terminate(self):
self._check_alive()
self._transport.terminate()
def kill(self):
self._check_alive()
self._transport.kill()
@coroutine
@ -221,11 +197,6 @@ def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
protocol_factory,
cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
try:
yield from protocol.waiter
except:
transport._kill_wait()
raise
return Process(transport, protocol, loop)
@coroutine
@ -241,9 +212,4 @@ def create_subprocess_exec(program, *args, stdin=None, stdout=None,
program, *args,
stdin=stdin, stdout=stdout,
stderr=stderr, **kwds)
try:
yield from protocol.waiter
except:
transport._kill_wait()
raise
return Process(transport, protocol, loop)

View File

@ -592,7 +592,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
fut.exception()
return
if fut._state == futures._CANCELLED:
if fut.cancelled():
res = futures.CancelledError()
if not return_exceptions:
outer.set_exception(res)

View File

@ -8,6 +8,7 @@ import stat
import subprocess
import sys
import threading
import warnings
from . import base_events
@ -15,6 +16,7 @@ from . import base_subprocess
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
@ -174,16 +176,20 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
waiter = futures.Future(loop=self)
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=extra, **kwargs)
try:
yield from transp._post_init()
except:
transp.close()
raise
waiter=waiter, extra=extra,
**kwargs)
watcher.add_child_handler(transp.get_pid(),
self._child_watcher_callback, transp)
try:
yield from waiter
except:
transp.close()
yield from transp._wait()
raise
return transp
@ -298,8 +304,10 @@ class _UnixReadPipeTransport(transports.ReadTransport):
_set_nonblocking(self._fileno)
self._protocol = protocol
self._closing = False
self._loop.add_reader(self._fileno, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
@ -351,6 +359,15 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if not self._closing:
self._close(None)
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if self._pipe is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
self._pipe.close()
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
# should be called by exception handler only
if (isinstance(exc, OSError) and exc.errno == errno.EIO):
@ -401,13 +418,16 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.
# On AIX, the reader trick only works for sockets.
# On other platforms it works for pipes and sockets.
# (Exception: OS X 10.4? Issue #19294.)
if is_socket or not sys.platform.startswith("aix"):
self._loop.add_reader(self._fileno, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
# On AIX, the reader trick (to be notified when the read end of the
# socket is closed) only works for sockets. On other platforms it
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or not sys.platform.startswith("aix"):
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
@ -524,6 +544,15 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# write_eof is all what we needed to close the write pipe
self.write_eof()
# On Python 3.3 and older, objects with a destructor part of a reference
# cycle are never destroyed. It's not more the case on Python 3.4 thanks
# to the PEP 442.
if sys.version_info >= (3, 4):
def __del__(self):
if self._pipe is not None:
warnings.warn("unclosed transport %r" % self, ResourceWarning)
self._pipe.close()
def abort(self):
self._close(None)
@ -750,7 +779,7 @@ class SafeChildWatcher(BaseChildWatcher):
pass
def add_child_handler(self, pid, callback, *args):
self._callbacks[pid] = callback, args
self._callbacks[pid] = (callback, args)
# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)

View File

@ -366,13 +366,16 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
waiter = futures.Future(loop=self)
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=extra, **kwargs)
waiter=waiter, extra=extra,
**kwargs)
try:
yield from transp._post_init()
yield from waiter
except:
transp.close()
yield from transp._wait()
raise
return transp

View File

@ -14,6 +14,7 @@ import os
import socket
import subprocess
import tempfile
import warnings
__all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
@ -156,7 +157,10 @@ class PipeHandle:
CloseHandle(self._handle)
self._handle = None
__del__ = close
def __del__(self):
if self._handle is not None:
warnings.warn("unclosed %r" % self, ResourceWarning)
self.close()
def __enter__(self):
return self

View File

@ -886,14 +886,19 @@ class EventLoopTestsMixin:
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
# no CA loaded
f_c = self.loop.create_connection(MyProto, host, port,
ssl=sslcontext_client)
with mock.patch.object(self.loop, 'call_exception_handler'):
with test_utils.disable_logger():
with self.assertRaisesRegex(ssl.SSLError,
'certificate verify failed '):
self.loop.run_until_complete(f_c)
# execute the loop to log the connection error
test_utils.run_briefly(self.loop)
# close connection
self.assertIsNone(proto.transport)
server.close()
@ -919,15 +924,20 @@ class EventLoopTestsMixin:
f_c = self.loop.create_unix_connection(MyProto, path,
ssl=sslcontext_client,
server_hostname='invalid')
with mock.patch.object(self.loop, 'call_exception_handler'):
with test_utils.disable_logger():
with self.assertRaisesRegex(ssl.SSLError,
'certificate verify failed '):
self.loop.run_until_complete(f_c)
# execute the loop to log the connection error
test_utils.run_briefly(self.loop)
# close connection
self.assertIsNone(proto.transport)
server.close()
def test_legacy_create_unix_server_ssl_verify_failed(self):
with test_utils.force_legacy_ssl_support():
self.test_create_unix_server_ssl_verify_failed()
@ -949,6 +959,7 @@ class EventLoopTestsMixin:
# incorrect server_hostname
f_c = self.loop.create_connection(MyProto, host, port,
ssl=sslcontext_client)
with mock.patch.object(self.loop, 'call_exception_handler'):
with test_utils.disable_logger():
with self.assertRaisesRegex(
ssl.CertificateError,
@ -1540,9 +1551,10 @@ class SubprocessTestsMixin:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
self.check_killed(proto.returncode)
self.assertEqual(b'Python The Winner', proto.data[1])
def test_subprocess_interactive(self):
@ -1556,7 +1568,6 @@ class SubprocessTestsMixin:
self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
try:
stdin = transp.get_pipe_transport(0)
stdin.write(b'Python ')
self.loop.run_until_complete(proto.got_data[1].wait())
@ -1566,11 +1577,11 @@ class SubprocessTestsMixin:
stdin.write(b'The Winner')
self.loop.run_until_complete(proto.got_data[1].wait())
self.assertEqual(b'Python The Winner', proto.data[1])
finally:
transp.close()
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
self.check_killed(proto.returncode)
def test_subprocess_shell(self):
connect = self.loop.subprocess_shell(
@ -1728,9 +1739,10 @@ class SubprocessTestsMixin:
# GetLastError()==ERROR_INVALID_NAME on Windows!?! (Using
# WriteFile() we get ERROR_BROKEN_PIPE as expected.)
self.assertEqual(b'ERR:OSError', proto.data[2])
with test_utils.disable_logger():
transp.close()
self.loop.run_until_complete(proto.completed)
self.check_terminated(proto.returncode)
self.check_killed(proto.returncode)
def test_subprocess_wait_no_same_group(self):
# start the new process in a new session

View File

@ -499,8 +499,12 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self.proactor.accept.assert_called_with(self.sock)
def test_socketpair(self):
class EventLoop(BaseProactorEventLoop):
# override the destructor to not log a ResourceWarning
def __del__(self):
pass
self.assertRaises(
NotImplementedError, BaseProactorEventLoop, self.proactor)
NotImplementedError, EventLoop, self.proactor)
def test_make_socket_transport(self):
tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())

View File

@ -1427,7 +1427,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.assertFalse(tr.can_write_eof())
self.assertRaises(NotImplementedError, tr.write_eof)
def test_close(self):
def check_close(self):
tr = self._make_one()
tr.close()
@ -1439,6 +1439,19 @@ class SelectorSslTransportTests(test_utils.TestCase):
self.assertEqual(tr._conn_lost, 1)
self.assertEqual(1, self.loop.remove_reader_count[1])
test_utils.run_briefly(self.loop)
def test_close(self):
self.check_close()
self.assertTrue(self.protocol.connection_made.called)
self.assertTrue(self.protocol.connection_lost.called)
def test_close_not_connected(self):
self.sslsock.do_handshake.side_effect = ssl.SSLWantReadError
self.check_close()
self.assertFalse(self.protocol.connection_made.called)
self.assertFalse(self.protocol.connection_lost.called)
@unittest.skipIf(ssl is None, 'No SSL support')
def test_server_hostname(self):
self.ssl_transport(server_hostname='localhost')

View File

@ -22,7 +22,9 @@ class SslProtoHandshakeTests(test_utils.TestCase):
def ssl_protocol(self, waiter=None):
sslcontext = test_utils.dummy_ssl_context()
app_proto = asyncio.Protocol()
return sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter)
proto = sslproto.SSLProtocol(self.loop, app_proto, sslcontext, waiter)
self.addCleanup(proto._app_transport.close)
return proto
def connection_made(self, ssl_proto, do_handshake=None):
transport = mock.Mock()
@ -56,9 +58,6 @@ class SslProtoHandshakeTests(test_utils.TestCase):
with test_utils.disable_logger():
self.loop.run_until_complete(handshake_fut)
# Close the transport
ssl_proto._app_transport.close()
def test_eof_received_waiter(self):
waiter = asyncio.Future(loop=self.loop)
ssl_proto = self.ssl_protocol(waiter)

View File

@ -4,6 +4,7 @@ import unittest
from unittest import mock
import asyncio
from asyncio import base_subprocess
from asyncio import subprocess
from asyncio import test_utils
try:
@ -23,6 +24,56 @@ PROGRAM_CAT = [
'data = sys.stdin.buffer.read()',
'sys.stdout.buffer.write(data)'))]
class TestSubprocessTransport(base_subprocess.BaseSubprocessTransport):
def _start(self, *args, **kwargs):
self._proc = mock.Mock()
self._proc.stdin = None
self._proc.stdout = None
self._proc.stderr = None
class SubprocessTransportTests(test_utils.TestCase):
def setUp(self):
self.loop = self.new_test_loop()
self.set_event_loop(self.loop)
def create_transport(self, waiter=None):
protocol = mock.Mock()
protocol.connection_made._is_coroutine = False
protocol.process_exited._is_coroutine = False
transport = TestSubprocessTransport(
self.loop, protocol, ['test'], False,
None, None, None, 0, waiter=waiter)
return (transport, protocol)
def test_proc_exited(self):
waiter = asyncio.Future(loop=self.loop)
transport, protocol = self.create_transport(waiter)
transport._process_exited(6)
self.loop.run_until_complete(waiter)
self.assertEqual(transport.get_returncode(), 6)
self.assertTrue(protocol.connection_made.called)
self.assertTrue(protocol.process_exited.called)
self.assertTrue(protocol.connection_lost.called)
self.assertEqual(protocol.connection_lost.call_args[0], (None,))
self.assertFalse(transport._closed)
self.assertIsNone(transport._loop)
self.assertIsNone(transport._proc)
self.assertIsNone(transport._protocol)
# methods must raise ProcessLookupError if the process exited
self.assertRaises(ProcessLookupError,
transport.send_signal, signal.SIGTERM)
self.assertRaises(ProcessLookupError, transport.terminate)
self.assertRaises(ProcessLookupError, transport.kill)
transport.close()
class SubprocessMixin:
def test_stdin_stdout(self):

View File

@ -350,16 +350,13 @@ class UnixReadPipeTransportTests(test_utils.TestCase):
return transport
def test_ctor(self):
tr = self.read_pipe_transport()
self.loop.assert_reader(5, tr._read_ready)
test_utils.run_briefly(self.loop)
self.protocol.connection_made.assert_called_with(tr)
waiter = asyncio.Future(loop=self.loop)
tr = self.read_pipe_transport(waiter=waiter)
self.loop.run_until_complete(waiter)
def test_ctor_with_waiter(self):
fut = asyncio.Future(loop=self.loop)
tr = self.read_pipe_transport(waiter=fut)
test_utils.run_briefly(self.loop)
self.assertIsNone(fut.result())
self.protocol.connection_made.assert_called_with(tr)
self.loop.assert_reader(5, tr._read_ready)
self.assertIsNone(waiter.result())
@mock.patch('os.read')
def test__read_ready(self, m_read):
@ -502,17 +499,13 @@ class UnixWritePipeTransportTests(test_utils.TestCase):
return transport
def test_ctor(self):
tr = self.write_pipe_transport()
self.loop.assert_reader(5, tr._read_ready)
test_utils.run_briefly(self.loop)
self.protocol.connection_made.assert_called_with(tr)
waiter = asyncio.Future(loop=self.loop)
tr = self.write_pipe_transport(waiter=waiter)
self.loop.run_until_complete(waiter)
def test_ctor_with_waiter(self):
fut = asyncio.Future(loop=self.loop)
tr = self.write_pipe_transport(waiter=fut)
self.protocol.connection_made.assert_called_with(tr)
self.loop.assert_reader(5, tr._read_ready)
test_utils.run_briefly(self.loop)
self.assertEqual(None, fut.result())
self.assertEqual(None, waiter.result())
def test_can_write_eof(self):
tr = self.write_pipe_transport()

View File

@ -360,6 +360,27 @@ class AbstractMemoryTests:
self.assertEqual(list(reversed(m)), aslist)
self.assertEqual(list(reversed(m)), list(m[::-1]))
def test_issue22668(self):
a = array.array('H', [256, 256, 256, 256])
x = memoryview(a)
m = x.cast('B')
b = m.cast('H')
c = b[0:2]
d = memoryview(b)
del b
self.assertEqual(c[0], 256)
self.assertEqual(d[0], 256)
self.assertEqual(c.format, "H")
self.assertEqual(d.format, "H")
_ = m.cast('I')
self.assertEqual(c[0], 256)
self.assertEqual(d[0], 256)
self.assertEqual(c.format, "H")
self.assertEqual(d.format, "H")
# Variations on source objects for the buffer: bytes-like objects, then arrays
# with itemsize > 1.

View File

@ -965,7 +965,7 @@ class SizeofTest(unittest.TestCase):
check(int(PyLong_BASE**2-1), vsize('') + 2*self.longdigit)
check(int(PyLong_BASE**2), vsize('') + 3*self.longdigit)
# memoryview
check(memoryview(b''), size('Pnin 2P2n2i5P 3cPn'))
check(memoryview(b''), size('Pnin 2P2n2i5P Pn'))
# module
check(unittest, size('PnPPP'))
# None

View File

@ -1132,6 +1132,51 @@ get_native_fmtchar(char *result, const char *fmt)
return -1;
}
Py_LOCAL_INLINE(char *)
get_native_fmtstr(const char *fmt)
{
int at = 0;
if (fmt[0] == '@') {
at = 1;
fmt++;
}
if (fmt[0] == '\0' || fmt[1] != '\0') {
return NULL;
}
#define RETURN(s) do { return at ? "@" s : s; } while (0)
switch (fmt[0]) {
case 'c': RETURN("c");
case 'b': RETURN("b");
case 'B': RETURN("B");
case 'h': RETURN("h");
case 'H': RETURN("H");
case 'i': RETURN("i");
case 'I': RETURN("I");
case 'l': RETURN("l");
case 'L': RETURN("L");
#ifdef HAVE_LONG_LONG
case 'q': RETURN("q");
case 'Q': RETURN("Q");
#endif
case 'n': RETURN("n");
case 'N': RETURN("N");
case 'f': RETURN("f");
case 'd': RETURN("d");
#ifdef HAVE_C99_BOOL
case '?': RETURN("?");
#else
case '?': RETURN("?");
#endif
case 'P': RETURN("P");
}
return NULL;
}
/* Cast a memoryview's data type to 'format'. The input array must be
C-contiguous. At least one of input-format, output-format must have
byte size. The output array is 1-D, with the same byte length as the
@ -1181,10 +1226,13 @@ cast_to_1D(PyMemoryViewObject *mv, PyObject *format)
goto out;
}
strncpy(mv->format, PyBytes_AS_STRING(asciifmt),
_Py_MEMORYVIEW_MAX_FORMAT);
mv->format[_Py_MEMORYVIEW_MAX_FORMAT-1] = '\0';
view->format = mv->format;
view->format = get_native_fmtstr(PyBytes_AS_STRING(asciifmt));
if (view->format == NULL) {
/* NOT_REACHED: get_native_fmtchar() already validates the format. */
PyErr_SetString(PyExc_RuntimeError,
"memoryview: internal error");
goto out;
}
view->itemsize = itemsize;
view->ndim = 1;