#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""UDP packet source and sink used to measure packet loss and latency."""

import sys
import time
import struct
import os

from twisted.python import log
from twisted.internet import reactor, defer, task
from twisted.internet.protocol import DatagramProtocol
from twisted.trial import unittest

from telemetry.common import df_sleep, abort_on_crash, exit_status


# Header written at offset 0 of every datagram (network byte order):
#   H  total datagram size in bytes
#   I  packet sequence number
#   d  send timestamp, taken from reactor.seconds()
#   d  session key
# The remainder of the datagram is zero padding up to the requested size.
_HEADER = struct.Struct('!HIdd')


class PacketSource(DatagramProtocol):
    """Send ``count`` fixed-size datagrams to ``addr`` at ``rate`` packets/s.

    ``size`` is the full datagram length; it has to be large enough to hold
    the header (``_HEADER.size`` bytes) and small enough to fit the unsigned
    16-bit size field, otherwise the header cannot be packed.

    ``df`` fires once the protocol has been attached to its transport.
    """

    noisy = False

    def __init__(self, addr, size, count, rate, key):
        self.df = defer.Deferred()
        self.addr = addr
        self.size = size
        self.count = count
        self.rate = rate
        self.key = key

    def startProtocol(self):
        """Signal through ``df`` that the transport is ready."""
        self.df.callback(None)

    def start(self):
        """Start the paced send loop.

        Returns a Deferred that fires when ``count`` packets have been
        written (or fails if sending raises).
        """
        if self.count <= 0:
            # Nothing to send; without this guard the loop would still emit
            # one packet before noticing that the target was reached.
            return defer.succeed(None)

        # One buffer is reused for every packet; only the header changes.
        msg = bytearray(self.size)
        sent = [0]  # mutable cell so the closure can update it (no nonlocal)

        def _sendmsg(c):
            # ``c`` is the number of intervals elapsed since the previous
            # call; more than one means the loop fell behind schedule.
            if c > 1:
                log.msg('Packet source freeze for %.2f ms at iter %d/%d'
                        % (1000.0 * float(c) / self.rate, sent[0], self.count))

            _HEADER.pack_into(msg, 0, self.size, sent[0],
                              reactor.seconds(), self.key)
            self.transport.write(msg, self.addr)

            sent[0] += 1
            if sent[0] >= self.count:
                lc.stop()

        lc = task.LoopingCall.withCount(_sendmsg)
        return lc.start(1.0 / self.rate, now=False)


class PacketSink(DatagramProtocol):
    """Receive datagrams of one session and accumulate latency statistics.

    Attributes read by callers:
      count   -- number of accepted datagrams (duplicates included)
      id_set  -- set of distinct sequence numbers seen
      lmin    -- smallest latency in seconds, -1 while nothing was accepted
      lmax    -- largest latency in seconds, -1 while nothing was accepted
      lavg    -- running SUM of latencies in seconds; divide by ``count``
                 to obtain the average

    ``df`` fires once the protocol has been attached to its transport.
    """

    noisy = False

    def __init__(self, key):
        self.df = defer.Deferred()
        self.count = 0
        self.lmin = -1
        self.lmax = -1
        self.lavg = 0
        self.key = key
        self.id_set = set()
        self.last_id = 0

    def startProtocol(self):
        """Signal through ``df`` that the transport is ready."""
        self.df.callback(None)

    def datagramReceived(self, data, addr):
        """Validate one datagram and fold its latency into the statistics."""
        if len(data) < _HEADER.size:
            # Too short to contain a header: unpacking would raise
            # struct.error, so log and drop it instead.
            log.msg('short datagram: %d bytes, need at least %d'
                    % (len(data), _HEADER.size), isError=1)
            return

        size, seq, ts, key = _HEADER.unpack_from(data)

        if size != len(data):
            log.msg('bad size %d != %d' % (len(data), size), isError=1)
            return

        if self.key != key:
            log.msg('bad session %d != %d #%d, already got %d packets'
                    % (key, self.key, seq, self.count))
            return

        latency = reactor.seconds() - ts
        if latency < 0:
            log.msg('bad latency %f' % (latency,))
            return

        if seq < self.last_id:
            # Reordered packets are reported but still counted below.
            log.msg('Out of order #%d (prev #%d)' % (seq, self.last_id))
        else:
            self.last_id = seq

        self.id_set.add(seq)

        # A negative value is the "no sample yet" sentinel for min/max.
        self.lmin = latency if self.lmin < 0 else min(latency, self.lmin)
        self.lmax = latency if self.lmax < 0 else max(latency, self.lmax)
        self.lavg += latency
        self.count += 1
