import queue import sched import threading import time import unittest from test import support from test.support import threading_helper TIMEOUT = support.SHORT_TIMEOUT class Timer: def __init__(self): self._cond = threading.Condition() self._time = 0 self._stop = 0 def time(self): with self._cond: return self._time # increase the time but not beyond the established limit def sleep(self, t): assert t >= 0 with self._cond: t += self._time while self._stop < t: self._time = self._stop self._cond.wait() self._time = t # advance time limit for user code def advance(self, t): assert t >= 0 with self._cond: self._stop += t self._cond.notify_all() class TestCase(unittest.TestCase): def test_enter(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) for x in [0.5, 0.4, 0.3, 0.2, 0.1]: z = scheduler.enter(x, 1, fun, (x,)) scheduler.run() self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5]) def test_enterabs(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) for x in [0.05, 0.04, 0.03, 0.02, 0.01]: z = scheduler.enterabs(x, 1, fun, (x,)) scheduler.run() self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05]) @threading_helper.requires_working_threading() def test_enter_concurrent(self): q = queue.Queue() fun = q.put timer = Timer() scheduler = sched.scheduler(timer.time, timer.sleep) scheduler.enter(1, 1, fun, (1,)) scheduler.enter(3, 1, fun, (3,)) t = threading.Thread(target=scheduler.run) t.start() timer.advance(1) self.assertEqual(q.get(timeout=TIMEOUT), 1) self.assertTrue(q.empty()) for x in [4, 5, 2]: z = scheduler.enter(x - 1, 1, fun, (x,)) timer.advance(2) self.assertEqual(q.get(timeout=TIMEOUT), 2) self.assertEqual(q.get(timeout=TIMEOUT), 3) self.assertTrue(q.empty()) timer.advance(1) self.assertEqual(q.get(timeout=TIMEOUT), 4) self.assertTrue(q.empty()) timer.advance(1) self.assertEqual(q.get(timeout=TIMEOUT), 5) self.assertTrue(q.empty()) timer.advance(1000) threading_helper.join_thread(t) self.assertTrue(q.empty()) self.assertEqual(timer.time(), 5) def test_priority(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) cases = [ ([1, 2, 3, 4, 5], [1, 2, 3, 4, 5]), ([5, 4, 3, 2, 1], [1, 2, 3, 4, 5]), ([2, 5, 3, 1, 4], [1, 2, 3, 4, 5]), ([1, 2, 3, 2, 1], [1, 1, 2, 2, 3]), ] for priorities, expected in cases: with self.subTest(priorities=priorities, expected=expected): for priority in priorities: scheduler.enterabs(0.01, priority, fun, (priority,)) scheduler.run() self.assertEqual(l, expected) # Cleanup: self.assertTrue(scheduler.empty()) l.clear() def test_cancel(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) now = time.time() event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,)) event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,)) event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,)) event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,)) event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,)) scheduler.cancel(event1) scheduler.cancel(event5) scheduler.run() self.assertEqual(l, [0.02, 0.03, 0.04]) @threading_helper.requires_working_threading() def test_cancel_concurrent(self): q = queue.Queue() fun = q.put timer = Timer() scheduler = sched.scheduler(timer.time, timer.sleep) now = timer.time() event1 = scheduler.enterabs(now + 1, 1, fun, (1,)) event2 = scheduler.enterabs(now + 2, 1, fun, (2,)) event4 = scheduler.enterabs(now + 4, 1, fun, (4,)) event5 = scheduler.enterabs(now + 5, 1, fun, (5,)) event3 = scheduler.enterabs(now + 3, 1, fun, (3,)) t = threading.Thread(target=scheduler.run) t.start() timer.advance(1) self.assertEqual(q.get(timeout=TIMEOUT), 1) self.assertTrue(q.empty()) scheduler.cancel(event2) scheduler.cancel(event5) timer.advance(1) self.assertTrue(q.empty()) timer.advance(1) self.assertEqual(q.get(timeout=TIMEOUT), 3) self.assertTrue(q.empty()) timer.advance(1) self.assertEqual(q.get(timeout=TIMEOUT), 4) self.assertTrue(q.empty()) timer.advance(1000) threading_helper.join_thread(t) self.assertTrue(q.empty()) self.assertEqual(timer.time(), 4) def test_cancel_correct_event(self): # bpo-19270 events = [] scheduler = sched.scheduler() scheduler.enterabs(1, 1, events.append, ("a",)) b = scheduler.enterabs(1, 1, events.append, ("b",)) scheduler.enterabs(1, 1, events.append, ("c",)) scheduler.cancel(b) scheduler.run() self.assertEqual(events, ["a", "c"]) def test_empty(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) self.assertTrue(scheduler.empty()) for x in [0.05, 0.04, 0.03, 0.02, 0.01]: z = scheduler.enterabs(x, 1, fun, (x,)) self.assertFalse(scheduler.empty()) scheduler.run() self.assertTrue(scheduler.empty()) def test_queue(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) now = time.time() e5 = scheduler.enterabs(now + 0.05, 1, fun) e1 = scheduler.enterabs(now + 0.01, 1, fun) e2 = scheduler.enterabs(now + 0.02, 1, fun) e4 = scheduler.enterabs(now + 0.04, 1, fun) e3 = scheduler.enterabs(now + 0.03, 1, fun) # queue property is supposed to return an order list of # upcoming events self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5]) def test_args_kwargs(self): seq = [] def fun(*a, **b): seq.append((a, b)) now = time.time() scheduler = sched.scheduler(time.time, time.sleep) scheduler.enterabs(now, 1, fun) scheduler.enterabs(now, 1, fun, argument=(1, 2)) scheduler.enterabs(now, 1, fun, argument=('a', 'b')) scheduler.enterabs(now, 1, fun, argument=(1, 2), kwargs={"foo": 3}) scheduler.run() self.assertCountEqual(seq, [ ((), {}), ((1, 2), {}), (('a', 'b'), {}), ((1, 2), {'foo': 3}) ]) def test_run_non_blocking(self): l = [] fun = lambda x: l.append(x) scheduler = sched.scheduler(time.time, time.sleep) for x in [10, 9, 8, 7, 6]: scheduler.enter(x, 1, fun, (x,)) scheduler.run(blocking=False) self.assertEqual(l, []) if __name__ == "__main__": unittest.main()