asyncio: Fix pyflakes warnings: remove unused variables and imports

This commit is contained in:
Victor Stinner 2014-02-26 11:07:42 +01:00
parent 24ba203504
commit f5e37037cc
8 changed files with 12 additions and 26 deletions

View File

@ -11,7 +11,6 @@ import sys
import tempfile
import threading
import time
import unittest
from unittest import mock
from http.server import HTTPServer

View File

@ -7,7 +7,7 @@ import sys
import time
import unittest
from unittest import mock
from test.support import find_unused_port, IPV6_ENABLED
from test.support import IPV6_ENABLED
import asyncio
from asyncio import base_events

View File

@ -25,7 +25,6 @@ from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
import asyncio
from asyncio import events
from asyncio import selector_events
from asyncio import test_utils
@ -1648,13 +1647,12 @@ class SubprocessTestsMixin:
def test_subprocess_wait_no_same_group(self):
proto = None
transp = None
@asyncio.coroutine
def connect():
nonlocal proto
# start the new process in a new session
transp, proto = yield from self.loop.subprocess_shell(
_, proto = yield from self.loop.subprocess_shell(
functools.partial(MySubprocessProtocol, self.loop),
'exit 7', stdin=None, stdout=None, stderr=None,
start_new_session=True)

View File

@ -1,6 +1,5 @@
"""Tests for selector_events.py"""
import collections
import errno
import gc
import pprint
@ -1378,7 +1377,7 @@ class SelectorSslWithoutSslTransportTests(unittest.TestCase):
def test_ssl_transport_requires_ssl_module(self):
Mock = mock.Mock
with self.assertRaises(RuntimeError):
transport = _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
_SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
class SelectorDatagramTransportTests(unittest.TestCase):

View File

@ -1,6 +1,5 @@
"""Tests for streams.py."""
import functools
import gc
import socket
import unittest

View File

@ -830,7 +830,7 @@ class TaskTests(unittest.TestCase):
v = yield from f
self.assertEqual(v, 'a')
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
def test_as_completed_reverse_wait(self):
@ -964,13 +964,9 @@ class TaskTests(unittest.TestCase):
loop = test_utils.TestLoop(gen)
self.addCleanup(loop.close)
sleepfut = None
@asyncio.coroutine
def sleep(dt):
nonlocal sleepfut
sleepfut = asyncio.sleep(dt, loop=loop)
yield from sleepfut
yield from asyncio.sleep(dt, loop=loop)
@asyncio.coroutine
def doit():

View File

@ -1302,7 +1302,7 @@ class ChildWatcherTestsMixin:
m.waitpid.side_effect = ValueError
with mock.patch.object(log.logger,
'error') as m_error:
'error') as m_error:
self.assertEqual(self.watcher._sig_chld(), None)
self.assertTrue(m_error.called)
@ -1376,19 +1376,16 @@ class ChildWatcherTestsMixin:
# attach a new loop
old_loop = self.loop
self.loop = test_utils.TestLoop()
patch = mock.patch.object
with mock.patch.object(
old_loop,
"remove_signal_handler") as m_old_remove_signal_handler, \
mock.patch.object(
self.loop,
"add_signal_handler") as m_new_add_signal_handler:
with patch(old_loop, "remove_signal_handler") as m_old_remove, \
patch(self.loop, "add_signal_handler") as m_new_add:
self.watcher.attach_loop(self.loop)
m_old_remove_signal_handler.assert_called_once_with(
m_old_remove.assert_called_once_with(
signal.SIGCHLD)
m_new_add_signal_handler.assert_called_once_with(
m_new_add.assert_called_once_with(
signal.SIGCHLD, self.watcher._sig_chld)
# child terminates
@ -1462,7 +1459,6 @@ class ChildWatcherTestsMixin:
def test_close(self, m):
# register two children
callback1 = mock.Mock()
callback2 = mock.Mock()
with self.watcher:
self.running = True

View File

@ -8,7 +8,6 @@ if sys.platform != 'win32':
import _winapi
import asyncio
from asyncio import test_utils
from asyncio import _overlapped
from asyncio import windows_events
@ -50,7 +49,7 @@ class ProactorTests(unittest.TestCase):
ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
server1 = windows_events.PipeServer(ADDRESS)
with self.assertRaises(PermissionError):
server2 = windows_events.PipeServer(ADDRESS)
windows_events.PipeServer(ADDRESS)
server1.close()
def test_pipe(self):