Merge 3.4 (asyncio)

This commit is contained in:
Victor Stinner 2015-01-09 01:43:04 +01:00
commit b551fac136
3 changed files with 16 additions and 7 deletions

View File

@ -126,6 +126,8 @@ class Queue:
# Use _put and _get instead of passing item straight to getter, in # Use _put and _get instead of passing item straight to getter, in
# case a subclass has logic that must run (e.g. JoinableQueue). # case a subclass has logic that must run (e.g. JoinableQueue).
self._put(item) self._put(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get()) getter.set_result(self._get())
elif self._maxsize > 0 and self._maxsize <= self.qsize(): elif self._maxsize > 0 and self._maxsize <= self.qsize():
@ -152,6 +154,8 @@ class Queue:
# Use _put and _get instead of passing item straight to getter, in # Use _put and _get instead of passing item straight to getter, in
# case a subclass has logic that must run (e.g. JoinableQueue). # case a subclass has logic that must run (e.g. JoinableQueue).
self._put(item) self._put(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get()) getter.set_result(self._get())
elif self._maxsize > 0 and self._maxsize <= self.qsize(): elif self._maxsize > 0 and self._maxsize <= self.qsize():
@ -200,6 +204,8 @@ class Queue:
item, putter = self._putters.popleft() item, putter = self._putters.popleft()
self._put(item) self._put(item)
# Wake putter on next tick. # Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters
putter.set_result(None) putter.set_result(None)
return self._get() return self._get()

View File

@ -363,15 +363,15 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
break break
except BlockingIOError: except BlockingIOError:
fut.add_done_callback(functools.partial(self._sock_connect_done, fut.add_done_callback(functools.partial(self._sock_connect_done,
sock)) fd))
self.add_writer(fd, self._sock_connect_cb, fut, sock, address) self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
except Exception as exc: except Exception as exc:
fut.set_exception(exc) fut.set_exception(exc)
else: else:
fut.set_result(None) fut.set_result(None)
def _sock_connect_done(self, sock, fut): def _sock_connect_done(self, fd, fut):
self.remove_writer(sock.fileno()) self.remove_writer(fd)
def _sock_connect_cb(self, fut, sock, address): def _sock_connect_cb(self, fut, sock, address):
if fut.cancelled(): if fut.cancelled():

View File

@ -582,11 +582,12 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
def _done_callback(i, fut): def _done_callback(i, fut):
nonlocal nfinished nonlocal nfinished
if outer._state != futures._PENDING: if outer.done():
if fut._exception is not None: if not fut.cancelled():
# Mark exception retrieved. # Mark exception retrieved.
fut.exception() fut.exception()
return return
if fut._state == futures._CANCELLED: if fut._state == futures._CANCELLED:
res = futures.CancelledError() res = futures.CancelledError()
if not return_exceptions: if not return_exceptions:
@ -644,9 +645,11 @@ def shield(arg, *, loop=None):
def _done_callback(inner): def _done_callback(inner):
if outer.cancelled(): if outer.cancelled():
# Mark inner's result as retrieved. if not inner.cancelled():
inner.cancelled() or inner.exception() # Mark inner's result as retrieved.
inner.exception()
return return
if inner.cancelled(): if inner.cancelled():
outer.cancel() outer.cancel()
else: else: