asyncio/tests: Fix ResourceWarnings related to unclosed transports

This commit is contained in:
Yury Selivanov 2015-12-16 20:23:26 -05:00
parent 5f68ca66bf
commit 3cd863c86e
1 changed files with 29 additions and 18 deletions

View File

@ -1157,21 +1157,28 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.add_writer = mock.Mock()
self.loop.add_writer._is_coroutine = False
coro = self.loop.create_connection(MyProto, '1.2.3.4', 80)
self.loop.run_until_complete(coro)
sock.connect.assert_called_with(('1.2.3.4', 80))
m_socket.socket.assert_called_with(family=m_socket.AF_INET,
proto=m_socket.IPPROTO_TCP,
type=m_socket.SOCK_STREAM)
coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
t, p = self.loop.run_until_complete(coro)
try:
sock.connect.assert_called_with(('1.2.3.4', 80))
m_socket.socket.assert_called_with(family=m_socket.AF_INET,
proto=m_socket.IPPROTO_TCP,
type=m_socket.SOCK_STREAM)
finally:
t.close()
test_utils.run_briefly(self.loop) # allow transport to close
sock.family = socket.AF_INET6
coro = self.loop.create_connection(MyProto, '::2', 80)
self.loop.run_until_complete(coro)
sock.connect.assert_called_with(('::2', 80))
m_socket.socket.assert_called_with(family=m_socket.AF_INET6,
proto=m_socket.IPPROTO_TCP,
type=m_socket.SOCK_STREAM)
coro = self.loop.create_connection(asyncio.Protocol, '::2', 80)
t, p = self.loop.run_until_complete(coro)
try:
sock.connect.assert_called_with(('::2', 80))
m_socket.socket.assert_called_with(family=m_socket.AF_INET6,
proto=m_socket.IPPROTO_TCP,
type=m_socket.SOCK_STREAM)
finally:
t.close()
test_utils.run_briefly(self.loop) # allow transport to close
@patch_socket
def test_create_connection_ip_addr(self, m_socket):
@ -1559,11 +1566,15 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
reuse_address=False,
reuse_port=reuseport_supported)
self.loop.run_until_complete(coro)
bind.assert_called_with(('1.2.3.4', 0))
m_socket.socket.assert_called_with(family=m_socket.AF_INET,
proto=m_socket.IPPROTO_UDP,
type=m_socket.SOCK_DGRAM)
t, p = self.loop.run_until_complete(coro)
try:
bind.assert_called_with(('1.2.3.4', 0))
m_socket.socket.assert_called_with(family=m_socket.AF_INET,
proto=m_socket.IPPROTO_UDP,
type=m_socket.SOCK_DGRAM)
finally:
t.close()
test_utils.run_briefly(self.loop) # allow transport to close
def test_accept_connection_retry(self):
sock = mock.Mock()