Fix mavlink aggregation

This commit is contained in:
Vasily Evseenko 2018-12-16 18:08:22 +03:00
parent 273ee924aa
commit e460af605e
2 changed files with 40 additions and 42 deletions

View File

@ -43,53 +43,55 @@ class UDPProxyProtocol(DatagramProtocol):
if(self.agg_queue_timer):
self.agg_queue_timer.cancel()
def write(self, msg):
# send message to local transport
if self.agg_max_size is None:
return self._write(msg)
if len(msg) > self.agg_max_size:
log.msg('Message too big: %d > %d' % (len(msg), self.agg_max_size), isError=1)
return
if self.agg_queue_size + len(msg) > self.agg_max_size:
# message doesn't fit into agg queue
if self.agg_queue_timer is not None:
self.agg_queue_timer.cancel()
self.agg_queue_timer = None
self._write(''.join(self.agg_queue))
self.agg_queue = []
self.agg_queue_size = 0
self.agg_queue.append(msg)
self.agg_queue_size += len(msg)
if self.agg_timeout and self.agg_queue_timer is None:
self.agg_queue_timer = reactor.callLater(self.agg_timeout, self.flush_queue)
def flush_queue(self):
if self.agg_queue_size > 0:
if self.agg_queue_timer is not None:
if not self.agg_queue_timer.called:
self.agg_queue_timer.cancel()
self.agg_queue_timer = None
self._write(''.join(self.agg_queue))
self._send_to_peer(''.join(self.agg_queue))
self.agg_queue = []
self.agg_queue_size = 0
def _write(self, msg):
# call from peer and from mavlink rssi injector only!
def write(self, msg):
if self.transport is not None and self.reply_addr is not None:
self.transport.write(msg, self.reply_addr)
def _send_to_peer(self, data):
if self.peer is not None:
self.peer.write(data)
def datagramReceived(self, data, addr):
if settings.common.debug:
log.msg('Got a message from %s' % (addr,))
self.reply_addr = addr
if self.peer is not None:
self.peer.write(data)
# send message to local transport
if self.agg_max_size is None:
return self._send_to_peer(data)
if len(data) > self.agg_max_size:
log.msg('Message too big: %d > %d' % (len(data), self.agg_max_size), isError=1)
return
if self.agg_queue_size + len(data) > self.agg_max_size:
# message doesn't fit into agg queue
if self.agg_queue_timer is not None:
self.agg_queue_timer.cancel()
self.agg_queue_timer = None
self._send_to_peer(''.join(self.agg_queue))
self.agg_queue = []
self.agg_queue_size = 0
self.agg_queue.append(data)
self.agg_queue_size += len(data)
if self.agg_timeout and self.agg_queue_timer is None:
self.agg_queue_timer = reactor.callLater(self.agg_timeout, self.flush_queue)
def send_rssi(self, rssi, rx_errors, rx_fec, flags):
# Send flags as txbuf

View File

@ -29,7 +29,8 @@ class SendPacket(DatagramProtocol):
def datagramReceived(self, data, addr):
log.msg("received back %r from %s" % (data, addr))
self.df.callback(addr)
self.df.callback((data, addr))
class UDPProxyTestCase(unittest.TestCase):
def setUp(self):
@ -49,20 +50,21 @@ class UDPProxyTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_proxy(self):
addr = ('127.0.0.1', 14551)
p = SendPacket('test', addr)
p = SendPacket('test', addr, 10)
ep3 = reactor.listenUDP(9999, p)
ep4 = reactor.listenUDP(14553, Echo())
try:
ts = time.time()
_addr = yield p.df
_data, _addr = yield p.df
self.assertGreater(time.time() - ts, 1.0)
self.assertEqual(_addr, addr)
self.assertEqual(_data, 'test' * 10)
finally:
ep4.stopListening()
ep3.stopListening()
@defer.inlineCallbacks
def test_rssi_injection_and_aggregation(self):
def test_rssi_injection(self):
addr = ('127.0.0.1', 14551)
p = SendPacket('test', addr)
@ -70,17 +72,11 @@ class UDPProxyTestCase(unittest.TestCase):
yield df_sleep(0.1)
try:
for i in range(100):
self.p1.send_rssi(1, 2, 3, 4)
self.p1.send_rssi(1, 2, 3, 4)
ts = time.time()
_addr = yield p.df
_data, _addr = yield p.df
self.assertLess(time.time() - ts, 1.0)
self.assertEqual(_addr, addr)
self.assertEqual(len(self.p1.agg_queue), 24)
self.assertEqual(self.p1.agg_queue_size, 456)
self.assertEqual(_data, '\xfd\x07\x00\x00\x00\x01\xf2m\x00\x00\x02\x00\x03\x00\x01\x01\x04\xae\x11')
finally:
ep3.stopListening()