Add initial support for dynamic FEC

This commit is contained in:
Vasily Evseenko 2023-01-09 19:23:17 +03:00
parent 5f63a8cbfd
commit cef2b325e2
4 changed files with 125 additions and 74 deletions

View File

@ -198,30 +198,15 @@ void Receiver::loop_iter(void)
}
Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair, uint64_t epoch, uint32_t channel_id) : \
fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0),
Aggregator::Aggregator(const string &client_addr, int client_port, const string &keypair, uint64_t epoch, uint32_t channel_id) : \
fec_p(NULL), fec_k(-1), fec_n(-1), seq(0), rx_ring_front(0), rx_ring_alloc(0),
last_known_block((uint64_t)-1), epoch(epoch), channel_id(channel_id),
count_p_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0),
count_p_lost(0), count_p_bad(0), count_p_override(0)
{
sockfd = open_udp_socket_for_tx(client_addr, client_port);
fec_p = fec_new(fec_k, fec_n);
memset(session_key, '\0', sizeof(session_key));
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{
rx_ring[ring_idx].block_idx = 0;
rx_ring[ring_idx].fragment_to_send_idx = 0;
rx_ring[ring_idx].has_fragments = 0;
rx_ring[ring_idx].fragments = new uint8_t*[fec_n];
for(int i=0; i < fec_n; i++)
{
rx_ring[ring_idx].fragments[i] = new uint8_t[MAX_FEC_PAYLOAD];
}
rx_ring[ring_idx].fragment_map = new uint8_t[fec_n];
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
}
FILE *fp;
if((fp = fopen(keypair.c_str(), "r")) == NULL)
{
@ -243,6 +228,47 @@ Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n,
Aggregator::~Aggregator()
{
if (fec_p != NULL)
{
deinit_fec();
}
close(sockfd);
}
void Aggregator::init_fec(int k, int n)
{
assert(fec_p == NULL);
assert(k >= 1);
assert(n >= 1);
assert(k <= n);
fec_k = k;
fec_n = n;
fec_p = fec_new(fec_k, fec_n);
rx_ring_front = 0;
rx_ring_alloc = 0;
last_known_block = (uint64_t)-1;
seq = 0;
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{
rx_ring[ring_idx].block_idx = 0;
rx_ring[ring_idx].fragment_to_send_idx = 0;
rx_ring[ring_idx].has_fragments = 0;
rx_ring[ring_idx].fragments = new uint8_t*[fec_n];
for(int i=0; i < fec_n; i++)
{
rx_ring[ring_idx].fragments[i] = new uint8_t[MAX_FEC_PAYLOAD];
}
rx_ring[ring_idx].fragment_map = new uint8_t[fec_n];
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
}
}
void Aggregator::deinit_fec(void)
{
assert(fec_p != NULL);
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{
@ -255,7 +281,9 @@ Aggregator::~Aggregator()
}
fec_free(fec_p);
close(sockfd);
fec_p = NULL;
fec_k = -1;
fec_n = -1;
}
@ -461,46 +489,56 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
if (be64toh(new_session_data.epoch) < epoch)
{
fprintf(stderr, "Session epoch doesn't match\n");
fprintf(stderr, "Session epoch doesn't match: %" PRIu64 " < %" PRIu64 "\n", be64toh(new_session_data.epoch), epoch);
count_p_dec_err += 1;
return;
}
if (be32toh(new_session_data.channel_id) != channel_id)
{
fprintf(stderr, "Session channel_id doesn't match\n");
fprintf(stderr, "Session channel_id doesn't match: %d != %d\n", be32toh(new_session_data.channel_id), channel_id);
count_p_dec_err += 1;
return;
}
if (new_session_data.k != fec_k ||
new_session_data.n != fec_n ||
new_session_data.fec_type != WFB_FEC_VDM_RS)
if (new_session_data.fec_type != WFB_FEC_VDM_RS)
{
fprintf(stderr, "Session FEC settings doesn't match\n");
fprintf(stderr, "Unsupported FEC codec type: %d\n", new_session_data.fec_type);
count_p_dec_err += 1;
return;
}
if (new_session_data.n < 1)
{
fprintf(stderr, "Invalid FEC N: %d\n", new_session_data.n);
count_p_dec_err += 1;
return;
}
if (new_session_data.k < 1 || new_session_data.k > new_session_data.n)
{
fprintf(stderr, "Invalid FEC K: %d\n", new_session_data.k);
count_p_dec_err += 1;
return;
}
count_p_dec_ok += 1;
epoch = be64toh(new_session_data.epoch);
if (memcmp(session_key, new_session_data.session_key, sizeof(session_key)) != 0)
{
fprintf(stderr, "New session detected\n");
epoch = be64toh(new_session_data.epoch);
memcpy(session_key, new_session_data.session_key, sizeof(session_key));
rx_ring_front = 0;
rx_ring_alloc = 0;
last_known_block = (uint64_t)-1;
seq = 0;
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
if (fec_p != NULL)
{
rx_ring[ring_idx].block_idx = 0;
rx_ring[ring_idx].fragment_to_send_idx = 0;
rx_ring[ring_idx].has_fragments = 0;
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
deinit_fec();
}
init_fec(new_session_data.k, new_session_data.n);
fprintf(stdout, "%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%u:%u\n", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n);
fflush(stdout);
}
return;
@ -671,6 +709,11 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
void Aggregator::apply_fec(int ring_idx)
{
assert(fec_k >= 1);
assert(fec_n >= 1);
assert(fec_k <= fec_n);
assert(fec_p != NULL);
unsigned index[fec_k];
uint8_t *in_blocks[fec_k];
uint8_t *out_blocks[fec_n - fec_k];
@ -836,7 +879,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval)
int main(int argc, char* const *argv)
{
int opt;
uint8_t k = 8, n = 12, radio_port = 0;
uint8_t radio_port = 0;
uint32_t link_id = 0;
uint64_t epoch = 0;
int log_interval = 1000;
@ -846,7 +889,7 @@ int main(int argc, char* const *argv)
rx_mode_t rx_mode = LOCAL;
string keypair = "rx.key";
while ((opt = getopt(argc, argv, "K:fa:k:n:c:u:p:l:i:e:")) != -1) {
while ((opt = getopt(argc, argv, "K:fa:c:u:p:l:i:e:")) != -1) {
switch (opt) {
case 'K':
keypair = optarg;
@ -858,12 +901,6 @@ int main(int argc, char* const *argv)
rx_mode = AGGREGATOR;
srv_port = atoi(optarg);
break;
case 'k':
k = atoi(optarg);
break;
case 'n':
n = atoi(optarg);
break;
case 'c':
client_addr = string(optarg);
break;
@ -884,10 +921,10 @@ int main(int argc, char* const *argv)
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Local receiver: %s [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d\n", keypair.c_str(), k, n, client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
fprintf(stderr, "WFB-ng version " WFB_VERSION "\n");
fprintf(stderr, "WFB-ng home page: <http://wfb-ng.org>\n");
exit(1);
@ -924,7 +961,7 @@ int main(int argc, char* const *argv)
shared_ptr<BaseAggregator> agg;
if(rx_mode == LOCAL){
agg = shared_ptr<Aggregator>(new Aggregator(client_addr, client_port, k, n, keypair, epoch, channel_id));
agg = shared_ptr<Aggregator>(new Aggregator(client_addr, client_port, keypair, epoch, channel_id));
}else{
agg = shared_ptr<Forwarder>(new Forwarder(client_addr, client_port));
}
@ -933,7 +970,7 @@ int main(int argc, char* const *argv)
}else if(rx_mode == AGGREGATOR)
{
if (optind > argc) goto show_usage;
Aggregator agg(client_addr, client_port, k, n, keypair, epoch, channel_id);
Aggregator agg(client_addr, client_port, keypair, epoch, channel_id);
network_loop(srv_port, agg, log_interval);
}else{

View File

@ -118,11 +118,13 @@ typedef std::unordered_map<uint64_t, antennaItem> antenna_stat_t;
class Aggregator : public BaseAggregator
{
public:
Aggregator(const std::string &client_addr, int client_port, int k, int n, const std::string &keypair, uint64_t epoch, uint32_t channel_id);
Aggregator(const std::string &client_addr, int client_port, const std::string &keypair, uint64_t epoch, uint32_t channel_id);
~Aggregator();
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna, const int8_t *rssi, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp);
private:
void init_fec(int k, int n);
void deinit_fec(void);
void send_packet(int ring_idx, int fragment_idx);
void apply_fec(int ring_idx);
void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, const uint8_t *ant, const int8_t *rssi);

View File

@ -43,8 +43,8 @@ keypair = 'gs.key' # keypair generated by wfb-keygen
stats_port = 8001 # used by wfb-cli
stream_tx = 0x90 # radio port for mavlink tx
stream_rx = 0x10 # radio port for mavlink rx
fec_k = 1 # FEC K
fec_n = 2 # FEC N
fec_k = 1 # FEC K (For tx side. Rx will get FEC settings from session packet)
fec_n = 2 # FEC N (For tx side. Rx will get FEC settings from session packet)
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
port_rx = 14600 # udp port for internal use
@ -55,7 +55,7 @@ peer = 'connect://127.0.0.1:14550' # outgoing connection
mirror = None
# mirror = 'connect://127.0.0.1:14551' # mirroring mavlink packets to OSD
inject_rssi = True # inject RADIO_STATUS packets
inject_rssi = True # inject RADIO_STATUS packets
mavlink_sys_id = 3 # for injected rssi packets
mavlink_comp_id = 68 # MAV_COMP_ID_TELEMETRY_RADIO
@ -76,8 +76,6 @@ call_on_disarm = None # call program on disarm
keypair = 'gs.key' # keypair generated by wfb-keygen
stats_port = 8002 # used by wfb-cli
stream = 0x0 # radio port for video stream
fec_k = 8 # FEC K
fec_n = 12 # FEC N
peer = 'connect://127.0.0.1:5600' # outgoing connection for video sink (GS)
# Radio settings for RX
@ -91,15 +89,15 @@ stream_tx = 0x10
stream_rx = 0x90
port_rx = 14700
port_tx = 14701
fec_k = 1 # FEC K
fec_n = 2 # FEC N
fec_k = 1 # FEC K (For tx side. Rx will get FEC settings from session packet)
fec_n = 2 # FEC N (For tx side. Rx will get FEC settings from session packet)
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
peer = 'listen://0.0.0.0:14560' # incoming connection
#peer = 'connect://127.0.0.1:14560' # outgoing connection
mirror = None
inject_rssi = True # inject RADIO_STATUS packets
inject_rssi = True # inject RADIO_STATUS packets
mavlink_sys_id = 3 # for injected rssi packets
mavlink_comp_id = 68 # MAV_COMP_ID_TELEMETRY_RADIO
@ -120,8 +118,8 @@ call_on_disarm = None # call program on disarm
keypair = 'drone.key'
stats_port = None
stream = 0x0
fec_k = 8 # FEC K
fec_n = 12 # FEC N
fec_k = 8 # FEC K (For tx side. Rx will get FEC settings from session packet)
fec_n = 12 # FEC N (For tx side. Rx will get FEC settings from session packet)
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
peer = 'listen://0.0.0.0:5602' # listen for video stream (drone)
@ -143,8 +141,8 @@ stream_tx = 0x20 # radio port for tunnel tx
stream_rx = 0xa0 # radio port for tunnel rx
port_rx = 14800 # udp port for internal use
port_tx = 14801 # udp port range (from port_tx to port_tx + number of wlans) for internal use
fec_k = 1 # FEC K
fec_n = 2 # FEC N
fec_k = 1 # FEC K (For tx side. Rx will get FEC settings from session packet)
fec_n = 2 # FEC N (For tx side. Rx will get FEC settings from session packet)
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
ifname = 'gs-wfb'
@ -167,8 +165,8 @@ stream_tx = 0xa0
stream_rx = 0x20
port_rx = 14900
port_tx = 14901
fec_k = 1 # FEC K
fec_n = 2 # FEC N
fec_k = 1 # FEC K (For tx side. Rx will get FEC settings from session packet)
fec_n = 2 # FEC N (For tx side. Rx will get FEC settings from session packet)
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
ifname = 'drone-wfb'

View File

@ -54,15 +54,18 @@ class WFBFlags(object):
LINK_JAMMED = 2
fec_types = {1: 'VDM_RS'}
class StatisticsProtocol(Protocol):
def connectionMade(self):
self.factory.sessions.append(self)
self.factory.ui_sessions.append(self)
def dataReceived(self, data):
pass
def connectionLost(self, reason):
self.factory.sessions.remove(self)
self.factory.ui_sessions.remove(self)
def send_stats(self, data):
self.transport.write(json.dumps(data).encode('utf-8') + b'\n')
@ -83,7 +86,7 @@ class AntennaFactory(Factory):
p_in.peer = p_tx_l[0]
# tcp sockets for UI
self.sessions = []
self.ui_sessions = []
def select_tx_antenna(self, ant_rssi):
wlan_rssi = {}
@ -131,7 +134,7 @@ class AntennaFactory(Factory):
if settings.common.debug:
log.msg('%s rssi %s tx#%d %s %s' % (rx_id, max(mav_rssi) if mav_rssi else 'N/A', self.tx_sel, packet_stats, ant_rssi))
for s in self.sessions:
for s in self.ui_sessions:
s.send_stats(dict(id=rx_id, tx_ant=self.tx_sel, packets=packet_stats, rssi=ant_rssi))
@ -145,7 +148,9 @@ class AntennaProtocol(LineReceiver):
self.count_all = None
def lineReceived(self, line):
cols = line.decode('utf-8').strip().split('\t')
line = line.decode('utf-8').strip()
cols = line.split('\t')
try:
if len(cols) < 2:
raise BadTelemetry()
@ -157,6 +162,7 @@ class AntennaProtocol(LineReceiver):
if len(cols) != 4:
raise BadTelemetry()
self.ant[cols[2]] = tuple(int(i) for i in cols[3].split(':'))
elif cmd == 'PKT':
if len(cols) != 3:
raise BadTelemetry()
@ -174,6 +180,14 @@ class AntennaProtocol(LineReceiver):
self.antenna_f.update_stats(self.rx_id, stats, dict(self.ant))
self.ant.clear()
elif cmd == 'SESSION':
if len(cols) != 3:
raise BadTelemetry()
epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':'))
log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch))
else:
raise BadTelemetry()
except BadTelemetry:
@ -306,9 +320,9 @@ def init(profile, wlans):
def init_mavlink(profile, wlans, link_id):
cfg = getattr(settings, '%s_mavlink' % (profile,))
cmd_rx = ('%s -p %d -u %d -K %s -k %d -n %d -i %d' % \
cmd_rx = ('%s -p %d -u %d -K %s -i %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n, link_id)).split() + wlans
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), link_id)).split() + wlans
cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'),
@ -430,11 +444,11 @@ def init_video(profile, wlans, link_id):
if cfg.stats_port:
reactor.listenTCP(cfg.stats_port, ant_f)
cmd = ('%s -p %d -c %s -u %d -K %s -k %d -n %d -i %d' % \
cmd = ('%s -p %d -c %s -u %d -K %s -i %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_rx'),
cfg.stream, connect[0], connect[1],
os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.fec_k, cfg.fec_n, link_id)).split() + wlans
link_id)).split() + wlans
df = RXProtocol(ant_f, cmd, 'video rx').start()
else:
@ -446,9 +460,9 @@ def init_video(profile, wlans, link_id):
def init_tunnel(profile, wlans, link_id):
cfg = getattr(settings, '%s_tunnel' % (profile,))
cmd_rx = ('%s -p %d -u %d -K %s -k %d -n %d -i %d' % \
cmd_rx = ('%s -p %d -u %d -K %s -i %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n, link_id)).split() + wlans
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), link_id)).split() + wlans
cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'),