@defer.inlineCallbacks
def run_test(port_in, port_out, size, count, rate):
    """Run one send/receive session and report loss and latency.

    A PacketSource sends ``count`` datagrams of ``size`` bytes to
    127.0.0.1:``port_in`` at ``rate`` packets/s while a PacketSink listens on
    ``port_out``.  (Presumably an external relay forwards port_in to
    port_out -- confirm against the deployment.)

    Fires with a tuple ``(lost, lavg, bitrate)``:
      lost    -- packets sent whose sequence number never reached the sink
      lavg    -- average latency in seconds, or -1 if nothing was received
      bitrate -- offered load in MBit/s
    """
    # Random 16-bit session id.  struct.unpack yields the same big-endian
    # value as the former ``.encode('hex')`` form, but also works on
    # Python 3 where bytes has no ``encode`` method.
    key = struct.unpack('!H', os.urandom(2))[0]
    log.msg('Session: %d' % (key,))

    p1 = PacketSource(('127.0.0.1', port_in), size, count, rate, key)
    p2 = PacketSink(key)

    ep1 = reactor.listenUDP(0, p1)
    ep2 = reactor.listenUDP(port_out, p2)

    # Wait until both protocols are attached to their transports.
    yield p1.df
    yield p2.df

    yield p1.start()
    # Grace period so that packets still in flight can reach the sink.
    yield df_sleep(2)

    sent = count
    unique = len(p2.id_set)
    # Loss is measured against distinct sequence numbers; counting raw
    # receptions would let duplicates hide genuinely lost packets.
    lost = count - unique
    dup = p2.count - unique
    bitrate = rate * size * 8 / 1e6
    # p2.lavg holds the latency sum; -1 marks "no packet received".
    lavg = p2.lavg / p2.count if p2.count else -1

    log.msg('Sent %d, Lost %d, Dup: %d, Packet rate: %d pkt/s, Bitrate: %.2f MBit/s, Lmin %.2f ms, Lmax %.2f ms, Lavg %.2f ms' %
            (sent, lost, dup, rate, bitrate,
             1000.0 * p2.lmin, 1000.0 * p2.lmax,
             1000.0 * lavg if p2.count else -1))

    yield ep1.stopListening()
    yield ep2.stopListening()

    defer.returnValue((lost, lavg, bitrate))


@defer.inlineCallbacks
def eval_max_rate(port_in, port_out, size, max_rate):
    """Binary-search the highest packet rate that passes ``run_test``.

    The search runs between 1 and ``max_rate`` packets/s.  A rate fails when
    the run loses at least max(1% of the packets, 10), when the average
    latency exceeds 5 ms, or when no packet was received at all.  The bitrate
    of the highest passing rate is logged (0.00 if no rate passed or no run
    was possible).
    """
    min_rate = 1
    # Bitrate of the last rate that passed; defined up front so the final
    # log line also works when the loop body never executes.
    best_bitrate = 0.0

    while True:
        dr = int((max_rate - min_rate) / 2)
        if dr <= 0:
            break

        rate = min_rate + dr
        count = 3 * rate  # run each test for 3s
        lost, lavg, bitrate = yield run_test(port_in, port_out, size, count, rate)

        too_lossy = lost >= max(count * 0.01, 10)
        too_slow = lavg > 0.005
        # lavg < 0 is run_test's "nothing received" marker; with small
        # counts such a run would otherwise slip under the loss threshold.
        nothing_received = lavg < 0

        if too_lossy or too_slow or nothing_received:
            # rate too big
            max_rate = rate
        else:
            # rate too low
            min_rate = rate
            best_bitrate = bitrate

    log.msg('Max bitrate is %.2f MBit/s' % (best_bitrate,))


def main():
    """Command-line entry point.

    Usage: <script> <port_in> <port_out> <size> <max_packet_rate>

    Exits with status 2 on malformed arguments, otherwise with the value
    returned by ``exit_status()`` after the reactor has stopped.
    """
    # Validate before logging starts and before the reactor runs: a
    # conversion error raised inside the reactor callback would escape
    # before the errback is attached, so reactor.stop() is never scheduled.
    try:
        port_in, port_out, size, max_packet_rate = [int(a) for a in sys.argv[1:]]
    except ValueError:
        sys.stderr.write('Usage: %s <port_in> <port_out> <size> <max_packet_rate>\n'
                         % (sys.argv[0],))
        sys.exit(2)

    log.startLogging(sys.stdout)

    def _run():
        d = defer.maybeDeferred(eval_max_rate, port_in, port_out,
                                size, max_packet_rate)
        # NOTE(review): abort_on_crash comes from telemetry.common;
        # presumably it records the failure and stops the reactor -- confirm.
        d.addCallbacks(lambda _: reactor.stop(), abort_on_crash)
        return d

    reactor.callWhenRunning(_run)
    reactor.run()

    rc = exit_status()
    log.msg('Exiting with code %d' % rc)
    sys.exit(rc)


if __name__ == '__main__':
    main()