Encode frequency field in wrxfwd_t in network byte order

Use only ip address of forwarder in antenna_id inside of aggregator.
Add more checks in forwarder/aggregator unit tests
This commit is contained in:
Vasily Evseenko 2024-08-18 00:47:43 +03:00
parent 613e45f191
commit 8c81d23850
4 changed files with 49 additions and 10 deletions

View File

@ -357,7 +357,7 @@ void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx
uint8_t bandwidth, sockaddr_in *sockaddr)
{
wrxfwd_t fwd_hdr = { .wlan_idx = wlan_idx,
.freq = freq,
.freq = htons(freq),
.mcs_index = mcs_index,
.bandwidth = bandwidth };
@ -500,7 +500,8 @@ void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const u
if (sockaddr != NULL && sockaddr->sin_family == AF_INET)
{
key.antenna_id = ((uint64_t)ntohl(sockaddr->sin_addr.s_addr) << 32 | (uint64_t)ntohs(sockaddr->sin_port) << 16);
// We ignore port here because for the one host (wlan_idx, antenna_id) will be unique key for all forwarder processes.
key.antenna_id = ((uint64_t)ntohl(sockaddr->sin_addr.s_addr) << 32);
}
key.antenna_id |= ((uint64_t)wlan_idx << 8 | (uint64_t)ant[i]);
@ -972,8 +973,9 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
fprintf(stderr, "Short packet (rx fwd header)\n");
continue;
}
agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna,
fwd_hdr.rssi, fwd_hdr.noise, fwd_hdr.freq,
agg.process_packet(buf, rsize - sizeof(wrxfwd_t),
fwd_hdr.wlan_idx, fwd_hdr.antenna,
fwd_hdr.rssi, fwd_hdr.noise, ntohs(fwd_hdr.freq),
fwd_hdr.mcs_index, fwd_hdr.bandwidth, &sockaddr);
}
if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));

View File

@ -210,9 +210,14 @@ private:
memset(fwd_hdr.antenna, 0xff, sizeof(fwd_hdr.antenna));
memset(fwd_hdr.rssi, SCHAR_MIN, sizeof(fwd_hdr.rssi));
memset(fwd_hdr.noise, SCHAR_MAX, sizeof(fwd_hdr.noise));
fwd_hdr.mcs_index = 1;
fwd_hdr.bandwidth = 20;
fwd_hdr.freq = htons(4321);
fwd_hdr.antenna[0] = (uint8_t)(rand() % 2);
fwd_hdr.rssi[0] = (int8_t)(rand() & 0xff);
fwd_hdr.rssi[0] = -42;
fwd_hdr.noise[0] = -70;
struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr,
.iov_len = sizeof(fwd_hdr)},

View File

@ -185,7 +185,7 @@ class StatsAndSelectorFactory(Factory):
def _stats_agg_by_freq(self, ant_stats):
stats_agg = {}
for (((freq, mcs_index, bandwith), ant_id),
for (((freq, mcs_index, bandwidth), ant_id),
(pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max)) in ant_stats.items():
@ -561,7 +561,7 @@ def init_wlans(max_bw, wlans):
elif max_bw == 160:
ht_mode = '160MHz'
else:
raise Exception('Unsupported bandwith %d MHz' % (max_bw,))
raise Exception('Unsupported bandwidth %d MHz' % (max_bw,))
if not settings.common.primary:
log.msg('Skip card init due to secondary role')

View File

@ -35,6 +35,37 @@ def gen_req_id(f):
return _f
class FakeAntennaProtocol(object):
def process_new_session(self, rx_id, session):
log.msg('%s new session %r' % (rx_id, session))
def update_rx_stats(self, rx_id, packet_stats, ant_stats, session):
log.msg('%s %r %r %r' % (rx_id, packet_stats, ant_stats, session))
for (((freq, mcs_index, bandwidth), ant_id),
(pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max)) in ant_stats.items():
assert pkt_s >= 0
assert freq == 4321
assert mcs_index == 1
assert bandwidth == 20
assert rssi_min == rssi_avg == rssi_max == -42
assert snr_min == snr_avg == snr_max == 28
host, port, wlan_idx, ant_id = struct.unpack('!IHBB', ant_id.to_bytes(8, byteorder='big'))
assert host == 0x7f000001
assert port == 0
assert 0 <= wlan_idx < 2
assert 0 <= ant_id < 2
def update_tx_stats(self, tx_id, packet_stats, ant_latency):
log.msg('%s %r %r' % (tx_id, packet_stats, ant_latency))
class TXCommandClient(DatagramProtocol):
noisy = False
@ -100,8 +131,9 @@ class TXRXTestCase(unittest.TestCase):
# '-Q', '-P 1', ## requires root priv
'-i', str(link_id), '-e', str(epoch), '-R', str(512 * 1024), 'wlan0']
self.rx_pp = RXProtocol(None, cmd_rx, 'debug rx')
self.tx_pp = TXProtocol(None, cmd_tx, 'debug tx')
ap = FakeAntennaProtocol()
self.rx_pp = RXProtocol(ap, cmd_rx, 'debug rx')
self.tx_pp = TXProtocol(ap, cmd_tx, 'debug tx')
self.rx_pp.start().addErrback(lambda f: f.trap('twisted.internet.error.ProcessTerminated'))
self.tx_pp.start().addErrback(lambda f: f.trap('twisted.internet.error.ProcessTerminated'))
@ -136,7 +168,7 @@ class TXRXTestCase(unittest.TestCase):
if i not in (4, 9, 10, 11, 12, 11 + 4, 11 + 5, 11 + 6):
self.rxp.send_msg(pkt)
yield df_sleep(0.1)
yield df_sleep(1.1)
self.assertEqual([b'm%d' % (i + 1,) for i in range(16) if i + 1 != 4], self.rxp.rxq)