Issue #7993: Add a test of IO packet processing bandwidth to ccbench.

It measures the number of UDP packets processed per second depending on
the number of background CPU-bound Python threads.
This commit is contained in:
Antoine Pitrou 2010-03-13 21:21:30 +00:00
parent a231e45fb1
commit 02b3f0ab0d
2 changed files with 156 additions and 3 deletions

View File

@ -54,6 +54,12 @@ Extension Modules
integer codes for which it was used differed between native packing integer codes for which it was used differed between native packing
and standard packing.) and standard packing.)
Tools/Demos
-----------
- Issue #7993: Add a test of IO packet processing bandwidth to ccbench.
It measures the number of UDP packets processed per second depending on
the number of background CPU-bound Python threads.
What's New in Python 2.7 alpha 4? What's New in Python 2.7 alpha 4?

View File

@ -36,6 +36,9 @@ THROUGHPUT_DURATION = 2.0
LATENCY_PING_INTERVAL = 0.1 LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0 LATENCY_DURATION = 2.0
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
def task_pidigits(): def task_pidigits():
"""Pi calculation (Python)""" """Pi calculation (Python)"""
@ -149,6 +152,7 @@ else:
throughput_tasks.append(task_compress_zlib) throughput_tasks.append(task_compress_zlib)
latency_tasks = throughput_tasks latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
class TimedLoop: class TimedLoop:
@ -394,6 +398,133 @@ def run_latency_tests(max_threads):
print() print()
BW_END = "END"
def bandwidth_client(addr, packet_size, duration):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))
local_addr = sock.getsockname()
_time = time.time
_sleep = time.sleep
def _send_chunk(msg):
_sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
# We give the parent some time to be ready.
_sleep(1.0)
try:
start_time = _time()
end_time = start_time + duration * 2.0
i = 0
while _time() < end_time:
_send_chunk(str(i))
s = _recv(sock, packet_size)
assert len(s) == packet_size
i += 1
_send_chunk(BW_END)
finally:
sock.close()
def run_bandwidth_client(**kwargs):
cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
cmd_line.extend(['--bwclient', repr(kwargs)])
return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
#stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def run_bandwidth_test(func, args, nthreads):
# Create a listening socket to receive the packets. We use UDP which should
# be painlessly cross-platform.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 0))
addr = sock.getsockname()
duration = BANDWIDTH_DURATION
packet_size = BANDWIDTH_PACKET_SIZE
results = []
threads = []
end_event = []
start_cond = threading.Condition()
started = False
if nthreads > 0:
# Warm up
func(*args)
results = []
loop = TimedLoop(func, args)
ready = []
ready_cond = threading.Condition()
def run():
with ready_cond:
ready.append(None)
ready_cond.notify()
with start_cond:
while not started:
start_cond.wait()
loop(start_time, duration * 1.5, end_event, do_yield=False)
for i in range(nthreads):
threads.append(threading.Thread(target=run))
for t in threads:
t.setDaemon(True)
t.start()
# Wait for threads to be ready
with ready_cond:
while len(ready) < nthreads:
ready_cond.wait()
# Run the client and wait for the first packet to arrive before
# unblocking the background threads.
process = run_bandwidth_client(addr=addr,
packet_size=packet_size,
duration=duration)
_time = time.time
# This will also wait for the parent to be ready
s = _recv(sock, packet_size)
remote_addr = eval(s.partition('#')[0])
with start_cond:
start_time = _time()
started = True
start_cond.notify(nthreads)
n = 0
first_time = None
while not end_event and BW_END not in s:
_sendto(sock, s, remote_addr)
s = _recv(sock, packet_size)
if first_time is None:
first_time = _time()
n += 1
end_time = _time()
end_event.append(None)
for t in threads:
t.join()
process.kill()
return (n - 1) / (end_time - first_time)
def run_bandwidth_tests(max_threads):
for task in bandwidth_tasks:
print("Background CPU task:", task.__doc__)
print()
func, args = task()
nthreads = 0
baseline_speed = None
while nthreads <= max_threads:
results = run_bandwidth_test(func, args, nthreads)
speed = results
#speed = len(results) * 1.0 / results[-1][0]
print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
if baseline_speed is None:
print(" packets/s.")
baseline_speed = speed
else:
print(" ( %d %%)" % (speed / baseline_speed * 100))
nthreads += 1
print()
def main(): def main():
usage = "usage: %prog [-h|--help] [options]" usage = "usage: %prog [-h|--help] [options]"
parser = OptionParser(usage=usage) parser = OptionParser(usage=usage)
@ -403,6 +534,9 @@ def main():
parser.add_option("-l", "--latency", parser.add_option("-l", "--latency",
action="store_true", dest="latency", default=False, action="store_true", dest="latency", default=False,
help="run latency tests") help="run latency tests")
parser.add_option("-b", "--bandwidth",
action="store_true", dest="bandwidth", default=False,
help="run I/O bandwidth tests")
parser.add_option("-i", "--interval", parser.add_option("-i", "--interval",
action="store", type="int", dest="check_interval", default=None, action="store", type="int", dest="check_interval", default=None,
help="sys.setcheckinterval() value") help="sys.setcheckinterval() value")
@ -413,10 +547,13 @@ def main():
action="store", type="int", dest="nthreads", default=4, action="store", type="int", dest="nthreads", default=4,
help="max number of threads in tests") help="max number of threads in tests")
# Hidden option to run the pinging client # Hidden option to run the pinging and bandwidth clients
parser.add_option("", "--latclient", parser.add_option("", "--latclient",
action="store", dest="latclient", default=None, action="store", dest="latclient", default=None,
help=SUPPRESS_HELP) help=SUPPRESS_HELP)
parser.add_option("", "--bwclient",
action="store", dest="bwclient", default=None,
help=SUPPRESS_HELP)
options, args = parser.parse_args() options, args = parser.parse_args()
if args: if args:
@ -427,8 +564,13 @@ def main():
latency_client(**kwargs) latency_client(**kwargs)
return return
if not options.throughput and not options.latency: if options.bwclient:
options.throughput = options.latency = True kwargs = eval(options.bwclient)
bandwidth_client(**kwargs)
return
if not options.throughput and not options.latency and not options.bandwidth:
options.throughput = options.latency = options.bandwidth = True
if options.check_interval: if options.check_interval:
sys.setcheckinterval(options.check_interval) sys.setcheckinterval(options.check_interval)
if options.switch_interval: if options.switch_interval:
@ -458,5 +600,10 @@ def main():
print() print()
run_latency_tests(options.nthreads) run_latency_tests(options.nthreads)
if options.bandwidth:
print("--- I/O bandwidth ---")
print()
run_bandwidth_tests(options.nthreads)
if __name__ == "__main__": if __name__ == "__main__":
main() main()