Fix binary log format. Add missing log parser.

This commit is contained in:
Vasily Evseenko 2024-07-10 19:50:53 +03:00
parent 0fb201292f
commit c2859f9415
4 changed files with 118 additions and 40 deletions

View File

@ -45,7 +45,7 @@ setup(
entry_points={'console_scripts': ['wfb-cli=wfb_ng.cli:main',
'wfb-test-latency=wfb_ng.latency_test:main',
'wfb-server=wfb_ng.server:main',
'wfb-log-parser=wfb_ng.log_parser']},
'wfb-log-parser=wfb_ng.log_parser:main']},
package_data={'wfb_ng.conf': ['master.cfg', 'site.cfg']},
data_files = [('/usr/bin', ['wfb_tx', 'wfb_rx', 'wfb_keygen', 'scripts/wfb-cli-x11']),
('/lib/systemd/system', ['scripts/wifibroadcast.service',

View File

@ -3,6 +3,7 @@ import os
import queue
import threading
import atexit
import time
from twisted.internet import utils, reactor
from logging import currentframe
@ -171,28 +172,37 @@ def call_and_check_rc(cmd, *args, **kwargs):
return utils.getProcessOutputAndValue(cmd, args, env=os.environ).addCallbacks(_check_rc, _got_signal)
def close_if_failed(f):
def _f(self, *args, **kwargs):
try:
return f(self, *args, **kwargs)
except Exception as v:
if self.twisted_logger:
# Don't use logger due to infinite loop
print('Unable to write to log file: %s' % (v,), file=self.stderr)
else:
reactor.callFromThread(log.err, v, 'Unable to write to: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs))
if self.logfile is not None:
self.logfile.close()
self.logfile = None
return _f
class ErrorSafeLogFile(object):
stderr = sys.stderr
log_cls = LogFile
log_max = 1000
binary = False
twisted_logger = True
always_flush = True
flush_delay = 0
def __init__(self, *args, **kwargs):
cleanup_at_exit = kwargs.pop('cleanup_at_exit', True)
self.logfile = None
self.args = args
self.kwargs = kwargs
try:
self.logfile = self.log_cls(*self.args, **self.kwargs)
except Exception as v:
if self.twisted_logger:
print('Unable to open log file: %s' % (v,), file=self.stderr)
else:
log.err(v, 'Unable to open log file: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs), isError=True)
self.need_stop = threading.Event()
self.lock = threading.RLock()
self.log_queue_overflow = 0
@ -209,8 +219,20 @@ class ErrorSafeLogFile(object):
self.need_stop.set()
self.thread.join()
if self.logfile is not None:
self.logfile.close()
self.logfile = None
def _log_thread_loop(self):
flush_ts = 0
while not self.need_stop.is_set():
now = time.time()
if self.flush_delay > 0 and now > flush_ts:
flush_ts = now + self.flush_delay
self._flush()
try:
data = self.log_queue.get(timeout=1)
except queue.Empty:
@ -223,29 +245,25 @@ class ErrorSafeLogFile(object):
self.log_queue_overflow = 0
if overflow and not self.binary:
self._write_and_flush('--- Dropped %d log items due to log queue overflow\n' % (overflow,))
self._write('--- Dropped %d log items due to log queue overflow\n' % (overflow,))
self._write_and_flush(data)
self._write(data)
self.log_queue.task_done()
def _write_and_flush(self, data):
try:
if self.logfile is None:
self.logfile = self.log_cls(*self.args, **self.kwargs)
@close_if_failed
def _write(self, data):
if self.logfile is None:
self.logfile = self.log_cls(*self.args, **self.kwargs)
self.logfile.write(data)
if self.always_flush:
self.logfile.flush()
except Exception as v:
if self.twisted_logger:
# Don't use logger due to infinite loop
print('Unable to write to log file: %s' % (v,), file=self.stderr)
else:
reactor.callFromThread(log.err, v, 'Unable to write to: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs), isError=True)
self.logfile.write(data)
if self.logfile is not None:
self.logfile.close()
self.logfile = None
if self.flush_delay == 0:
self.logfile.flush()
@close_if_failed
def _flush(self):
if self.logfile is not None:
self.logfile.flush()
def write(self, data):
try:

52
wfb_ng/log_parser.py Normal file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env python3
import sys
import time
import msgpack
import struct
import gzip
from pprint import pformat
from .mavlink_protocol import unpack_mavlink
def main():
for f in sys.argv[1:]:
with gzip.GzipFile(f, 'rb') as fd:
while True:
hdr = fd.read(4)
if len(hdr) < 4:
break
data_len = struct.unpack('!I', hdr)[0]
data = fd.read(data_len)
if len(data) < data_len:
break
msg = msgpack.unpackb(data, strict_map_key=False, use_list=False)
ts = msg.pop('timestamp')
mtype = msg.pop('type')
if mtype == 'mavlink':
seq, sys_id, comp_id, msg_id = msg.pop('hdr')
msg['sys_id'] = sys_id
msg['comp_id'] = comp_id
msg['seq'] = seq
mav_message = msg.pop('msg')
try:
k, v = unpack_mavlink(msg_id, mav_message)
msg[k] = v
except Exception as v:
msg['msg'] = mav_message
msg['parse_error'] = v
ts_txt = time.strftime('%Y-%m-%d %H:%M:%S.{} %Z'.format(('%.3f' % ((ts % 1),))[2:]), time.localtime(ts))
msg_pp = ('\n%s\t%s\t' % (' ' * len(ts_txt), ' ' * len(mtype)))\
.join(pformat(msg, compact=True).split('\n'))
print('%s\t%s\t%s' % (ts_txt, mtype, msg_pp))
if __name__ == '__main__':
main()

View File

@ -24,6 +24,8 @@ import os
import re
import hashlib
import time
import struct
import gzip
from base64 import b85encode
from itertools import groupby
@ -31,7 +33,7 @@ from twisted.python import log, failure
from twisted.python.logfile import LogFile
from twisted.internet import reactor, defer, main as ti_main, threads, task
from twisted.internet.protocol import ProcessProtocol, Protocol, Factory
from twisted.protocols.basic import LineReceiver, Int32StringReceiver, _formatNetstring
from twisted.protocols.basic import LineReceiver, Int32StringReceiver
from twisted.internet.serialport import SerialPort
from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError
@ -59,17 +61,25 @@ class WFBFlags(object):
fec_types = {1: 'VDM_RS'}
# Log format is msgpack -> base85 -> netstring + newline
# See http://cr.yp.to/proto/netstrings.txt for the specification of netstrings.
# Log format is gzipped sequence of int32 strings
# For every run new file will be open to avoid framing errors
class NetstringLogger(ErrorSafeLogFile):
def BinLogFile(self, fname, directory):
filename = '%s.%s' % (fname, time.strftime('%Y%m%d-%H%M%S', time.localtime()))
filename = os.path.join(directory, filename)
reactor.callFromThread(log.msg, 'Open binary log %s' % (filename,))
return gzip.GzipFile(filename, 'wb')
class BinLogger(ErrorSafeLogFile):
binary = True
twisted_logger = False
always_flush = False
flush_delay = 10
log_cls = BinLogFile
def send_stats(self, data):
msg = b85encode(msgpack.packb(data))
self.write(_formatNetstring(msg) + b'\n')
data = msgpack.packb(data)
self.write(b''.join((struct.pack('!I', len(data)), data)))
class StatisticsProtocol(Int32StringReceiver):
@ -608,10 +618,8 @@ def init(profiles, wlans):
profile_cfg = getattr(settings, profile)
if settings.common.binary_log_file is not None:
logger = NetstringLogger(settings.common.binary_log_file % (profile,),
settings.path.log_dir,
rotateLength=10 * 1024 * 1024,
maxRotatedFiles=10)
logger = BinLogger(settings.common.binary_log_file % (profile,),
settings.path.log_dir)
else:
logger = None