mirror of https://github.com/python/cpython
183 lines
6.1 KiB
Python
183 lines
6.1 KiB
Python
"""Tests for window_utils"""
|
|
|
|
import socket
|
|
import sys
|
|
import unittest
|
|
import warnings
|
|
from unittest import mock
|
|
|
|
if sys.platform != 'win32':
|
|
raise unittest.SkipTest('Windows only')
|
|
|
|
import _winapi
|
|
|
|
from asyncio import _overlapped
|
|
from asyncio import windows_utils
|
|
try:
|
|
from test import support
|
|
except ImportError:
|
|
from asyncio import test_support as support
|
|
|
|
|
|
class WinsocketpairTests(unittest.TestCase):
|
|
|
|
def check_winsocketpair(self, ssock, csock):
|
|
csock.send(b'xxx')
|
|
self.assertEqual(b'xxx', ssock.recv(1024))
|
|
csock.close()
|
|
ssock.close()
|
|
|
|
def test_winsocketpair(self):
|
|
ssock, csock = windows_utils.socketpair()
|
|
self.check_winsocketpair(ssock, csock)
|
|
|
|
@unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
|
|
def test_winsocketpair_ipv6(self):
|
|
ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
|
|
self.check_winsocketpair(ssock, csock)
|
|
|
|
@unittest.skipIf(hasattr(socket, 'socketpair'),
|
|
'socket.socketpair is available')
|
|
@mock.patch('asyncio.windows_utils.socket')
|
|
def test_winsocketpair_exc(self, m_socket):
|
|
m_socket.AF_INET = socket.AF_INET
|
|
m_socket.SOCK_STREAM = socket.SOCK_STREAM
|
|
m_socket.socket.return_value.getsockname.return_value = ('', 12345)
|
|
m_socket.socket.return_value.accept.return_value = object(), object()
|
|
m_socket.socket.return_value.connect.side_effect = OSError()
|
|
|
|
self.assertRaises(OSError, windows_utils.socketpair)
|
|
|
|
def test_winsocketpair_invalid_args(self):
|
|
self.assertRaises(ValueError,
|
|
windows_utils.socketpair, family=socket.AF_UNSPEC)
|
|
self.assertRaises(ValueError,
|
|
windows_utils.socketpair, type=socket.SOCK_DGRAM)
|
|
self.assertRaises(ValueError,
|
|
windows_utils.socketpair, proto=1)
|
|
|
|
@unittest.skipIf(hasattr(socket, 'socketpair'),
|
|
'socket.socketpair is available')
|
|
@mock.patch('asyncio.windows_utils.socket')
|
|
def test_winsocketpair_close(self, m_socket):
|
|
m_socket.AF_INET = socket.AF_INET
|
|
m_socket.SOCK_STREAM = socket.SOCK_STREAM
|
|
sock = mock.Mock()
|
|
m_socket.socket.return_value = sock
|
|
sock.bind.side_effect = OSError
|
|
self.assertRaises(OSError, windows_utils.socketpair)
|
|
self.assertTrue(sock.close.called)
|
|
|
|
|
|
class PipeTests(unittest.TestCase):
|
|
|
|
def test_pipe_overlapped(self):
|
|
h1, h2 = windows_utils.pipe(overlapped=(True, True))
|
|
try:
|
|
ov1 = _overlapped.Overlapped()
|
|
self.assertFalse(ov1.pending)
|
|
self.assertEqual(ov1.error, 0)
|
|
|
|
ov1.ReadFile(h1, 100)
|
|
self.assertTrue(ov1.pending)
|
|
self.assertEqual(ov1.error, _winapi.ERROR_IO_PENDING)
|
|
ERROR_IO_INCOMPLETE = 996
|
|
try:
|
|
ov1.getresult()
|
|
except OSError as e:
|
|
self.assertEqual(e.winerror, ERROR_IO_INCOMPLETE)
|
|
else:
|
|
raise RuntimeError('expected ERROR_IO_INCOMPLETE')
|
|
|
|
ov2 = _overlapped.Overlapped()
|
|
self.assertFalse(ov2.pending)
|
|
self.assertEqual(ov2.error, 0)
|
|
|
|
ov2.WriteFile(h2, b"hello")
|
|
self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
|
|
|
|
res = _winapi.WaitForMultipleObjects([ov2.event], False, 100)
|
|
self.assertEqual(res, _winapi.WAIT_OBJECT_0)
|
|
|
|
self.assertFalse(ov1.pending)
|
|
self.assertEqual(ov1.error, ERROR_IO_INCOMPLETE)
|
|
self.assertFalse(ov2.pending)
|
|
self.assertIn(ov2.error, {0, _winapi.ERROR_IO_PENDING})
|
|
self.assertEqual(ov1.getresult(), b"hello")
|
|
finally:
|
|
_winapi.CloseHandle(h1)
|
|
_winapi.CloseHandle(h2)
|
|
|
|
def test_pipe_handle(self):
|
|
h, _ = windows_utils.pipe(overlapped=(True, True))
|
|
_winapi.CloseHandle(_)
|
|
p = windows_utils.PipeHandle(h)
|
|
self.assertEqual(p.fileno(), h)
|
|
self.assertEqual(p.handle, h)
|
|
|
|
# check garbage collection of p closes handle
|
|
with warnings.catch_warnings():
|
|
warnings.filterwarnings("ignore", "", ResourceWarning)
|
|
del p
|
|
support.gc_collect()
|
|
try:
|
|
_winapi.CloseHandle(h)
|
|
except OSError as e:
|
|
self.assertEqual(e.winerror, 6) # ERROR_INVALID_HANDLE
|
|
else:
|
|
raise RuntimeError('expected ERROR_INVALID_HANDLE')
|
|
|
|
|
|
class PopenTests(unittest.TestCase):
|
|
|
|
def test_popen(self):
|
|
command = r"""if 1:
|
|
import sys
|
|
s = sys.stdin.readline()
|
|
sys.stdout.write(s.upper())
|
|
sys.stderr.write('stderr')
|
|
"""
|
|
msg = b"blah\n"
|
|
|
|
p = windows_utils.Popen([sys.executable, '-c', command],
|
|
stdin=windows_utils.PIPE,
|
|
stdout=windows_utils.PIPE,
|
|
stderr=windows_utils.PIPE)
|
|
|
|
for f in [p.stdin, p.stdout, p.stderr]:
|
|
self.assertIsInstance(f, windows_utils.PipeHandle)
|
|
|
|
ovin = _overlapped.Overlapped()
|
|
ovout = _overlapped.Overlapped()
|
|
overr = _overlapped.Overlapped()
|
|
|
|
ovin.WriteFile(p.stdin.handle, msg)
|
|
ovout.ReadFile(p.stdout.handle, 100)
|
|
overr.ReadFile(p.stderr.handle, 100)
|
|
|
|
events = [ovin.event, ovout.event, overr.event]
|
|
# Super-long timeout for slow buildbots.
|
|
res = _winapi.WaitForMultipleObjects(events, True, 10000)
|
|
self.assertEqual(res, _winapi.WAIT_OBJECT_0)
|
|
self.assertFalse(ovout.pending)
|
|
self.assertFalse(overr.pending)
|
|
self.assertFalse(ovin.pending)
|
|
|
|
self.assertEqual(ovin.getresult(), len(msg))
|
|
out = ovout.getresult().rstrip()
|
|
err = overr.getresult().rstrip()
|
|
|
|
self.assertGreater(len(out), 0)
|
|
self.assertGreater(len(err), 0)
|
|
# allow for partial reads...
|
|
self.assertTrue(msg.upper().rstrip().startswith(out))
|
|
self.assertTrue(b"stderr".startswith(err))
|
|
|
|
# The context manager calls wait() and closes resources
|
|
with p:
|
|
pass
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|