# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

from __future__ import division
from __future__ import print_function

"""
ccbench, a Python concurrency benchmark.
"""

import time
import os
import sys
import itertools
import threading
import subprocess
import socket
from optparse import OptionParser, SUPPRESS_HELP
import platform

# Compatibility
try:
    xrange
except NameError:
    xrange = range

try:
    map = itertools.imap
except AttributeError:
    pass


THROUGHPUT_DURATION = 2.0

LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0


def task_pidigits():
    """Pi calculation (Python)"""
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        def gen_x():
            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))

        def compose(a, b):
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            z = (1, 0, 0, 1)
            x = gen_x()
            while 1:
                y = extract(z, 3)
                while y != extract(z, 4):
                    z = compose(z, next(x))
                    y = extract(z, 3)
                z = compose((10, -10*y, 0, 1), z)
                yield y

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (50, )

def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)',
                     re.MULTILINE)
    with open(__file__, "r") as f:
        arg = f.read(2000)

    def findall(s):
        t = time.time()
        try:
            return pat.findall(s)
        finally:
            print(time.time() - t)
    return pat.findall, (arg, )

def task_sort():
    """list sorting (C)"""
    def list_sort(l):
        l = l[::-1]
        l.sort()

    return list_sort, (list(range(1000)), )

def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg, )

def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg, )

def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 30

    def compute(s):
        hashlib.sha1(s).digest()
    return compute, (arg, )


throughput_tasks = [task_pidigits, task_regex]
for mod in 'bz2', 'hashlib':
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]


class TimedLoop:
    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        step = 20
        niters = 0
        duration = 0.0
        _time = time.time
        _sleep = time.sleep
        _func = self.func
        _args = self.args
        t1 = start_time
        while True:
            for i in range(step):
                _func(*_args)
            t2 = _time()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
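            # (end_event is a plain shared list used as a termination flag:
            # whichever loop finishes first appends to it, and the other
            # threads notice it on their next pass and bail out.)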
            if end_event:
                return niters, duration
            niters += step
            duration = t2 - start_time
            if duration >= min_duration:
                end_event.append(None)
                return niters, duration
            if t2 - t1 < 0.01:
                # Minimize interference of measurement on overall runtime
                step = step * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                _sleep(0.0001)
            t1 = t2


def run_throughput_test(func, args, nthreads):
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        threads.append(threading.Thread(target=run))
    for t in threads:
        t.setDaemon(True)
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results


def run_throughput_tests(max_threads):
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        nthreads = 1
        baseline_speed = None
        while nthreads <= max_threads:
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            speed = sum(r[0] for r in results) / max(r[1] for r in results)
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" iterations/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            nthreads += 1
        print()


LAT_END = "END"

def _sendto(sock, s, addr):
    sock.sendto(s.encode('ascii'), addr)

def _recv(sock, n):
    return sock.recv(n).decode('ascii')

def latency_client(addr, nb_pings, interval):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        _time = time.time
        _sleep = time.sleep
        def _ping():
            _sendto(sock, "%r\n" % _time(), addr)
        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for i in range(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()

def run_latency_client(**kwargs):
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    cmd_line.extend(['--latclient', repr(kwargs)])
    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

def run_latency_test(func, args, nthreads):
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
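    # Protocol sketch: a child process (run_latency_client) sends small
    # timestamped UDP pings at fixed intervals while the background CPU
    # threads run; the gap between each ping's send time and the time it is
    # received here approximates the thread scheduling latency.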
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    addr = sock.getsockname()

    interval = LATENCY_PING_INTERVAL
    duration = LATENCY_DURATION
    nb_pings = int(duration / interval)

    results = []
    threads = []
    end_event = []
    start_cond = threading.Condition()
    started = False
    if nthreads > 0:
        # Warm up
        func(*args)

        results = []
        loop = TimedLoop(func, args)
        ready = []
        ready_cond = threading.Condition()

        def run():
            with ready_cond:
                ready.append(None)
                ready_cond.notify()
            with start_cond:
                while not started:
                    start_cond.wait()
            loop(start_time, duration * 1.5, end_event, do_yield=False)

        for i in range(nthreads):
            threads.append(threading.Thread(target=run))
        for t in threads:
            t.setDaemon(True)
            t.start()
        # Wait for threads to be ready
        with ready_cond:
            while len(ready) < nthreads:
                ready_cond.wait()

    # Run the client and wait for the first ping(s) to arrive before
    # unblocking the background threads.
    chunks = []
    process = run_latency_client(addr=sock.getsockname(),
                                 nb_pings=nb_pings, interval=interval)
    s = _recv(sock, 4096)
    _time = time.time

    with start_cond:
        start_time = _time()
        started = True
        start_cond.notify(nthreads)

    while LAT_END not in s:
        s = _recv(sock, 4096)
        t = _time()
        chunks.append((t, s))

    # Tell the background threads to stop.
    end_event.append(None)
    for t in threads:
        t.join()
    process.wait()
    sock.close()

    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                send_time = eval(line)
                assert isinstance(send_time, float)
                results.append((send_time, recv_time))

    return results


def run_latency_tests(max_threads):
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        nthreads = 0
        while nthreads <= max_threads:
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            #print(list(map(int, lats)))
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                  % (nthreads, avg, dev), end="")
            print()
            #print(" [... from %d samples]" % n)
            nthreads += 1
        print()


BW_END = "END"

def bandwidth_client(addr, packet_size, duration):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    _time = time.time
    _sleep = time.sleep
    def _send_chunk(msg):
        _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)
    # We give the parent some time to be ready.
    _sleep(1.0)
    try:
        start_time = _time()
        end_time = start_time + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()

def run_bandwidth_client(**kwargs):
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    cmd_line.extend(['--bwclient', repr(kwargs)])
    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,
                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

def run_bandwidth_test(func, args, nthreads):
    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
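    # Protocol sketch: a child process (run_bandwidth_client) sends fixed-size
    # UDP packets which we echo straight back while the background CPU threads
    # run; the result is the echo rate in packets per second, measured between
    # the first and last reply.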
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        results = []
        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            results = []
            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                t.setDaemon(True)
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        remote_addr = eval(s.partition('#')[0])

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

    end_event.append(None)
    for t in threads:
        t.join()
    process.kill()

    return (n - 1) / (end_time - first_time)


def run_bandwidth_tests(max_threads):
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        nthreads = 0
        baseline_speed = None
        while nthreads <= max_threads:
            results = run_bandwidth_test(func, args, nthreads)
            speed = results
            #speed = len(results) * 1.0 / results[-1][0]
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" packets/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            nthreads += 1
        print()


def main():
    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval",
                      default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval",
                      default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    if options.latclient:
        kwargs = eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
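    # sys.setcheckinterval() tunes the legacy bytecode-count check interval of
    # the pre-3.2 GIL; sys.setswitchinterval() tunes the time-based switch
    # interval of the newer GIL (Python >= 3.2).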
    if options.check_interval:
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)

if __name__ == "__main__":
    main()
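
# Example invocation (assuming the script is saved as ccbench.py; when none of
# -t/-l/-b is given, all three test suites are run):
#   python ccbench.py -t -n 8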