Convert test_poll to unittest.

This commit is contained in:
Georg Brandl 2006-10-29 19:20:45 +00:00
parent 3163179f1b
commit eecce795a3
2 changed files with 118 additions and 173 deletions

View File

@ -1,19 +0,0 @@
test_poll
Running poll test 1
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
Poll test 1 complete
Running poll test 2
Poll test 2 complete
Running poll test 3
Poll test 3 complete

View File

@ -1,7 +1,7 @@
# Test case for the os.poll() function
import sys, os, select, random
from test.test_support import verify, verbose, TestSkipped, TESTFN
import sys, os, select, random, unittest
from test.test_support import TestSkipped, TESTFN, run_unittest
try:
select.poll
@ -16,177 +16,141 @@ def find_ready_matching(ready, flag):
match.append(fd)
return match
def test_poll1():
"""Basic functional test of poll object
class PollTests(unittest.TestCase):
Create a bunch of pipe and test that poll works with them.
"""
print 'Running poll test 1'
p = select.poll()
def test_poll1(self):
# Basic functional test of poll object
# Create a bunch of pipe and test that poll works with them.
p = select.poll()
NUM_PIPES = 12
MSG = " This is a test."
MSG_LEN = len(MSG)
readers = []
writers = []
r2w = {}
w2r = {}
NUM_PIPES = 12
MSG = " This is a test."
MSG_LEN = len(MSG)
readers = []
writers = []
r2w = {}
w2r = {}
for i in range(NUM_PIPES):
rd, wr = os.pipe()
p.register(rd, select.POLLIN)
p.register(wr, select.POLLOUT)
readers.append(rd)
writers.append(wr)
r2w[rd] = wr
w2r[wr] = rd
for i in range(NUM_PIPES):
rd, wr = os.pipe()
p.register(rd, select.POLLIN)
p.register(wr, select.POLLOUT)
readers.append(rd)
writers.append(wr)
r2w[rd] = wr
w2r[wr] = rd
while writers:
ready = p.poll()
ready_writers = find_ready_matching(ready, select.POLLOUT)
if not ready_writers:
raise RuntimeError, "no pipes ready for writing"
wr = random.choice(ready_writers)
os.write(wr, MSG)
bufs = []
ready = p.poll()
ready_readers = find_ready_matching(ready, select.POLLIN)
if not ready_readers:
raise RuntimeError, "no pipes ready for reading"
rd = random.choice(ready_readers)
buf = os.read(rd, MSG_LEN)
verify(len(buf) == MSG_LEN)
print buf
os.close(r2w[rd]) ; os.close( rd )
p.unregister( r2w[rd] )
p.unregister( rd )
writers.remove(r2w[rd])
while writers:
ready = p.poll()
ready_writers = find_ready_matching(ready, select.POLLOUT)
if not ready_writers:
raise RuntimeError, "no pipes ready for writing"
wr = random.choice(ready_writers)
os.write(wr, MSG)
poll_unit_tests()
print 'Poll test 1 complete'
ready = p.poll()
ready_readers = find_ready_matching(ready, select.POLLIN)
if not ready_readers:
raise RuntimeError, "no pipes ready for reading"
rd = random.choice(ready_readers)
buf = os.read(rd, MSG_LEN)
self.assertEqual(len(buf), MSG_LEN)
bufs.append(buf)
os.close(r2w[rd]) ; os.close( rd )
p.unregister( r2w[rd] )
p.unregister( rd )
writers.remove(r2w[rd])
def poll_unit_tests():
# returns NVAL for invalid file descriptor
FD = 42
try:
os.close(FD)
except OSError:
pass
p = select.poll()
p.register(FD)
r = p.poll()
verify(r[0] == (FD, select.POLLNVAL))
self.assertEqual(bufs, [MSG] * NUM_PIPES)
f = open(TESTFN, 'w')
fd = f.fileno()
p = select.poll()
p.register(f)
r = p.poll()
verify(r[0][0] == fd)
f.close()
r = p.poll()
verify(r[0] == (fd, select.POLLNVAL))
os.unlink(TESTFN)
def poll_unit_tests(self):
# returns NVAL for invalid file descriptor
FD = 42
try:
os.close(FD)
except OSError:
pass
p = select.poll()
p.register(FD)
r = p.poll()
self.assertEqual(r[0], (FD, select.POLLNVAL))
# type error for invalid arguments
p = select.poll()
try:
p.register(p)
except TypeError:
pass
else:
print "Bogus register call did not raise TypeError"
try:
p.unregister(p)
except TypeError:
pass
else:
print "Bogus unregister call did not raise TypeError"
f = open(TESTFN, 'w')
fd = f.fileno()
p = select.poll()
p.register(f)
r = p.poll()
self.assertEqual(r[0][0], fd)
f.close()
r = p.poll()
self.assertEqual(r[0], (fd, select.POLLNVAL))
os.unlink(TESTFN)
# can't unregister non-existent object
p = select.poll()
try:
p.unregister(3)
except KeyError:
pass
else:
print "Bogus unregister call did not raise KeyError"
# type error for invalid arguments
p = select.poll()
self.assertRaises(TypeError, p.register, p)
self.assertRaises(TypeError, p.unregister, p)
# Test error cases
pollster = select.poll()
class Nope:
pass
# can't unregister non-existent object
p = select.poll()
self.assertRaises(KeyError, p.unregister, 3)
class Almost:
def fileno(self):
return 'fileno'
# Test error cases
pollster = select.poll()
class Nope:
pass
try:
pollster.register( Nope(), 0 )
except TypeError: pass
else: print 'expected TypeError exception, not raised'
class Almost:
def fileno(self):
return 'fileno'
try:
pollster.register( Almost(), 0 )
except TypeError: pass
else: print 'expected TypeError exception, not raised'
self.assertRaises(TypeError, pollster.register, Nope(), 0)
self.assertRaises(TypeError, pollster.register, Almost(), 0)
# Another test case for poll(). This is copied from the test case for
# select(), modified to use poll() instead.
# Another test case for poll(). This is copied from the test case for
# select(), modified to use poll() instead.
def test_poll2(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
p = os.popen(cmd, 'r')
pollster = select.poll()
pollster.register( p, select.POLLIN )
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
fdlist = pollster.poll(tout)
if (fdlist == []):
continue
fd, flags = fdlist[0]
if flags & select.POLLHUP:
line = p.readline()
if line != "":
self.fail('error: pipe seems to be closed, but still returns data')
continue
def test_poll2():
print 'Running poll test 2'
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
p = os.popen(cmd, 'r')
pollster = select.poll()
pollster.register( p, select.POLLIN )
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
if verbose:
print 'timeout =', tout
fdlist = pollster.poll(tout)
if (fdlist == []):
continue
fd, flags = fdlist[0]
if flags & select.POLLHUP:
line = p.readline()
if line != "":
print 'error: pipe seems to be closed, but still returns data'
continue
elif flags & select.POLLIN:
line = p.readline()
if not line:
break
continue
else:
self.fail('Unexpected return value from select.poll: %s' % fdlist)
p.close()
elif flags & select.POLLIN:
line = p.readline()
if verbose:
print repr(line)
if not line:
if verbose:
print 'EOF'
break
continue
else:
print 'Unexpected return value from select.poll:', fdlist
p.close()
print 'Poll test 2 complete'
def test_poll3(self):
# test int overflow
pollster = select.poll()
pollster.register(1)
def test_poll3():
# test int overflow
print 'Running poll test 3'
pollster = select.poll()
pollster.register(1)
self.assertRaises(OverflowError, pollster.poll, 1L << 64)
try:
pollster.poll(1L << 64)
except OverflowError:
pass
else:
print 'Expected OverflowError with excessive timeout'
x = 2 + 3
if x != 5:
self.fail('Overflow must have occurred')
x = 2 + 3
if x != 5:
print 'Overflow must have occurred'
print 'Poll test 3 complete'
def test_main():
run_unittest(PollTests)
test_poll1()
test_poll2()
test_poll3()
if __name__ == '__main__':
test_main()