Stab at SF 1010777: test_queue fails occasionally
test_queue has failed occasionally for years, and there's more than one cause. The primary cause in the SF report appears to be that the test driver really needs entirely different code for thread tests that expect to raise exceptions than for thread tests that are testing non-exceptional blocking semantics. So gave them entirely different code, and added a ton of explanation. Another cause is that the blocking thread tests relied in several places on the difference between sleep(.1) and sleep(.2) being long enough for the trigger thread to do its stuff sot that the blocking thread could make progress. That's just not reliable on a loaded machine. Boosted the 0.2's to 10.0's instead, which should be long enough under any non-catastrophic system conditions. That doesn't make the test take longer to run, the 10.0 is just how long the blocking thread is *willing* to wait for the trigger thread to do something. But if the Queue module is plain broken, such tests will indeed take 10 seconds to fail now. For similar (heavy load) reasons, changed threaded-test termination to be willing to wait 10 seconds for the signal thread to end too.
This commit is contained in:
parent
afe5297b8a
commit
8d7626c23f
|
@ -18,26 +18,62 @@ class _TriggerThread(threading.Thread):
|
|||
threading.Thread.__init__(self)
|
||||
|
||||
def run(self):
|
||||
time.sleep(.1)
|
||||
# The sleep isn't necessary, but is intended to give the blocking
|
||||
# function in the main thread a chance at actually blocking before
|
||||
# we unclog it. But if the sleep is longer than the timeout-based
|
||||
# tests wait in their blocking functions, those tests will fail.
|
||||
# So we give them much longer timeout values compared to the
|
||||
# sleep here (I aimed at 10 seconds for blocking functions --
|
||||
# they should never actually wait that long - they should make
|
||||
# progress as soon as we call self.fn()).
|
||||
time.sleep(0.1)
|
||||
self.startedEvent.set()
|
||||
self.fn(*self.args)
|
||||
|
||||
# Execute a function that blocks, and in a seperate thread, a function that
|
||||
# Execute a function that blocks, and in a separate thread, a function that
|
||||
# triggers the release. Returns the result of the blocking function.
|
||||
# Caution: block_func must guarantee to block until trigger_func is
|
||||
# called, and trigger_func must guarantee to change queue state so that
|
||||
# block_func can make enough progress to return. In particular, a
|
||||
# block_func that just raises an exception regardless of whether trigger_func
|
||||
# is called will lead to timing-dependent sporadic failures, and one of
|
||||
# those went rarely seen but undiagnosed for years. Now block_func
|
||||
# must be unexceptional. If block_func is supposed to raise an exception,
|
||||
# call _doExceptionalBlockingTest() instead.
|
||||
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
|
||||
t = _TriggerThread(trigger_func, trigger_args)
|
||||
t.start()
|
||||
result = block_func(*block_args)
|
||||
# If block_func returned before our thread made the call, we failed!
|
||||
if not t.startedEvent.isSet():
|
||||
raise TestFailed("blocking function '%r' appeared not to block" %
|
||||
block_func)
|
||||
t.join(10) # make sure the thread terminates
|
||||
if t.isAlive():
|
||||
raise TestFailed("trigger function '%r' appeared to not return" %
|
||||
trigger_func)
|
||||
return result
|
||||
|
||||
# Call this instead if block_func is supposed to raise an exception.
|
||||
def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
|
||||
trigger_args, expected_exception_class):
|
||||
t = _TriggerThread(trigger_func, trigger_args)
|
||||
t.start()
|
||||
try:
|
||||
return block_func(*block_args)
|
||||
try:
|
||||
block_func(*block_args)
|
||||
except expected_exception_class:
|
||||
raise
|
||||
else:
|
||||
raise TestFailed("expected exception of kind %r" %
|
||||
expected_exception_class)
|
||||
finally:
|
||||
# If we unblocked before our thread made the call, we failed!
|
||||
if not t.startedEvent.isSet():
|
||||
raise TestFailed("blocking function '%r' appeared not to block" %
|
||||
block_func)
|
||||
t.join(1) # make sure the thread terminates
|
||||
t.join(10) # make sure the thread terminates
|
||||
if t.isAlive():
|
||||
raise TestFailed("trigger function '%r' appeared to not return" %
|
||||
trigger_func)
|
||||
if not t.startedEvent.isSet():
|
||||
raise TestFailed("trigger thread ended but event never set")
|
||||
|
||||
# A Queue subclass that can provoke failure at a moment's notice :)
|
||||
class FailingQueueException(Exception):
|
||||
|
@ -92,7 +128,8 @@ def FailingQueueTest(q):
|
|||
# Test a failing timeout put
|
||||
q.fail_next_put = True
|
||||
try:
|
||||
_doBlockingTest(q.put, ("full", True, 0.2), q.get, ())
|
||||
_doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
|
||||
FailingQueueException)
|
||||
raise TestFailed("The queue didn't fail when it should have")
|
||||
except FailingQueueException:
|
||||
pass
|
||||
|
@ -129,7 +166,8 @@ def FailingQueueTest(q):
|
|||
verify(q.empty(), "Queue should be empty")
|
||||
q.fail_next_get = True
|
||||
try:
|
||||
_doBlockingTest( q.get, (), q.put, ('empty',))
|
||||
_doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
|
||||
FailingQueueException)
|
||||
raise TestFailed("The queue didn't fail when it should have")
|
||||
except FailingQueueException:
|
||||
pass
|
||||
|
@ -148,6 +186,7 @@ def SimpleQueueTest(q):
|
|||
"Didn't seem to queue the correct data!")
|
||||
for i in range(QUEUE_SIZE-1):
|
||||
q.put(i)
|
||||
verify(not q.empty(), "Queue should not be empty")
|
||||
verify(not q.full(), "Queue should not be full")
|
||||
q.put("last")
|
||||
verify(q.full(), "Queue should be full")
|
||||
|
@ -157,13 +196,13 @@ def SimpleQueueTest(q):
|
|||
except Queue.Full:
|
||||
pass
|
||||
try:
|
||||
q.put("full", timeout=0.1)
|
||||
q.put("full", timeout=0.01)
|
||||
raise TestFailed("Didn't appear to time-out with a full queue")
|
||||
except Queue.Full:
|
||||
pass
|
||||
# Test a blocking put
|
||||
_doBlockingTest(q.put, ("full",), q.get, ())
|
||||
_doBlockingTest(q.put, ("full", True, 0.2), q.get, ())
|
||||
_doBlockingTest(q.put, ("full", True, 10), q.get, ())
|
||||
# Empty it
|
||||
for i in range(QUEUE_SIZE):
|
||||
q.get()
|
||||
|
@ -174,13 +213,13 @@ def SimpleQueueTest(q):
|
|||
except Queue.Empty:
|
||||
pass
|
||||
try:
|
||||
q.get(timeout=0.1)
|
||||
q.get(timeout=0.01)
|
||||
raise TestFailed("Didn't appear to time-out with an empty queue")
|
||||
except Queue.Empty:
|
||||
pass
|
||||
# Test a blocking get
|
||||
_doBlockingTest(q.get, (), q.put, ('empty',))
|
||||
_doBlockingTest(q.get, (True, 0.2), q.put, ('empty',))
|
||||
_doBlockingTest(q.get, (True, 10), q.put, ('empty',))
|
||||
|
||||
def test():
|
||||
q = Queue.Queue(QUEUE_SIZE)
|
||||
|
|
Loading…
Reference in New Issue