cpython/Lib/test/test_kqueue.py

167 lines
5.8 KiB
Python

"""
Tests for kqueue wrapper.
"""
import socket
import errno
import time
import select
import sys
import unittest
from test import support
# kqueue only exists on BSD-family kernels; skip the whole module elsewhere.
if not hasattr(select, "kqueue"):
    # unittest.SkipTest is the exception that unittest-based runners treat as
    # "skip"; the old test.support.TestSkipped name is not available anymore,
    # so raising it would fail with AttributeError instead of skipping.
    raise unittest.SkipTest("test works only on BSD")
@unittest.skipUnless(hasattr(select, "kqueue"), "test works only on BSD")
class TestKQueue(unittest.TestCase):
    """Tests for the select.kqueue and select.kevent wrappers.

    The class carries its own skip condition so that it is reported as
    skipped (rather than failing with AttributeError) when it is collected
    on a platform whose select module has no kqueue support.
    """

    def _check_kevent(self, ev, other, ident, filter_, flags,
                      fflags=0, data=0, udata=0):
        """Assert every field of *ev* and its (in)equality behaviour.

        ev      -- the kevent under test
        other   -- a different kevent that *ev* must not compare equal to
        ident, filter_, flags, fflags, data, udata -- expected field values
        """
        self.assertEqual(ev.ident, ident)
        self.assertEqual(ev.filter, filter_)
        self.assertEqual(ev.flags, flags)
        self.assertEqual(ev.fflags, fflags)
        self.assertEqual(ev.data, data)
        self.assertEqual(ev.udata, udata)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

    def test_create_queue(self):
        """A kqueue exposes a descriptor until close() is called."""
        kq = select.kqueue()
        self.assertTrue(kq.fileno() > 0, kq.fileno())
        self.assertFalse(kq.closed)
        kq.close()
        self.assertTrue(kq.closed)
        # Once closed, asking for the descriptor must raise.
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
        """kevent() fills in defaults and supports comparison."""
        fd = sys.stderr.fileno()
        ev = select.kevent(fd)
        other = select.kevent(1000)

        # Only the ident given: filter and flags take their defaults.
        self._check_kevent(ev, other, fd,
                           select.KQ_FILTER_READ, select.KQ_EV_ADD)

        # Ordering is checked with the rich-comparison operators because the
        # cmp() builtin does not exist in Python 3.
        self.assertTrue(ev < other)
        self.assertTrue(other > ev)
        self.assertTrue(other >= ev)
        # Ordering a kevent against a non-kevent must raise TypeError.
        for bad in (None, 1, "ev"):
            self.assertRaises(TypeError, lambda bad=bad: ev < bad)

        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self._check_kevent(ev, other, fd,
                           select.KQ_FILTER_WRITE, select.KQ_EV_ADD)

        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self._check_kevent(ev, other, fd,
                           select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)

        # All six fields given positionally.
        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self._check_kevent(ev, other, 1, 2, 3, 4, 5, 6)

    def test_queue_event(self):
        """Register, receive and delete events for a connected socket pair."""

        def summarize(events):
            # Reduce kevents to sortable tuples so they can be compared
            # against an expected list regardless of delivery order.
            return sorted((e.ident, e.filter, e.flags) for e in events)

        # Sockets are closed through cleanups so that they are released even
        # when one of the assertions below fails.
        listener = socket.socket()
        self.addCleanup(listener.close)
        listener.bind(('127.0.0.1', 0))
        listener.listen(1)

        client = socket.socket()
        self.addCleanup(client.close)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', listener.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        # No "else: fail" branch on purpose: FreeBSD doesn't raise an
        # exception here.

        server, _ = listener.accept()
        self.addCleanup(server.close)

        # The flags expected in returned events depend on the platform.
        if sys.platform.startswith("darwin"):
            flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        else:
            flags = 0

        kq = select.kqueue()
        # kq2 is built from kq's descriptor, so changes submitted through
        # kq2 are reported when polling kq below.
        kq2 = select.kqueue.fromfd(kq.fileno())

        add = select.KQ_EV_ADD | select.KQ_EV_ENABLE
        # Same registration order as before: server via kq, client via kq2,
        # write filter first and read filter second for each socket.
        for queue, sock in ((kq, server), (kq2, client)):
            for filt in (select.KQ_FILTER_WRITE, select.KQ_FILTER_READ):
                queue.control([select.kevent(sock.fileno(), filt, add)], 0)

        # Nothing has been sent yet, so only the write filters should fire.
        self.assertEqual(summarize(kq.control(None, 4, 1)), [
            (client.fileno(), select.KQ_FILTER_WRITE, flags),
            (server.fileno(), select.KQ_FILTER_WRITE, flags)])

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times before all four events
        # (read + write on both ends) are reported together.
        for _ in range(6):
            events = kq.control(None, 4, 1)
            if len(events) == 4:
                break
        self.assertEqual(summarize(events), [
            (client.fileno(), select.KQ_FILTER_WRITE, flags),
            (client.fileno(), select.KQ_FILTER_READ, flags),
            (server.fileno(), select.KQ_FILTER_WRITE, flags),
            (server.fileno(), select.KQ_FILTER_READ, flags)])

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        # Only the server's write filter is still registered.
        self.assertEqual(summarize(kq.control([], 4, 0.99)), [
            (server.fileno(), select.KQ_FILTER_WRITE, flags)])
def test_main():
    """Run the kqueue test case through test.support's unittest runner."""
    cases = (TestKQueue,)
    support.run_unittest(*cases)


# Allow the file to be executed directly as a script.
if __name__ == "__main__":
    test_main()