Issue #7449, part 8: don't skip the whole test_asynchat if threading is missing

TestFifo can be executed without the threading module
This commit is contained in:
Victor Stinner 2010-04-27 23:03:16 +00:00
parent be595d336c
commit 09227b9111
1 changed files with 77 additions and 74 deletions

View File

@ -1,99 +1,102 @@
# test asynchat
import asyncore, asynchat, socket, threading, time
import asyncore, asynchat, socket, time
import unittest
import sys
from test import test_support
# Skip tests if thread module does not exist.
test_support.import_module('thread')
try:
import threading
except ImportError:
threading = None
HOST = test_support.HOST
SERVER_QUIT = 'QUIT\n'
class echo_server(threading.Thread):
# parameter to determine the number of bytes passed back to the
# client each send
chunk_size = 1
if threading:
class echo_server(threading.Thread):
# parameter to determine the number of bytes passed back to the
# client each send
chunk_size = 1
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = test_support.bind_port(self.sock)
# This will be set if the client wants us to wait before echoing data
# back.
self.start_resend_event = None
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = test_support.bind_port(self.sock)
# This will be set if the client wants us to wait before echoing data
# back.
self.start_resend_event = None
def run(self):
self.sock.listen(1)
self.event.set()
conn, client = self.sock.accept()
self.buffer = ""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
self.buffer = self.buffer + data
def run(self):
self.sock.listen(1)
self.event.set()
conn, client = self.sock.accept()
self.buffer = ""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
self.buffer = self.buffer + data
# remove the SERVER_QUIT message
self.buffer = self.buffer.replace(SERVER_QUIT, '')
# remove the SERVER_QUIT message
self.buffer = self.buffer.replace(SERVER_QUIT, '')
if self.start_resend_event:
self.start_resend_event.wait()
if self.start_resend_event:
self.start_resend_event.wait()
# re-send entire set of collected data
try:
# this may fail on some tests, such as test_close_when_done, since
# the client closes the channel when it's done sending
while self.buffer:
n = conn.send(self.buffer[:self.chunk_size])
time.sleep(0.001)
self.buffer = self.buffer[n:]
except:
# re-send entire set of collected data
try:
# this may fail on some tests, such as test_close_when_done, since
# the client closes the channel when it's done sending
while self.buffer:
n = conn.send(self.buffer[:self.chunk_size])
time.sleep(0.001)
self.buffer = self.buffer[n:]
except:
pass
conn.close()
self.sock.close()
class echo_client(asynchat.async_chat):
def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = ''
def handle_connect(self):
pass
conn.close()
self.sock.close()
if sys.platform == 'darwin':
# select.poll returns a select.POLLHUP at the end of the tests
# on darwin, so just ignore it
def handle_expt(self):
pass
class echo_client(asynchat.async_chat):
def collect_incoming_data(self, data):
self.buffer += data
def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = ''
def handle_connect(self):
pass
if sys.platform == 'darwin':
# select.poll returns a select.POLLHUP at the end of the tests
# on darwin, so just ignore it
def handle_expt(self):
pass
def collect_incoming_data(self, data):
self.buffer += data
def found_terminator(self):
self.contents.append(self.buffer)
self.buffer = ""
def found_terminator(self):
self.contents.append(self.buffer)
self.buffer = ""
def start_echo_server():
event = threading.Event()
s = echo_server(event)
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
return s, event
def start_echo_server():
event = threading.Event()
s = echo_server(event)
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
return s, event
@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
usepoll = False