Added various runtime statistics:

- MCS, bandwidth and used FEC scheme
 - Radio and UDP flow rates

CLI UI refactoring
This commit is contained in:
Vasily Evseenko 2024-05-20 23:41:00 +03:00
parent d3ca31b24e
commit 9d8e5b3990
8 changed files with 258 additions and 105 deletions

View File

@ -117,6 +117,9 @@ void Receiver::loop_iter(void)
int8_t noise[RX_ANT_MAX];
uint8_t flags = 0;
bool self_injected = false;
uint8_t mcs_index = 0;
uint8_t bandwidth = 20;
struct ieee80211_radiotap_iterator iterator;
int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL);
@ -170,6 +173,45 @@ void Receiver::loop_iter(void)
self_injected = true;
break;
case IEEE80211_RADIOTAP_MCS:
{
/* u8,u8,u8 */
uint8_t mcs_have = iterator.this_arg[0];
if (mcs_have & IEEE80211_RADIOTAP_MCS_HAVE_MCS)
{
mcs_index = iterator.this_arg[2] & 0x7f;
}
if ((mcs_have & 1) && (iterator.this_arg[1] & 1))
{
bandwidth = 40;
}
}
break;
case IEEE80211_RADIOTAP_VHT:
{
/* u16 known, u8 flags, u8 bandwidth, u8 mcs_nss[4], u8 coding, u8 group_id, u16 partial_aid */
u8 known = iterator.this_arg[0];
if(known & 0x40)
{
int bwidth = iterator.this_arg[3] & 0x1f;
if(bwidth >= 1 && bwidth <= 3)
{
bandwidth = 40;
}
else if(bwidth >= 4 && bwidth <= 10)
{
bandwidth = 80;
}
}
mcs_index = (iterator.this_arg[4] >> 4) & 0x0f;
}
break;
default:
break;
}
@ -201,9 +243,11 @@ void Receiver::loop_iter(void)
pkt += iterator._max_length;
pktlen -= iterator._max_length;
//fprintf(stderr, "CAPTURE: mcs: %u, bw: %u\n", mcs_index, bandwidth);
if (pktlen > (int)sizeof(ieee80211_header))
{
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), wlan_idx, antenna, rssi, noise, freq, NULL);
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header),
wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth, NULL);
} else {
fprintf(stderr, "Short packet (ieee header)\n");
continue;
@ -213,8 +257,8 @@ void Receiver::loop_iter(void)
Aggregator::Aggregator(const string &client_addr, int client_port, const string &keypair, uint64_t epoch, uint32_t channel_id) : \
count_p_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0),
count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0),
count_p_all(0), count_b_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0),
count_p_lost(0), count_p_bad(0), count_p_override(0), count_p_outgoing(0), count_b_outgoing(0),
fec_p(NULL), fec_k(-1), fec_n(-1), seq(0), rx_ring_front(0), rx_ring_alloc(0),
last_known_block((uint64_t)-1), epoch(epoch), channel_id(channel_id)
{
@ -307,10 +351,14 @@ Forwarder::Forwarder(const string &client_addr, int client_port)
}
void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr)
void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr)
{
wrxfwd_t fwd_hdr = { .wlan_idx = wlan_idx,
.freq = freq };
.freq = freq,
.mcs_index = mcs_index,
.bandwidth = bandwidth };
memcpy(fwd_hdr.antenna, antenna, RX_ANT_MAX * sizeof(uint8_t));
memcpy(fwd_hdr.rssi, rssi, RX_ANT_MAX * sizeof(int8_t));
@ -414,13 +462,14 @@ void Aggregator::dump_stats(FILE *fp)
for(rx_antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
ts, it->first.freq, it->first.antenna_id, it->second.count_all,
fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all,
it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max,
it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max);
}
fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_p_dec_err, count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing);
fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err,
count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing);
fflush(fp);
if(count_p_override)
@ -437,12 +486,16 @@ void Aggregator::dump_stats(FILE *fp)
}
void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, uint16_t freq)
void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise,
uint16_t freq, uint8_t mcs_index, uint8_t bandwidth)
{
for(int i = 0; i < RX_ANT_MAX && ant[i] != 0xff; i++)
{
// antenna_id: addr + port + wlan_idx + ant
rxAntennaKey key = {.freq = freq, .antenna_id = 0};
rxAntennaKey key = {.freq = freq,
.antenna_id = 0,
.mcs_index=mcs_index,
.bandwidth=bandwidth};
if (sockaddr != NULL && sockaddr->sin_family == AF_INET)
{
@ -456,10 +509,13 @@ void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const u
}
void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr)
void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr)
{
wsession_data_t new_session_data;
count_p_all += 1;
count_b_all += size;
if(size == 0) return;
@ -536,7 +592,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
}
count_p_dec_ok += 1;
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq);
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth);
if (memcmp(session_key, new_session_data.session_key, sizeof(session_key)) != 0)
{
@ -579,7 +635,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
}
count_p_dec_ok += 1;
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq);
log_rssi(sockaddr, wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth);
assert(decrypted_len <= MAX_FEC_PAYLOAD);
@ -715,10 +771,12 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
{
fprintf(stderr, "Corrupted packet %u\n", seq);
count_p_bad += 1;
}else if(!(flags & WFB_PACKET_FEC_ONLY))
}
else if(!(flags & WFB_PACKET_FEC_ONLY))
{
send(sockfd, payload, packet_size, MSG_DONTWAIT);
count_p_outgoing += 1;
count_b_outgoing += packet_size;
}
}
@ -882,7 +940,9 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
fprintf(stderr, "Short packet (rx fwd header)\n");
continue;
}
agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna, fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq, &sockaddr);
agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna,
fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq,
fwd_hdr.mcs_index, fwd_hdr.bandwidth, &sockaddr);
}
if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
}

