2011-11-14 14:07:41 -04:00
|
|
|
# Test case for the select.devpoll() function
|
|
|
|
|
|
|
|
# Initial tests are copied as is from "test_poll.py"
|
|
|
|
|
|
|
|
import os, select, random, unittest, sys
|
|
|
|
from test.support import TESTFN, run_unittest
|
|
|
|
|
|
|
|
# select.devpoll only exists on some platforms; skip the whole module when
# the attribute is missing rather than failing every test in it.
if not hasattr(select, "devpoll"):
    raise unittest.SkipTest("select.devpoll not defined")
|
2011-11-14 14:07:41 -04:00
|
|
|
|
|
|
|
|
|
|
|
def find_ready_matching(ready, flag):
    """Return the descriptors in *ready* whose event mask includes *flag*.

    *ready* is an iterable of ``(fd, mode)`` pairs; *flag* is a bit mask
    tested against each ``mode`` with ``&``.  The result is a new list of
    the matching ``fd`` values, in the order they appear in *ready*.
    """
    return [fd for fd, mode in ready if mode & flag]
|
|
|
|
|
|
|
|
class DevPollTests(unittest.TestCase):
    """Functional tests for select.devpoll objects."""

    def test_devpoll1(self):
        # Basic functional test of poll object
        # Create a bunch of pipe and test that poll works with them.
        #
        # Each round writes MSG to one randomly chosen ready writer and then
        # expects the poll object to report exactly the matching read end.

        p = select.devpoll()

        NUM_PIPES = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}
        w2r = {}

        # Descriptors that are still open.  The cleanup hook closes whatever
        # is left, so a failing assertion part-way through the loop below
        # does not leak the remaining pipes.
        open_fds = set()

        def close_remaining():
            while open_fds:
                os.close(open_fds.pop())

        self.addCleanup(close_remaining)

        for _ in range(NUM_PIPES):
            rd, wr = os.pipe()
            open_fds.update((rd, wr))
            p.register(rd)
            p.modify(rd, select.POLLIN)
            p.register(wr, select.POLLOUT)
            writers.append(wr)
            r2w[rd] = wr
            w2r[wr] = rd

        bufs = []

        while writers:
            ready = p.poll()
            ready_writers = find_ready_matching(ready, select.POLLOUT)
            if not ready_writers:
                self.fail("no pipes ready for writing")
            wr = random.choice(ready_writers)
            os.write(wr, MSG)

            ready = p.poll()
            ready_readers = find_ready_matching(ready, select.POLLIN)
            if not ready_readers:
                self.fail("no pipes ready for reading")
            # Only the pipe just written to may be readable.
            self.assertEqual([w2r[wr]], ready_readers)
            rd = ready_readers[0]
            buf = os.read(rd, MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)

            # Retire both ends of this pipe.  Unregister first so the poll
            # set never refers to a descriptor that is already closed.
            finished_wr = r2w[rd]
            p.unregister(finished_wr)
            p.unregister(rd)
            for fd in (finished_wr, rd):
                # Forget the fd before closing it so the cleanup hook never
                # attempts a second close of the same descriptor.
                open_fds.discard(fd)
                os.close(fd)
            writers.remove(finished_wr)

        self.assertEqual(bufs, [MSG] * NUM_PIPES)

    def test_timeout_overflow(self):
        # poll() must accept -1 and timeouts up to 1 << 30, and raise
        # OverflowError for the out-of-range values checked below.
        pollster = select.devpoll()

        # os.pipe() returns (read_end, write_end).  The write end of an empty
        # pipe is immediately writable, so registering it makes the
        # poll(-1) and long-timeout calls below return at once.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        pollster.register(w)

        pollster.poll(-1)
        self.assertRaises(OverflowError, pollster.poll, -2)
        self.assertRaises(OverflowError, pollster.poll, -1 << 31)
        self.assertRaises(OverflowError, pollster.poll, -1 << 64)

        pollster.poll(0)
        pollster.poll(1)
        pollster.poll(1 << 30)
        self.assertRaises(OverflowError, pollster.poll, 1 << 31)
        self.assertRaises(OverflowError, pollster.poll, 1 << 63)
        self.assertRaises(OverflowError, pollster.poll, 1 << 64)
|
|
|
|
|
|
|
|
def test_main():
    """Run the DevPollTests cases via test.support's run_unittest."""
    run_unittest(DevPollTests)
|
|
|
|
|
|
|
|
# Allow the module to be executed directly as a script.
if __name__ == '__main__':
    test_main()
|