[3.6] bpo-33041: Add tests for jumps in/out of 'async with' blocks. (GH-6110). (GH-6141)

(cherry picked from commit bc300ce205)
This commit is contained in:
Serhiy Storchaka 2018-03-18 12:32:32 +02:00 committed by GitHub
parent 5bd0fe6a00
commit 193760f00a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 133 additions and 1 deletions

View File

@ -7,8 +7,9 @@ import difflib
import gc
from functools import wraps
class tracecontext:
"""Contex manager that traces its enter and exit."""
"""Context manager that traces its enter and exit."""
def __init__(self, output, value):
self.output = output
self.value = value
@ -19,6 +20,36 @@ class tracecontext:
def __exit__(self, *exc_info):
self.output.append(-self.value)
class asynctracecontext:
"""Asynchronous context manager that traces its aenter and aexit."""
def __init__(self, output, value):
self.output = output
self.value = value
async def __aenter__(self):
self.output.append(self.value)
async def __aexit__(self, *exc_info):
self.output.append(-self.value)
def asyncio_run(main):
import asyncio
import asyncio.events
import asyncio.coroutines
assert asyncio.events._get_running_loop() is None
assert asyncio.coroutines.iscoroutine(main)
loop = asyncio.events.new_event_loop()
try:
asyncio.events.set_event_loop(loop)
return loop.run_until_complete(main)
finally:
try:
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.events.set_event_loop(None)
loop.close()
# A very basic example. If this fails, we're in deep trouble.
def basic():
return 1
@ -591,6 +622,19 @@ class JumpTestCase(unittest.TestCase):
sys.settrace(None)
self.compare_jump_output(expected, output)
def run_async_test(self, func, jumpFrom, jumpTo, expected, error=None,
event='line', decorated=False):
tracer = JumpTracer(func, jumpFrom, jumpTo, event, decorated)
sys.settrace(tracer.trace)
output = []
if error is None:
asyncio_run(func(output))
else:
with self.assertRaisesRegex(*error):
asyncio_run(func(output))
sys.settrace(None)
self.compare_jump_output(expected, output)
def jump_test(jumpFrom, jumpTo, expected, error=None, event='line'):
"""Decorator that creates a test that makes a jump
from one place to another in the following code.
@ -603,6 +647,18 @@ class JumpTestCase(unittest.TestCase):
return test
return decorator
def async_jump_test(jumpFrom, jumpTo, expected, error=None, event='line'):
"""Decorator that creates a test that makes a jump
from one place to another in the following asynchronous code.
"""
def decorator(func):
@wraps(func)
def test(self):
self.run_async_test(func, jumpFrom, jumpTo, expected,
error=error, event=event, decorated=True)
return test
return decorator
## The first set of 'jump' tests are for things that are allowed:
@jump_test(1, 3, [3])
@ -698,12 +754,24 @@ class JumpTestCase(unittest.TestCase):
output.append(2)
output.append(3)
@async_jump_test(2, 3, [1, 3])
async def test_jump_forwards_out_of_async_with_block(output):
async with asynctracecontext(output, 1):
output.append(2)
output.append(3)
@jump_test(3, 1, [1, 2, 1, 2, 3, -2])
def test_jump_backwards_out_of_with_block(output):
output.append(1)
with tracecontext(output, 2):
output.append(3)
@async_jump_test(3, 1, [1, 2, 1, 2, 3, -2])
async def test_jump_backwards_out_of_async_with_block(output):
output.append(1)
async with asynctracecontext(output, 2):
output.append(3)
@jump_test(2, 5, [5])
def test_jump_forwards_out_of_try_finally_block(output):
try:
@ -767,6 +835,14 @@ class JumpTestCase(unittest.TestCase):
with tracecontext(output, 4):
output.append(5)
@async_jump_test(2, 4, [1, 4, 5, -4])
async def test_jump_across_async_with(output):
output.append(1)
async with asynctracecontext(output, 2):
output.append(3)
async with asynctracecontext(output, 4):
output.append(5)
@jump_test(4, 5, [1, 3, 5, 6])
def test_jump_out_of_with_block_within_for_block(output):
output.append(1)
@ -776,6 +852,15 @@ class JumpTestCase(unittest.TestCase):
output.append(5)
output.append(6)
@async_jump_test(4, 5, [1, 3, 5, 6])
async def test_jump_out_of_async_with_block_within_for_block(output):
output.append(1)
for i in [1]:
async with asynctracecontext(output, 3):
output.append(4)
output.append(5)
output.append(6)
@jump_test(4, 5, [1, 2, 3, 5, -2, 6])
def test_jump_out_of_with_block_within_with_block(output):
output.append(1)
@ -785,6 +870,15 @@ class JumpTestCase(unittest.TestCase):
output.append(5)
output.append(6)
@async_jump_test(4, 5, [1, 2, 3, 5, -2, 6])
async def test_jump_out_of_async_with_block_within_with_block(output):
output.append(1)
with tracecontext(output, 2):
async with asynctracecontext(output, 3):
output.append(4)
output.append(5)
output.append(6)
@jump_test(5, 6, [2, 4, 6, 7])
def test_jump_out_of_with_block_within_finally_block(output):
try:
@ -795,6 +889,16 @@ class JumpTestCase(unittest.TestCase):
output.append(6)
output.append(7)
@async_jump_test(5, 6, [2, 4, 6, 7])
async def test_jump_out_of_async_with_block_within_finally_block(output):
try:
output.append(2)
finally:
async with asynctracecontext(output, 4):
output.append(5)
output.append(6)
output.append(7)
@jump_test(8, 11, [1, 3, 5, 11, 12])
def test_jump_out_of_complex_nested_blocks(output):
output.append(1)
@ -818,6 +922,14 @@ class JumpTestCase(unittest.TestCase):
output.append(4)
output.append(5)
@async_jump_test(3, 5, [1, 2, 5])
async def test_jump_out_of_async_with_assignment(output):
output.append(1)
async with asynctracecontext(output, 2) \
as x:
output.append(4)
output.append(5)
@jump_test(3, 6, [1, 6, 8, 9])
def test_jump_over_return_in_try_finally_block(output):
output.append(1)
@ -936,12 +1048,24 @@ class JumpTestCase(unittest.TestCase):
with tracecontext(output, 2):
output.append(3)
@async_jump_test(1, 3, [], (ValueError, 'into'))
async def test_no_jump_forwards_into_async_with_block(output):
output.append(1)
async with asynctracecontext(output, 2):
output.append(3)
@jump_test(3, 2, [1, 2, -1], (ValueError, 'into'))
def test_no_jump_backwards_into_with_block(output):
with tracecontext(output, 1):
output.append(2)
output.append(3)
@async_jump_test(3, 2, [1, 2, -1], (ValueError, 'into'))
async def test_no_jump_backwards_into_async_with_block(output):
async with asynctracecontext(output, 1):
output.append(2)
output.append(3)
@jump_test(1, 3, [], (ValueError, 'into'))
def test_no_jump_forwards_into_try_finally_block(output):
output.append(1)
@ -1022,6 +1146,14 @@ class JumpTestCase(unittest.TestCase):
with tracecontext(output, 4):
output.append(5)
@async_jump_test(3, 5, [1, 2, -2], (ValueError, 'into'))
async def test_no_jump_between_async_with_blocks(output):
output.append(1)
async with asynctracecontext(output, 2):
output.append(3)
async with asynctracecontext(output, 4):
output.append(5)
@jump_test(7, 4, [1, 6], (ValueError, 'into'))
def test_no_jump_into_for_block_before_else(output):
output.append(1)