View File

@ -39,7 +39,10 @@ typedef enum {
class BaseAggregator
{
public:
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr) = 0;
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr) = 0;
virtual void dump_stats(FILE *fp) = 0;
protected:
int open_udp_socket_for_tx(const std::string &client_addr, int client_port)
@ -67,7 +70,9 @@ class Forwarder : public BaseAggregator
public:
Forwarder(const std::string &client_addr, int client_port);
~Forwarder();
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr);
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth,sockaddr_in *sockaddr);
virtual void dump_stats(FILE *) {}
private:
int sockfd;
@ -129,21 +134,37 @@ struct rxAntennaKey
{
uint16_t freq;
uint64_t antenna_id;
uint8_t mcs_index;
uint8_t bandwidth;
bool operator==(const rxAntennaKey &other) const
{
return (freq == other.freq && antenna_id == other.antenna_id);
return (freq == other.freq && \
antenna_id == other.antenna_id && \
mcs_index == other.mcs_index && \
bandwidth == other.bandwidth);
}
};
template <typename T>
void hash_combine(std::size_t& seed, const T& v)
{
seed ^= std::hash<T>{}(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
template<>
struct std::hash<rxAntennaKey>
{
std::size_t operator()(const rxAntennaKey& k) const noexcept
{
std::size_t h1 = std::hash<uint16_t>{}(k.freq);
std::size_t h2 = std::hash<uint64_t>{}(k.antenna_id);
return h1 ^ (h2 << 1); // combine hashes
std::size_t h = 0;
hash_combine(h, k.freq);
hash_combine(h, k.antenna_id);
hash_combine(h, k.mcs_index);
hash_combine(h, k.bandwidth);
return h;
}
};
@ -154,7 +175,9 @@ class Aggregator : public BaseAggregator
public:
Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id);
~Aggregator();
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, const int8_t *noise, uint16_t freq, sockaddr_in *sockaddr);
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp);
// Make stats public for android userspace receiver
@ -162,6 +185,7 @@ public:
{
antenna_stat.clear();
count_p_all = 0;
count_b_all = 0;
count_p_dec_err = 0;
count_p_dec_ok = 0;
count_p_fec_recovered = 0;
@ -169,10 +193,12 @@ public:
count_p_bad = 0;
count_p_override = 0;
count_p_outgoing = 0;
count_b_outgoing = 0;
}
rx_antenna_stat_t antenna_stat;
uint32_t count_p_all;
uint32_t count_b_all;
uint32_t count_p_dec_err;
uint32_t count_p_dec_ok;
uint32_t count_p_fec_recovered;
@ -180,13 +206,15 @@ public:
uint32_t count_p_bad;
uint32_t count_p_override;
uint32_t count_p_outgoing;
uint32_t count_b_outgoing;
private:
void init_fec(int k, int n);
void deinit_fec(void);
void send_packet(int ring_idx, int fragment_idx);
void apply_fec(int ring_idx);
void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi, const int8_t *noise, uint16_t freq);
void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi,
const int8_t *noise, uint16_t freq, uint8_t mcs_index, uint8_t bandwidth);
int get_block_ring_idx(uint64_t block_idx);
int rx_ring_push(void);
fec_t* fec_p;

