bpo-32418: Postfix, raise NotImplementdError and close resources in tests (#5052)

This commit is contained in:
Andrew Svetlov 2017-12-30 18:52:56 +02:00 committed by GitHub
parent 1634fc289a
commit ffcb4c0165
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 5 deletions

View File

@ -149,15 +149,15 @@ class AbstractServer:
def close(self): def close(self):
"""Stop serving. This leaves existing connections open.""" """Stop serving. This leaves existing connections open."""
return NotImplemented raise NotImplementedError
async def wait_closed(self): async def wait_closed(self):
"""Coroutine to wait until service is closed.""" """Coroutine to wait until service is closed."""
return NotImplemented raise NotImplementedError
def get_loop(self): def get_loop(self):
""" Get the event loop the Server object is attached to.""" """ Get the event loop the Server object is attached to."""
return NotImplemented raise NotImplementedError
class AbstractEventLoop: class AbstractEventLoop:

View File

@ -2826,19 +2826,36 @@ else:
get_running_loop_impl = events._c_get_running_loop get_running_loop_impl = events._c_get_running_loop
get_event_loop_impl = events._c_get_event_loop get_event_loop_impl = events._c_get_event_loop
class TestServer(unittest.TestCase): class TestServer(unittest.TestCase):
def test_get_loop(self): def test_get_loop(self):
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
proto = MyProto(loop) proto = MyProto(loop)
server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0)) server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
self.assertEqual(server.get_loop(), loop) self.assertEqual(server.get_loop(), loop)
loop.close() server.close()
loop.run_until_complete(server.wait_closed())
class TestAbstractServer(unittest.TestCase): class TestAbstractServer(unittest.TestCase):
def test_close(self):
with self.assertRaises(NotImplementedError):
events.AbstractServer().close()
def test_wait_closed(self):
loop = asyncio.new_event_loop()
self.addCleanup(loop.close)
with self.assertRaises(NotImplementedError):
loop.run_until_complete(events.AbstractServer().wait_closed())
def test_get_loop(self): def test_get_loop(self):
self.assertEqual(events.AbstractServer().get_loop(), NotImplemented) with self.assertRaises(NotImplementedError):
events.AbstractServer().get_loop()
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()