Add optional logging for radio channel stats and mavlink messages

This commit is contained in:
Vasily Evseenko 2024-07-09 20:30:20 +03:00
parent 85d3b8b0a3
commit 0fb201292f
5 changed files with 230 additions and 17 deletions

View File

@ -44,7 +44,8 @@ setup(
zip_safe=False,
entry_points={'console_scripts': ['wfb-cli=wfb_ng.cli:main',
'wfb-test-latency=wfb_ng.latency_test:main',
'wfb-server=wfb_ng.server:main']},
'wfb-server=wfb_ng.server:main',
'wfb-log-parser=wfb_ng.log_parser']},
package_data={'wfb_ng.conf': ['master.cfg', 'site.cfg']},
data_files = [('/usr/bin', ['wfb_tx', 'wfb_rx', 'wfb_keygen', 'scripts/wfb-cli-x11']),
('/lib/systemd/system', ['scripts/wifibroadcast.service',

View File

@ -1,8 +1,13 @@
import sys
import os
from twisted.internet import utils
import queue
import threading
import atexit
from twisted.internet import utils, reactor
from logging import currentframe
from twisted.python import log
from twisted.python.logfile import LogFile
__orig_msg = log.msg
_srcfile = os.path.splitext(os.path.normcase(__file__))[0] + '.py'
@ -164,3 +169,90 @@ def call_and_check_rc(cmd, *args, **kwargs):
raise err
return utils.getProcessOutputAndValue(cmd, args, env=os.environ).addCallbacks(_check_rc, _got_signal)
class ErrorSafeLogFile(object):
stderr = sys.stderr
log_cls = LogFile
log_max = 1000
binary = False
twisted_logger = True
always_flush = True
def __init__(self, *args, **kwargs):
cleanup_at_exit = kwargs.pop('cleanup_at_exit', True)
self.logfile = None
self.args = args
self.kwargs = kwargs
try:
self.logfile = self.log_cls(*self.args, **self.kwargs)
except Exception as v:
if self.twisted_logger:
print('Unable to open log file: %s' % (v,), file=self.stderr)
else:
log.err(v, 'Unable to open log file: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs), isError=True)
self.need_stop = threading.Event()
self.lock = threading.RLock()
self.log_queue_overflow = 0
self.log_queue = queue.Queue(self.log_max)
self.thread = threading.Thread(target=self._log_thread_loop, name='logging thread')
self.thread.daemon = True
self.thread.start()
if cleanup_at_exit:
atexit.register(self._cleanup)
def _cleanup(self):
self.log_queue.join()
self.need_stop.set()
self.thread.join()
def _log_thread_loop(self):
while not self.need_stop.is_set():
try:
data = self.log_queue.get(timeout=1)
except queue.Empty:
continue
overflow = 0
with self.lock:
if self.log_queue_overflow:
overflow = self.log_queue_overflow
self.log_queue_overflow = 0
if overflow and not self.binary:
self._write_and_flush('--- Dropped %d log items due to log queue overflow\n' % (overflow,))
self._write_and_flush(data)
self.log_queue.task_done()
def _write_and_flush(self, data):
try:
if self.logfile is None:
self.logfile = self.log_cls(*self.args, **self.kwargs)
self.logfile.write(data)
if self.always_flush:
self.logfile.flush()
except Exception as v:
if self.twisted_logger:
# Don't use logger due to infinite loop
print('Unable to write to log file: %s' % (v,), file=self.stderr)
else:
reactor.callFromThread(log.err, v, 'Unable to write to: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs), isError=True)
if self.logfile is not None:
self.logfile.close()
self.logfile = None
def write(self, data):
try:
self.log_queue.put_nowait(data)
except queue.Full:
with self.lock:
self.log_queue_overflow += 1
def flush(self):
pass

View File

@ -13,6 +13,9 @@ commit = None
primary = True # Set to False if you use several wfb instances on one card. Only primary instance will set radio channel.
log_file = None # Set to "wifibroadcast.log" to disable log to stdout
binary_log_file = None # Machine readable log for post-processing
# File name should have '%%s' inside for profile mapping.
# For example: 'binlog_%%s.log'
set_nm_unmanaged = True # Set radio interface in 'unmanaged state' in NetworkManager
radio_mtu = 1445 # Used for mavlink aggregation and for tunnel packets - should be less or equal to MAX_PAYLOAD_SIZE, don't change if doubt
@ -154,6 +157,7 @@ peer = None
[mavlink]
log_messages = False # Log all messages to binary log for post processing
frame_type = 'data' # Use data or rts frames
fec_k = 1 # FEC K (For tx side. Rx will get FEC settings from session packet)
fec_n = 2 # FEC N (For tx side. Rx will get FEC settings from session packet)

View File

@ -19,15 +19,57 @@
#
import struct
import time
from . import call_and_check_rc, ExecError
from .mavlink import MAV_MODE_FLAG_SAFETY_ARMED, MAVLINK_MSG_ID_HEARTBEAT
from .mavlink import MAV_MODE_FLAG_SAFETY_ARMED, MAVLINK_MSG_ID_HEARTBEAT, mavlink_map
from zope.interface import implementer
from twisted.python import log
from twisted.internet import reactor, defer, utils, interfaces
from twisted.internet.protocol import Protocol, DatagramProtocol, Factory
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Type, Union, cast
def unpack_mavlink(msg_id, mbuf):
msgtype = mavlink_map[msg_id]
order_map = msgtype.orders
len_map = msgtype.lengths
csize = msgtype.unpacker.size
if len(mbuf) < csize:
# zero pad to give right size
mbuf += (b'0' * (csize - len(mbuf)))
t = cast(Tuple[Union[bytes, int, float], ...],
msgtype.unpacker.unpack(mbuf[:csize]))
tlist = list(t)
if sum(len_map) == len(len_map):
# message has no arrays in it
for i in range(0, len(tlist)):
tlist[i] = t[order_map[i]]
else:
# message has some arrays
tlist = []
for i in range(0, len(order_map)):
order = order_map[i]
L = len_map[order]
tip = sum(len_map[:order])
field = t[tip]
if L == 1 or isinstance(field, bytes):
tlist.append(field)
else:
tlist.append(cast(Union[Sequence[int], Sequence[float]], list(t[tip : (tip + L)])))
# terminate any strings
for i, elem in enumerate(tlist):
if isinstance(elem, bytes):
tlist[i] = elem.rstrip(b"\x00")
return msgtype.msgname, dict(zip(msgtype.fieldnames, tlist))
def parse_mavlink_l2_v1(msg):
plen, seq, sys_id, comp_id, msg_id = struct.unpack('<BBBBB', msg[1:6])
@ -210,3 +252,22 @@ class MavlinkTCPFactory(Factory):
s.send(data)
except Exception as v:
log.err(v)
class MavlinkLoggerProtocol(object):
def __init__(self, logger):
self.logger = logger
self.mavlink_fsm = mavlink_parser_gen(parse_l2=True)
self.mavlink_fsm.send(None)
def dataReceived(self, data):
for l2_headers, m in self.mavlink_fsm.send(data):
self.messageReceived(l2_headers, m)
def messageReceived(self, l2_headers, message):
# seq, sys_id, comp_id, msg_id = l2_headers
self.logger.send_stats(dict(type='mavlink',
timestamp=time.time(),
hdr=l2_headers,
msg=message))

View File

@ -23,20 +23,22 @@ import msgpack
import os
import re
import hashlib
import time
from base64 import b85encode
from itertools import groupby
from twisted.python import log, failure
from twisted.python.logfile import LogFile
from twisted.internet import reactor, defer, main as ti_main, threads, task
from twisted.internet.protocol import ProcessProtocol, Protocol, Factory
from twisted.protocols.basic import LineReceiver, Int32StringReceiver
from twisted.protocols.basic import LineReceiver, Int32StringReceiver, _formatNetstring
from twisted.internet.serialport import SerialPort
from . import _log_msg, ConsoleObserver, call_and_check_rc, ExecError
from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError
from .common import abort_on_crash, exit_status, df_sleep
from .config_parser import Section
from .proxy import UDPProxyProtocol, MavlinkSerialProxyProtocol, MavlinkUDPProxyProtocol
from .mavlink_protocol import MavlinkARMProtocol, MavlinkTCPFactory
from .mavlink_protocol import MavlinkARMProtocol, MavlinkTCPFactory, MavlinkLoggerProtocol
from .tuntap import TUNTAPProtocol, TUNTAPTransport
from .conf import settings, cfg_files
@ -57,6 +59,19 @@ class WFBFlags(object):
fec_types = {1: 'VDM_RS'}
# Log format is msgpack -> base85 -> netstring + newline
# See http://cr.yp.to/proto/netstrings.txt for the specification of netstrings.
class NetstringLogger(ErrorSafeLogFile):
binary = True
twisted_logger = False
always_flush = False
def send_stats(self, data):
msg = b85encode(msgpack.packb(data))
self.write(_formatNetstring(msg) + b'\n')
class StatisticsProtocol(Int32StringReceiver):
MAX_LENGTH = 1024 * 1024
@ -82,7 +97,7 @@ class StatsAndSelectorFactory(Factory):
Aggregate RX stats and select TX antenna
"""
def __init__(self, profile, wlans, link_domain):
def __init__(self, profile, wlans, link_domain, logger):
self.wlans = tuple(wlans)
self.ant_sel_cb_list = []
self.rssi_cb_l = []
@ -94,6 +109,19 @@ class StatsAndSelectorFactory(Factory):
# tcp sockets for UI
self.ui_sessions = []
# machine-readable logger
self.logger = logger
if logger is not None:
logger.send_stats(dict(type='init',
timestamp = time.time(),
version=settings.common.version,
profile=profile,
wlans=wlans,
link_domain=link_domain))
self.ui_sessions.append(logger)
# CLI title
self.cli_title = 'WFB-ng_%s @%s %s [%s]' % (settings.common.version, profile, ', '.join(wlans), link_domain)
@ -203,6 +231,13 @@ class StatsAndSelectorFactory(Factory):
self.tx_sel = max_rssi_ant
def process_new_session(self, rx_id, session):
if self.logger is not None:
self.logger.send_stats(dict(type='new_session',
timestamp = time.time(),
id=rx_id,
**session))
def update_rx_stats(self, rx_id, packet_stats, ant_stats, session):
stats_agg = self._stats_agg_by_freq(ant_stats)
card_rssi_l = list(rssi_avg
@ -237,9 +272,11 @@ class StatsAndSelectorFactory(Factory):
if settings.common.debug:
log.msg('%s rssi %s tx#%d %s %s' % (rx_id, max(card_rssi_l) if card_rssi_l else 'N/A', self.tx_sel, packet_stats, ant_stats))
# Send stats to CLI sessions
# Send stats to CLI sessions and logger
for s in self.ui_sessions:
s.send_stats(dict(type='rx', id=rx_id, tx_ant=self.tx_sel,
s.send_stats(dict(type='rx',
timestamp = time.time(),
id=rx_id, tx_ant=self.tx_sel,
packets=packet_stats, rx_ant_stats=ant_stats,
session=session))
@ -247,9 +284,11 @@ class StatsAndSelectorFactory(Factory):
if settings.common.debug:
log.msg("%s %r %r" % (tx_id, packet_stats, ant_latency))
# Send stats to CLI sessions
# Send stats to CLI sessions and logger
for s in self.ui_sessions:
s.send_stats(dict(type='tx', id=tx_id,
s.send_stats(dict(type='tx',
timestamp = time.time(),
id=tx_id,
packets=packet_stats,
latency=ant_latency,
rf_temperature=self.rf_temperature))
@ -313,9 +352,11 @@ class RXAntennaProtocol(LineReceiver):
raise BadTelemetry()
epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':'))
self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n)
self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n, epoch=epoch)
log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch))
if self.ant_stat_cb is not None:
self.ant_stat_cb.process_new_session(self.rx_id, self.session)
else:
raise BadTelemetry()
except BadTelemetry:
@ -565,7 +606,16 @@ def init(profiles, wlans):
for profile, service_list in services:
# Domain wide antenna selector
profile_cfg = getattr(settings, profile)
ant_sel_f = StatsAndSelectorFactory(profile, wlans, profile_cfg.link_domain)
if settings.common.binary_log_file is not None:
logger = NetstringLogger(settings.common.binary_log_file % (profile,),
settings.path.log_dir,
rotateLength=10 * 1024 * 1024,
maxRotatedFiles=10)
else:
logger = None
ant_sel_f = StatsAndSelectorFactory(profile, wlans, profile_cfg.link_domain, logger)
ant_sel_l.append(ant_sel_f)
link_id = int.from_bytes(hashlib.sha1(profile_cfg.link_domain.encode('utf-8')).digest()[:3], 'big')
@ -678,6 +728,11 @@ def init_mavlink(service_name, cfg, wlans, link_id, ant_sel_f):
rx_hooks.append(arm_proto.dataReceived)
tx_hooks.append(arm_proto.dataReceived)
if cfg.log_messages and ant_sel_f.logger is not None:
mav_log_proto = MavlinkLoggerProtocol(ant_sel_f.logger)
rx_hooks.append(mav_log_proto.dataReceived)
tx_hooks.append(mav_log_proto.dataReceived)
if serial:
p_in = MavlinkSerialProxyProtocol(agg_max_size=settings.common.radio_mtu,
agg_timeout=settings.common.mavlink_agg_timeout,
@ -953,10 +1008,10 @@ def main():
log.msg = _log_msg
if settings.common.log_file:
log.startLogging(LogFile(settings.common.log_file,
settings.path.log_dir,
rotateLength=1024*1024,
maxRotatedFiles=10))
log.startLogging(ErrorSafeLogFile(settings.common.log_file,
settings.path.log_dir,
rotateLength=1024 * 1024,
maxRotatedFiles=10))
elif sys.stdout.isatty():
log.startLogging(sys.stdout)