gh-99357: Close the event loop when it is no longer used in test_uncancel_structured_blocks (#99414)

This commit is contained in:
Xiao Chen 2022-11-13 04:16:44 +08:00 committed by GitHub
parent dfc1b17a23
commit 99972dc745
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 20 deletions

View File

@ -635,27 +635,30 @@ class BaseTaskTests:
await asyncio.sleep(0)
return timed_out, structured_block_finished, outer_code_reached
# Test which timed out.
t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
timed_out, structured_block_finished, outer_code_reached = (
loop.run_until_complete(t1)
)
self.assertTrue(timed_out)
self.assertFalse(structured_block_finished) # it was cancelled
self.assertTrue(outer_code_reached) # task got uncancelled after leaving
# the structured block and continued until
# completion
self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
try:
# Test which timed out.
t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
timed_out, structured_block_finished, outer_code_reached = (
loop.run_until_complete(t1)
)
self.assertTrue(timed_out)
self.assertFalse(structured_block_finished) # it was cancelled
self.assertTrue(outer_code_reached) # task got uncancelled after leaving
# the structured block and continued until
# completion
self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task
# Test which did not time out.
t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
timed_out, structured_block_finished, outer_code_reached = (
loop.run_until_complete(t2)
)
self.assertFalse(timed_out)
self.assertTrue(structured_block_finished)
self.assertTrue(outer_code_reached)
self.assertEqual(t2.cancelling(), 0)
# Test which did not time out.
t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
timed_out, structured_block_finished, outer_code_reached = (
loop.run_until_complete(t2)
)
self.assertFalse(timed_out)
self.assertTrue(structured_block_finished)
self.assertTrue(outer_code_reached)
self.assertEqual(t2.cancelling(), 0)
finally:
loop.close()
def test_cancel(self):