From 592ada9b4b08ad57037e365b9c462d71c96e4453 Mon Sep 17 00:00:00 2001 From: Yury Selivanov Date: Thu, 25 Sep 2014 12:07:56 -0400 Subject: [PATCH] asyncio: Improve canceled timer handles cleanup. Closes issue #22448. Patch by Joshua Moore-Oliva. --- Lib/asyncio/base_events.py | 44 ++++++++++-- Lib/asyncio/events.py | 29 +++++--- Lib/test/test_asyncio/test_base_events.py | 84 +++++++++++++++++++++-- Lib/test/test_asyncio/test_events.py | 14 +++- Misc/NEWS | 14 ++++ 5 files changed, 159 insertions(+), 26 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index db132505a8c..5aaf58f9f1e 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -40,6 +40,13 @@ __all__ = ['BaseEventLoop', 'Server'] # Argument for default thread pool executor creation. _MAX_WORKERS = 5 +# Minimum number of _scheduled timer handles before cleanup of +# cancelled handles is performed. +_MIN_SCHEDULED_TIMER_HANDLES = 100 + +# Minimum fraction of _scheduled timer handles that are cancelled +# before cleanup of cancelled handles is performed. +_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 def _format_handle(handle): cb = handle._callback @@ -145,6 +152,7 @@ class Server(events.AbstractServer): class BaseEventLoop(events.AbstractEventLoop): def __init__(self): + self._timer_cancelled_count = 0 self._closed = False self._ready = collections.deque() self._scheduled = [] @@ -349,6 +357,7 @@ class BaseEventLoop(events.AbstractEventLoop): if timer._source_traceback: del timer._source_traceback[-1] heapq.heappush(self._scheduled, timer) + timer._scheduled = True return timer def call_soon(self, callback, *args): @@ -964,16 +973,19 @@ class BaseEventLoop(events.AbstractEventLoop): assert isinstance(handle, events.Handle), 'A Handle is required here' if handle._cancelled: return - if isinstance(handle, events.TimerHandle): - heapq.heappush(self._scheduled, handle) - else: - self._ready.append(handle) + assert not isinstance(handle, events.TimerHandle) + self._ready.append(handle) def _add_callback_signalsafe(self, handle): """Like _add_callback() but called from a signal handler.""" self._add_callback(handle) self._write_to_self() + def _timer_handle_cancelled(self, handle): + """Notification that a TimerHandle has been cancelled.""" + if handle._scheduled: + self._timer_cancelled_count += 1 + def _run_once(self): """Run one full iteration of the event loop. @@ -981,9 +993,26 @@ class BaseEventLoop(events.AbstractEventLoop): schedules the resulting callbacks, and finally schedules 'call_later' callbacks. """ - # Remove delayed calls that were cancelled from head of queue. - while self._scheduled and self._scheduled[0]._cancelled: - heapq.heappop(self._scheduled) + + # Remove delayed calls that were cancelled if their number is too high + sched_count = len(self._scheduled) + if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and + self._timer_cancelled_count / sched_count > + _MIN_CANCELLED_TIMER_HANDLES_FRACTION): + for handle in self._scheduled: + if handle._cancelled: + handle._scheduled = False + + self._scheduled = [x for x in self._scheduled if not x._cancelled] + self._timer_cancelled_count = 0 + + heapq.heapify(self._scheduled) + else: + # Remove delayed calls that were cancelled from head of queue. + while self._scheduled and self._scheduled[0]._cancelled: + self._timer_cancelled_count -= 1 + handle = heapq.heappop(self._scheduled) + handle._scheduled = False timeout = None if self._ready: @@ -1024,6 +1053,7 @@ class BaseEventLoop(events.AbstractEventLoop): if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) + handle._scheduled = False self._ready.append(handle) # This is the only place where callbacks are actually *called*. diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index b7cc35122d4..806218f61b7 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -105,14 +105,15 @@ class Handle: return '<%s>' % ' '.join(info) def cancel(self): - self._cancelled = True - if self._loop.get_debug(): - # Keep a representation in debug mode to keep callback and - # parameters. For example, to log the warning "Executing took 2.5 second" - self._repr = repr(self) - self._callback = None - self._args = None + if not self._cancelled: + self._cancelled = True + if self._loop.get_debug(): + # Keep a representation in debug mode to keep callback and + # parameters. For example, to log the warning + # "Executing took 2.5 second" + self._repr = repr(self) + self._callback = None + self._args = None def _run(self): try: @@ -134,7 +135,7 @@ class Handle: class TimerHandle(Handle): """Object returned by timed callback registration methods.""" - __slots__ = ['_when'] + __slots__ = ['_scheduled', '_when'] def __init__(self, when, callback, args, loop): assert when is not None @@ -142,6 +143,7 @@ class TimerHandle(Handle): if self._source_traceback: del self._source_traceback[-1] self._when = when + self._scheduled = False def _repr_info(self): info = super()._repr_info() @@ -180,6 +182,11 @@ class TimerHandle(Handle): equal = self.__eq__(other) return NotImplemented if equal is NotImplemented else not equal + def cancel(self): + if not self._cancelled: + self._loop._timer_handle_cancelled(self) + super().cancel() + class AbstractServer: """Abstract server returned by create_server().""" @@ -238,6 +245,10 @@ class AbstractEventLoop: # Methods scheduling callbacks. All these return Handles. + def _timer_handle_cancelled(self, handle): + """Notification that a TimerHandle has been cancelled.""" + raise NotImplementedError + def call_soon(self, callback, *args): return self.call_later(0, callback, *args) diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index ca12101b2e6..294872a99fb 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -2,6 +2,7 @@ import errno import logging +import math import socket import sys import time @@ -73,13 +74,6 @@ class BaseEventLoopTests(test_utils.TestCase): self.assertFalse(self.loop._scheduled) self.assertIn(h, self.loop._ready) - def test__add_callback_timer(self): - h = asyncio.TimerHandle(time.monotonic()+10, lambda: False, (), - self.loop) - - self.loop._add_callback(h) - self.assertIn(h, self.loop._scheduled) - def test__add_callback_cancelled_handle(self): h = asyncio.Handle(lambda: False, (), self.loop) h.cancel() @@ -283,6 +277,82 @@ class BaseEventLoopTests(test_utils.TestCase): self.assertTrue(processed) self.assertEqual([handle], list(self.loop._ready)) + def test__run_once_cancelled_event_cleanup(self): + self.loop._process_events = mock.Mock() + + self.assertTrue( + 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) + + def cb(): + pass + + # Set up one "blocking" event that will not be cancelled to + # ensure later cancelled events do not make it to the head + # of the queue and get cleaned. + not_cancelled_count = 1 + self.loop.call_later(3000, cb) + + # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) + # cancelled handles, ensure they aren't removed + + cancelled_count = 2 + for x in range(2): + h = self.loop.call_later(3600, cb) + h.cancel() + + # Add some cancelled events that will be at head and removed + cancelled_count += 2 + for x in range(2): + h = self.loop.call_later(100, cb) + h.cancel() + + # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low + self.assertLessEqual(cancelled_count + not_cancelled_count, + base_events._MIN_SCHEDULED_TIMER_HANDLES) + + self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) + + self.loop._run_once() + + cancelled_count -= 2 + + self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) + + self.assertEqual(len(self.loop._scheduled), + cancelled_count + not_cancelled_count) + + # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION + # so that deletion of cancelled events will occur on next _run_once + add_cancel_count = int(math.ceil( + base_events._MIN_SCHEDULED_TIMER_HANDLES * + base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 + + add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - + add_cancel_count, 0) + + # Add some events that will not be cancelled + not_cancelled_count += add_not_cancel_count + for x in range(add_not_cancel_count): + self.loop.call_later(3600, cb) + + # Add enough cancelled events + cancelled_count += add_cancel_count + for x in range(add_cancel_count): + h = self.loop.call_later(3600, cb) + h.cancel() + + # Ensure all handles are still scheduled + self.assertEqual(len(self.loop._scheduled), + cancelled_count + not_cancelled_count) + + self.loop._run_once() + + # Ensure cancelled events were removed + self.assertEqual(len(self.loop._scheduled), not_cancelled_count) + + # Ensure only uncancelled events remain scheduled + self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) + def test_run_until_complete_type_error(self): self.assertRaises(TypeError, self.loop.run_until_complete, 'blah') diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 7ac845a8a0f..a305e66d5a3 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1890,9 +1890,17 @@ class HandleTests(test_utils.TestCase): # cancelled handle h.cancel() - self.assertEqual(repr(h), - '' - % (filename, lineno, create_filename, create_lineno)) + self.assertEqual( + repr(h), + '' + % (filename, lineno, create_filename, create_lineno)) + + # double cancellation won't overwrite _repr + h.cancel() + self.assertEqual( + repr(h), + '' + % (filename, lineno, create_filename, create_lineno)) def test_handle_source_traceback(self): loop = asyncio.get_event_loop_policy().new_event_loop() diff --git a/Misc/NEWS b/Misc/NEWS index 5b8cbca531b..2f3d9d127ce 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -2,6 +2,20 @@ Python News +++++++++++ + +What's New in Python 3.4.3? +=========================== + +Core and Builtins +----------------- + +Library +------- + +- Issue #22448: Improve canceled timer handles cleanup to prevent + unbound memory usage. Patch by Joshua Moore-Oliva. + + What's New in Python 3.4.2? ===========================