View File

@ -234,7 +234,7 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
}
uint64_t key = (uint64_t)(current_output) << 8 | (uint64_t)0xff;
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0);
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size);
}
else
{
@ -252,25 +252,26 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
}
uint64_t key = (uint64_t)(i) << 8 | (uint64_t)0xff;
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0);
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size);
}
}
}
void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped)
void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
{
for(tx_antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
ts, it->first,
it->second.count_injected, it->second.count_dropped,
it->second.count_p_injected, it->second.count_p_dropped,
it->second.latency_min,
it->second.latency_sum / (it->second.count_injected + it->second.count_dropped),
it->second.latency_sum / (it->second.count_p_injected + it->second.count_p_dropped),
it->second.latency_max);
injected += it->second.count_injected;
dropped += it->second.count_dropped;
injected_packets += it->second.count_p_injected;
dropped_packets += it->second.count_p_dropped;
injected_bytes += it->second.count_b_injected;
}
antenna_stat.clear();
}
@ -393,7 +394,9 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout
uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0;
uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout
uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow)
uint32_t count_p_injected = 0; // successfully injected (include additional fec packets)
uint32_t count_b_incoming = 0; // incoming udp bytes (received + dropped due to rxq overflow)
uint32_t count_p_injected = 0; // successfully injected packets (include additional fec packets)
uint32_t count_b_injected = 0; // successfully injected bytes (include additional fec packets)
uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout
uint32_t count_p_truncated = 0; // injected large packets that were truncated
int start_fd_idx = 0;
@ -420,10 +423,10 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout
if (cur_ts >= log_send_ts) // log timeout expired
{
t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped);
t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped, count_b_injected);
fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u\n",
cur_ts, count_p_fec_timeouts, count_p_incoming, count_p_injected, count_p_dropped, count_p_truncated);
fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n",
cur_ts, count_p_fec_timeouts, count_p_incoming, count_b_incoming, count_p_injected, count_b_injected, count_p_dropped, count_p_truncated);
fflush(stdout);
if(count_p_dropped)
@ -438,7 +441,9 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout
count_p_fec_timeouts = 0;
count_p_incoming = 0;
count_b_incoming = 0;
count_p_injected = 0;
count_b_injected = 0;
count_p_dropped = 0;
count_p_truncated = 0;
@ -502,6 +507,7 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int fec_timeout
}
count_p_incoming += 1;
count_b_incoming += rsize;
if (rsize > (ssize_t)MAX_PAYLOAD_SIZE)
{

View File

@ -38,7 +38,7 @@ public:
bool send_packet(const uint8_t *buf, size_t size, uint8_t flags);
void send_session_key(void);
virtual void select_output(int idx) = 0;
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped) = 0;
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) = 0;
protected:
virtual void inject_packet(const uint8_t *buf, size_t size) = 0;
@ -66,26 +66,36 @@ private:
class txAntennaItem
{
public:
txAntennaItem(void) : count_injected(0), count_dropped(0), latency_sum(0), latency_min(0), latency_max(0) {}
txAntennaItem(void) : count_p_injected(0), count_b_injected(0), count_p_dropped(0), latency_sum(0), latency_min(0), latency_max(0) {}
void log_latency(uint64_t latency, bool succeeded) {
if(count_injected + count_dropped == 0)
void log_latency(uint64_t latency, bool succeeded, uint32_t packet_size) {
if(count_p_injected + count_p_dropped == 0)
{
latency_min = latency;
latency_max = latency;
} else {
}
else
{
latency_min = std::min(latency, latency_min);
latency_max = std::max(latency, latency_max);
}
latency_sum += latency;
if (succeeded) count_injected += 1;
else count_dropped += 1;
if (succeeded)
{
count_p_injected += 1;
count_b_injected += packet_size;
}
else
{
count_p_dropped += 1;
}
}
uint32_t count_injected;
uint32_t count_dropped;
uint32_t count_p_injected;
uint32_t count_b_injected;
uint32_t count_p_dropped;
uint64_t latency_sum;
uint64_t latency_min;
uint64_t latency_max;
@ -99,7 +109,7 @@ public:
RawSocketTransmitter(int k, int m, const std::string &keypair, uint64_t epoch, uint32_t channel_id, const std::vector<std::string> &wlans, shared_ptr<uint8_t[]> radiotap_header, size_t radiotap_header_len, uint8_t frame_type);
virtual ~RawSocketTransmitter();
virtual void select_output(int idx) { current_output = idx; }
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped);
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
private:
virtual void inject_packet(const uint8_t *buf, size_t size);
const uint32_t channel_id;
@ -129,7 +139,7 @@ public:
saddr.sin_port = htons((unsigned short)base_port);
}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected, uint32_t &dropped) {}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) {}
virtual ~UdpTransmitter()
{

View File

@ -197,6 +197,8 @@ typedef struct {
int8_t rssi[RX_ANT_MAX]; //RADIOTAP_DBM_ANTSIGNAL, list of rssi for corresponding antenna idx
int8_t noise[RX_ANT_MAX]; //RADIOTAP_DBM_ANTNOISE, list of (rssi - snr) for corresponding antenna idx
uint16_t freq; //IEEE80211_RADIOTAP_CHANNEL -- channel frequency in MHz
uint8_t mcs_index;
uint8_t bandwidth;
} __attribute__ ((packed)) wrxfwd_t;
// Network packet headers. All numbers are in network (big endian) format

View File

@ -35,6 +35,10 @@ from .server import parse_services
from .common import abort_on_crash, exit_status
from .conf import settings
_orig_stdout = sys.stdout
def set_window_title(s):
print("\033]2;%s\007" % (s,), file=_orig_stdout)
# Workarond for ncurses bug that show error on output to the last position on the screen
@ -44,6 +48,10 @@ def ignore_curses_err(f, *args, **kwargs):
except curses.error:
pass
def addcstr(window, s, attrs=0):
h, w = window.getmaxyx()
addstr(window, h // 2, max((w - len(s)) // 2, 0), s, attrs)
def rectangle(win, uly, ulx, lry, lrx):
"""Draw a rectangle with corners at the provided upper-left
@ -67,6 +75,23 @@ def addstr(window, y, x, s, *attrs):
pass
def human_rate(r):
rate = r * 8
if rate > 1024 * 1024:
rate = rate / 1024 / 1024
mod = 'mbit/s'
else:
rate = rate / 1024
mod = 'kbit/s'
if rate < 10:
return '%0.1f %s' % (rate, mod)
else:
return '%3d %s' % (rate, mod)
class AntennaStat(Int32StringReceiver):
MAX_LENGTH = 1024 * 1024
@ -77,9 +102,12 @@ class AntennaStat(Int32StringReceiver):
self.draw_rx(attrs)
elif attrs['type'] == 'tx':
self.draw_tx(attrs)
elif attrs['type'] == 'cli_title':
set_window_title(attrs['cli_title'])
def draw_rx(self, attrs):
p = attrs['packets']
session_d = attrs['session']
stats_d = attrs['rx_ant_stats']
tx_ant = attrs.get('tx_ant')
rx_id = attrs['id']
@ -89,10 +117,12 @@ class AntennaStat(Int32StringReceiver):
return
window.erase()
addstr(window, 0, 0, '[RX] pkt/s pkt')
addstr(window, 0, 0, ' pkt/s pkt')
msg_l = (('recv %4d %d' % tuple(p['all']), 0),
#('recvb %4d %d' % tuple(p['all_bytes']), 0),
('udp %4d %d' % tuple(p['out']), 0),
#('udpb %4d %d' % tuple(p['out_bytes']), 0),
('fec_r %4d %d' % tuple(p['fec_rec']), curses.A_REVERSE if p['fec_rec'][0] else 0),
('lost %4d %d' % tuple(p['lost']), curses.A_REVERSE if p['lost'][0] else 0),
('d_err %4d %d' % tuple(p['dec_err']), curses.A_REVERSE if p['dec_err'][0] else 0),
@ -103,18 +133,27 @@ class AntennaStat(Int32StringReceiver):
if y < ymax:
addstr(window, y, 0, msg, attr)
session = ''
if session_d:
session = ', FEC: %(fec_k)d/%(fec_n)d' % (session_d)
addstr(window, 0, 20, 'Flow: %s -> %s%s' % \
(human_rate(p['all_bytes'][0]),
human_rate(p['out_bytes'][0]),
session))
if stats_d:
addstr(window, 0, 24, 'Freq [ANT] pkt/s RSSI [dBm] SNR [dB]')
for y, ((freq, ant_id), v) in enumerate(sorted(stats_d.items()), 1):
addstr(window, 2, 20, 'Freq MCS BW [ANT] pkt/s RSSI [dBm] SNR [dB]')
for y, (((freq, mcs_index, bandwith), ant_id), v) in enumerate(sorted(stats_d.items()), 3):
pkt_s, rssi_min, rssi_avg, rssi_max, snr_min, snr_avg, snr_max = v
if y < ymax:
active_tx = '*' if (ant_id >> 8) == tx_ant else ' '
addstr(window, y, 24, '%04d %s%04x %4d %3d < %3d < %3d %3d < %3d < %3d' % \
(freq, active_tx, ant_id, pkt_s,
addstr(window, y, 20, '%04d %3d %2d %s%04x %4d %3d < %3d < %3d %3d < %3d < %3d' % \
(freq, mcs_index, bandwith, active_tx, ant_id, pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max))
else:
addstr(window, 0, 25, '[No data]', curses.A_REVERSE)
addstr(window, 2, 20, '[No data]', curses.A_REVERSE)
window.refresh()
@ -128,10 +167,12 @@ class AntennaStat(Int32StringReceiver):
return
window.erase()
addstr(window, 0, 0, '[TX] pkt/s pkt')
addstr(window, 0, 0, ' pkt/s pkt')
msg_l = (('sent %4d %d' % tuple(p['injected']), 0),
#('sentb %4d %d' % tuple(p['injected_bytes']), 0),
('udp %4d %d' % tuple(p['incoming']), 0),
#('udpb %4d %d' % tuple(p['incoming_bytes']), 0),
('fec_t %4d %d' % tuple(p['fec_timeouts']), 0),
('drop %4d %d' % tuple(p['dropped']), curses.A_REVERSE if p['dropped'][0] else 0),
('trunc %4d %d' % tuple(p['truncated']), curses.A_REVERSE if p['truncated'][0] else 0))
@ -141,15 +182,19 @@ class AntennaStat(Int32StringReceiver):
if y < ymax:
addstr(window, y, 0, msg, attr)
addstr(window, 0, 20, 'Flow: %s -> %s' % \
(human_rate(p['incoming_bytes'][0]),
human_rate(p['injected_bytes'][0])))
if latency_d:
addstr(window, 0, 25, '[ANT] pkt/s Injection [us]')
for y, (k, v) in enumerate(sorted(latency_d.items()), 1):
addstr(window, 2, 20, '[ANT] pkt/s Injection [us]')
for y, (k, v) in enumerate(sorted(latency_d.items()), 3):
k = int(k) # json doesn't support int keys
injected, dropped, lat_min, lat_avg, lat_max = v
if y < ymax:
addstr(window, y, 25, '%04x: %4d %4d < %4d < %4d' % (k, injected, lat_min, lat_avg, lat_max))
addstr(window, y, 20, '%04x: %4d %4d < %4d < %4d' % (k, injected, lat_min, lat_avg, lat_max))
else:
addstr(window, 0, 25, '[No data]', curses.A_REVERSE)
addstr(window, 2, 20, '[No data]', curses.A_REVERSE)
window.refresh()
@ -172,7 +217,7 @@ class AntennaStatClientFactory(ReconnectingClientFactory):
curses.resize_term(height, width)
self.stdscr.clear()
service_list = list((s_name, cfg.show_rx_stats, cfg.show_tx_stats) for s_name, _, cfg in parse_services(self.profile))
service_list = list((s_name, cfg.stream_rx is not None, cfg.stream_tx is not None) for s_name, _, cfg in parse_services(self.profile))
if not service_list:
rectangle(self.stdscr, 0, 0, height - 1, width - 1)
@ -203,47 +248,44 @@ class AntennaStatClientFactory(ReconnectingClientFactory):
hoff_float += h_fixed
whl = []
for ww, xoff, txrx, show_stats in [((width // 2 - 1), 0, 'rx', show_rx_stats),
((width - width // 2 - 1), width // 2, 'tx', show_tx_stats)]:
if show_stats:
err = round(hoff_float) - (hoff_int + int(h_exp))
wh = int(h_exp) + err
if wh < h_fixed:
raise Exception('Terminal height is too small')
else:
wh = h_fixed
for ww, xoff, txrx, show_stats in [((width * 4 // 7 - 1), 0, 'rx', show_rx_stats),
((width - width * 4 // 7 - 1), width * 4 // 7, 'tx', show_tx_stats)]:
if not show_stats:
whl.append(0)
continue
err = round(hoff_float) - (hoff_int + int(h_exp))
wh = int(h_exp) + err
if wh < h_fixed:
raise Exception('Terminal height is too small')
window = self.stdscr.subpad(wh - 2, ww - 2, hoff_int + 1, xoff + 1)
window.idlok(1)
window.scrollok(1)
rectangle(self.stdscr, hoff_int, xoff, hoff_int + wh - 1, xoff + ww)
addstr(self.stdscr, hoff_int, 3 + xoff, '[%s %s %s]' % (self.profile, name, txrx))
if show_stats:
self.windows['%s %s' % (name, txrx)] = window
else:
addstr(window, 0, 0, '[statistics disabled]', curses.A_REVERSE)
window.refresh()
addstr(self.stdscr, hoff_int, 3 + xoff, '[%s: %s %s]' % (txrx.upper(), self.profile, name))
self.windows['%s %s' % (name, txrx)] = window
whl.append(wh)
hoff_int += max(whl)
self.stdscr.refresh()
def startedConnecting(self, connector):
log.msg('Connecting to %s:%d ...' % (connector.host, connector.port))
set_window_title('Connecting to %s:%d ...' % (connector.host, connector.port))
for window in self.windows.values():
window.erase()
addstr(window, 0, 0, 'Connecting...')
addcstr(window, 'Connecting...')
window.refresh()
def buildProtocol(self, addr):
log.msg('Connected to %s' % (addr,))
set_window_title('Connected to %s' % (addr,))
for window in self.windows.values():
window.erase()
addstr(window, 0, 0, 'Waiting for data...')
addcstr(window, 'Waiting for data...')
window.refresh()
self.resetDelay()
@ -252,21 +294,21 @@ class AntennaStatClientFactory(ReconnectingClientFactory):
return p
def clientConnectionLost(self, connector, reason):
log.msg('Connection lost: %s' % (reason.value,))
set_window_title('Connection lost: %s' % (reason.value,))
for window in self.windows.values():
window.erase()
addstr(window, 0, 0, 'Connection lost: %s' % (reason.value,))
addcstr(window, '[Connection lost]', curses.A_REVERSE)
window.refresh()
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
def clientConnectionFailed(self, connector, reason):
log.msg('Connection failed: %s' % (reason.value,))
set_window_title('Connection failed: %s' % (reason.value,))
for window in self.windows.values():
window.erase()
addstr(window, 0, 0, 'Connection failed: %s' % (reason.value,))
addcstr(window, '[Connection failed]', curses.A_REVERSE)
window.refresh()
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)

View File

@ -112,8 +112,6 @@ link_domain = "test_two_way_udp"
stream_rx = None
stream_tx = None
keypair = None
show_rx_stats = True
show_tx_stats = True
mirror = False # Set to true if you want to mirror packet via all cards for redundancy. Not recommended if cards are on one frequency channel.
# Radio settings for TX and RX
@ -175,7 +173,6 @@ fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emi
[drone_video]
show_rx_stats = False # this is tx only endpoint and has not rx stats
peer = 'listen://0.0.0.0:5602' # listen for video stream (drone)
@ -191,7 +188,6 @@ default_route = False
[gs_video]
show_tx_stats = False # this is rx only endpoint and has not tx stats
peer = 'connect://127.0.0.1:5600' # outgoing connection for video sink (GS)

View File

@ -61,6 +61,7 @@ class StatisticsProtocol(Int32StringReceiver):
MAX_LENGTH = 1024 * 1024
def connectionMade(self):
self.sendString(msgpack.packb(dict(type='cli_title', cli_title=self.factory.cli_title)))
self.factory.ui_sessions.append(self)
def stringReceived(self, string):
@ -81,7 +82,7 @@ class StatsAndSelectorFactory(Factory):
Aggregate RX stats and select TX antenna
"""
def __init__(self):
def __init__(self, profile, wlans, link_domain):
self.ant_sel_cb_list = []
self.rssi_cb_l = []
@ -92,6 +93,9 @@ class StatsAndSelectorFactory(Factory):
# tcp sockets for UI
self.ui_sessions = []
# CLI title
self.cli_title = 'WFB-ng_%s @%s %s [%s]' % (settings.common.version, profile, ', '.join(wlans), link_domain)
def add_ant_sel_cb(self, ant_sel_cb):
self.ant_sel_cb_list.append(ant_sel_cb)
ant_sel_cb(self.tx_sel)
@ -102,9 +106,10 @@ class StatsAndSelectorFactory(Factory):
def _stats_agg_by_freq(self, ant_stats):
stats_agg = {}
for (freq, ant_id), (pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max) in ant_stats.items():
for (((freq, mcs_index, bandwith), ant_id),
(pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max)) in ant_stats.items():
if ant_id not in stats_agg:
stats_agg[ant_id] = (pkt_s,
@ -157,7 +162,7 @@ class StatsAndSelectorFactory(Factory):
self.tx_sel = max_rssi_ant
def update_rx_stats(self, rx_id, packet_stats, ant_stats):
def update_rx_stats(self, rx_id, packet_stats, ant_stats, session):
stats_agg = self._stats_agg_by_freq(ant_stats)
card_rssi_l = list(rssi_avg
for pkt_s,
@ -193,7 +198,9 @@ class StatsAndSelectorFactory(Factory):
# Send stats to CLI sessions
for s in self.ui_sessions:
s.send_stats(dict(type='rx', id=rx_id, tx_ant=self.tx_sel, packets=packet_stats, rx_ant_stats=ant_stats))
s.send_stats(dict(type='rx', id=rx_id, tx_ant=self.tx_sel,
packets=packet_stats, rx_ant_stats=ant_stats,
session=session))
def update_tx_stats(self, tx_id, packet_stats, ant_latency):
if settings.common.debug:
@ -217,6 +224,7 @@ class RXAntennaProtocol(LineReceiver):
self.rx_id = rx_id
self.ant = {}
self.count_all = None
self.session = None
def lineReceived(self, line):
line = line.decode('utf-8').strip()
@ -232,27 +240,27 @@ class RXAntennaProtocol(LineReceiver):
if cmd == 'RX_ANT':
if len(cols) != 5:
raise BadTelemetry()
self.ant[(int(cols[2]), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':'))
self.ant[(tuple(int(i) for i in cols[2].split(':')), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':'))
elif cmd == 'PKT':
if len(cols) != 3:
raise BadTelemetry()
p_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing = list(int(i) for i in cols[2].split(':'))
p_all, b_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing, b_outgoing = list(int(i) for i in cols[2].split(':'))
if not self.count_all:
self.count_all = (p_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing)
self.count_all = (p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing)
else:
self.count_all = tuple((a + b) for a, b in zip((p_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing),
self.count_all = tuple((a + b) for a, b in zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing),
self.count_all))
stats = dict(zip(('all', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out'),
zip((p_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing),
stats = dict(zip(('all', 'all_bytes', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out', 'out_bytes'),
zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing),
self.count_all)))
# Send stats to aggregators
if self.ant_stat_cb is not None:
self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant))
self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant), self.session)
self.ant.clear()
@ -261,6 +269,7 @@ class RXAntennaProtocol(LineReceiver):
raise BadTelemetry()
epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':'))
self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n)
log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch))
else:
@ -319,16 +328,16 @@ class TXAntennaProtocol(LineReceiver):
if len(cols) != 3:
raise BadTelemetry()
p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':'))
p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':'))
if not self.count_all:
self.count_all = (p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated)
self.count_all = (p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated)
else:
self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated),
self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated),
self.count_all))
stats = dict(zip(('fec_timeouts', 'incoming', 'injected', 'dropped', 'truncated'),
zip((p_fec_timeouts, p_incoming, p_injected, p_dropped, p_truncated),
stats = dict(zip(('fec_timeouts', 'incoming', 'incoming_bytes', 'injected', 'injected_bytes', 'dropped', 'truncated'),
zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated),
self.count_all)))
# Send stats to aggregators
@ -500,8 +509,8 @@ def init(profiles, wlans):
for profile, service_list in services:
# Domain wide antenna selector
ant_sel_f = StatsAndSelectorFactory()
profile_cfg = getattr(settings, profile)
ant_sel_f = StatsAndSelectorFactory(profile, wlans, profile_cfg.link_domain)
link_id = int.from_bytes(hashlib.sha1(profile_cfg.link_domain.encode('utf-8')).digest()[:3], 'big')
if profile_cfg.stats_port:
@ -897,7 +906,7 @@ def main():
log.theLogPublisher._startLogging(obs.emit, False)
log.msg('WFB version %s-%s' % (settings.common.version, settings.common.commit[:8]))
log.msg('WFB-ng version %s-%s' % (settings.common.version, settings.common.commit[:8]))
profiles, wlans = sys.argv[1], list(wlan for arg in sys.argv[2:] for wlan in arg.split())
uname = os.uname()
log.msg('Run on %s/%s @%s, profile(s) %s using %s' % (uname[4], uname[2], uname[1], profiles, ', '.join(wlans)))