diff --git a/setup.py b/setup.py index 96e5d5e..f9f1cc3 100644 --- a/setup.py +++ b/setup.py @@ -45,7 +45,7 @@ setup( entry_points={'console_scripts': ['wfb-cli=wfb_ng.cli:main', 'wfb-test-latency=wfb_ng.latency_test:main', 'wfb-server=wfb_ng.server:main', - 'wfb-log-parser=wfb_ng.log_parser']}, + 'wfb-log-parser=wfb_ng.log_parser:main']}, package_data={'wfb_ng.conf': ['master.cfg', 'site.cfg']}, data_files = [('/usr/bin', ['wfb_tx', 'wfb_rx', 'wfb_keygen', 'scripts/wfb-cli-x11']), ('/lib/systemd/system', ['scripts/wifibroadcast.service', diff --git a/wfb_ng/__init__.py b/wfb_ng/__init__.py index 02d5a78..9c73b10 100644 --- a/wfb_ng/__init__.py +++ b/wfb_ng/__init__.py @@ -3,6 +3,7 @@ import os import queue import threading import atexit +import time from twisted.internet import utils, reactor from logging import currentframe @@ -171,28 +172,37 @@ def call_and_check_rc(cmd, *args, **kwargs): return utils.getProcessOutputAndValue(cmd, args, env=os.environ).addCallbacks(_check_rc, _got_signal) + +def close_if_failed(f): + def _f(self, *args, **kwargs): + try: + return f(self, *args, **kwargs) + except Exception as v: + if self.twisted_logger: + # Don't use logger due to infinite loop + print('Unable to write to log file: %s' % (v,), file=self.stderr) + else: + reactor.callFromThread(log.err, v, 'Unable to write to: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs)) + + if self.logfile is not None: + self.logfile.close() + self.logfile = None + return _f + + class ErrorSafeLogFile(object): stderr = sys.stderr log_cls = LogFile log_max = 1000 binary = False twisted_logger = True - always_flush = True + flush_delay = 0 def __init__(self, *args, **kwargs): cleanup_at_exit = kwargs.pop('cleanup_at_exit', True) self.logfile = None self.args = args self.kwargs = kwargs - - try: - self.logfile = self.log_cls(*self.args, **self.kwargs) - except Exception as v: - if self.twisted_logger: - print('Unable to open log file: %s' % (v,), file=self.stderr) - else: - log.err(v, 'Unable to open log file: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs), isError=True) - self.need_stop = threading.Event() self.lock = threading.RLock() self.log_queue_overflow = 0 @@ -209,8 +219,20 @@ class ErrorSafeLogFile(object): self.need_stop.set() self.thread.join() + if self.logfile is not None: + self.logfile.close() + self.logfile = None + def _log_thread_loop(self): + flush_ts = 0 + while not self.need_stop.is_set(): + now = time.time() + + if self.flush_delay > 0 and now > flush_ts: + flush_ts = now + self.flush_delay + self._flush() + try: data = self.log_queue.get(timeout=1) except queue.Empty: @@ -223,29 +245,25 @@ class ErrorSafeLogFile(object): self.log_queue_overflow = 0 if overflow and not self.binary: - self._write_and_flush('--- Dropped %d log items due to log queue overflow\n' % (overflow,)) + self._write('--- Dropped %d log items due to log queue overflow\n' % (overflow,)) - self._write_and_flush(data) + self._write(data) self.log_queue.task_done() - def _write_and_flush(self, data): - try: - if self.logfile is None: - self.logfile = self.log_cls(*self.args, **self.kwargs) + @close_if_failed + def _write(self, data): + if self.logfile is None: + self.logfile = self.log_cls(*self.args, **self.kwargs) - self.logfile.write(data) - if self.always_flush: - self.logfile.flush() - except Exception as v: - if self.twisted_logger: - # Don't use logger due to infinite loop - print('Unable to write to log file: %s' % (v,), file=self.stderr) - else: - reactor.callFromThread(log.err, v, 'Unable to write to: %s(%s, %s)' % (self.log_cls, self.args, self.kwargs), isError=True) + self.logfile.write(data) - if self.logfile is not None: - self.logfile.close() - self.logfile = None + if self.flush_delay == 0: + self.logfile.flush() + + @close_if_failed + def _flush(self): + if self.logfile is not None: + self.logfile.flush() def write(self, data): try: diff --git a/wfb_ng/log_parser.py b/wfb_ng/log_parser.py new file mode 100644 index 0000000..20218b9 --- /dev/null +++ b/wfb_ng/log_parser.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +import sys +import time +import msgpack +import struct +import gzip +from pprint import pformat +from .mavlink_protocol import unpack_mavlink + + +def main(): + for f in sys.argv[1:]: + with gzip.GzipFile(f, 'rb') as fd: + while True: + hdr = fd.read(4) + + if len(hdr) < 4: + break + + data_len = struct.unpack('!I', hdr)[0] + data = fd.read(data_len) + + if len(data) < data_len: + break + + msg = msgpack.unpackb(data, strict_map_key=False, use_list=False) + ts = msg.pop('timestamp') + mtype = msg.pop('type') + + if mtype == 'mavlink': + seq, sys_id, comp_id, msg_id = msg.pop('hdr') + msg['sys_id'] = sys_id + msg['comp_id'] = comp_id + msg['seq'] = seq + + mav_message = msg.pop('msg') + try: + k, v = unpack_mavlink(msg_id, mav_message) + msg[k] = v + except Exception as v: + msg['msg'] = mav_message + msg['parse_error'] = v + + ts_txt = time.strftime('%Y-%m-%d %H:%M:%S.{} %Z'.format(('%.3f' % ((ts % 1),))[2:]), time.localtime(ts)) + msg_pp = ('\n%s\t%s\t' % (' ' * len(ts_txt), ' ' * len(mtype)))\ + .join(pformat(msg, compact=True).split('\n')) + print('%s\t%s\t%s' % (ts_txt, mtype, msg_pp)) + + +if __name__ == '__main__': + main() diff --git a/wfb_ng/server.py b/wfb_ng/server.py index 987271d..7be9713 100644 --- a/wfb_ng/server.py +++ b/wfb_ng/server.py @@ -24,6 +24,8 @@ import os import re import hashlib import time +import struct +import gzip from base64 import b85encode from itertools import groupby @@ -31,7 +33,7 @@ from twisted.python import log, failure from twisted.python.logfile import LogFile from twisted.internet import reactor, defer, main as ti_main, threads, task from twisted.internet.protocol import ProcessProtocol, Protocol, Factory -from twisted.protocols.basic import LineReceiver, Int32StringReceiver, _formatNetstring +from twisted.protocols.basic import LineReceiver, Int32StringReceiver from twisted.internet.serialport import SerialPort from . import _log_msg, ConsoleObserver, ErrorSafeLogFile, call_and_check_rc, ExecError @@ -59,17 +61,25 @@ class WFBFlags(object): fec_types = {1: 'VDM_RS'} -# Log format is msgpack -> base85 -> netstring + newline -# See http://cr.yp.to/proto/netstrings.txt for the specification of netstrings. +# Log format is gzipped sequence of int32 strings +# For every run new file will be open to avoid framing errors -class NetstringLogger(ErrorSafeLogFile): +def BinLogFile(self, fname, directory): + filename = '%s.%s' % (fname, time.strftime('%Y%m%d-%H%M%S', time.localtime())) + filename = os.path.join(directory, filename) + reactor.callFromThread(log.msg, 'Open binary log %s' % (filename,)) + return gzip.GzipFile(filename, 'wb') + + +class BinLogger(ErrorSafeLogFile): binary = True twisted_logger = False - always_flush = False + flush_delay = 10 + log_cls = BinLogFile def send_stats(self, data): - msg = b85encode(msgpack.packb(data)) - self.write(_formatNetstring(msg) + b'\n') + data = msgpack.packb(data) + self.write(b''.join((struct.pack('!I', len(data)), data))) class StatisticsProtocol(Int32StringReceiver): @@ -608,10 +618,8 @@ def init(profiles, wlans): profile_cfg = getattr(settings, profile) if settings.common.binary_log_file is not None: - logger = NetstringLogger(settings.common.binary_log_file % (profile,), - settings.path.log_dir, - rotateLength=10 * 1024 * 1024, - maxRotatedFiles=10) + logger = BinLogger(settings.common.binary_log_file % (profile,), + settings.path.log_dir) else: logger = None