1. Fix bug with FEC if first packet of block is not lost

2. Add TX FEC timeout option: emit empty packets if block is open but no input.
   This can reduce video latency.
This commit is contained in:
Vasily Evseenko 2021-08-15 22:48:53 +03:00
parent fa3f43736e
commit cce93ed9e6
8 changed files with 204 additions and 77 deletions

View File

@ -219,7 +219,7 @@ Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n,
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++) for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{ {
rx_ring[ring_idx].block_idx = 0; rx_ring[ring_idx].block_idx = 0;
rx_ring[ring_idx].send_fragment_idx = 0; rx_ring[ring_idx].fragment_to_send_idx = 0;
rx_ring[ring_idx].has_fragments = 0; rx_ring[ring_idx].has_fragments = 0;
rx_ring[ring_idx].fragments = new uint8_t*[fec_n]; rx_ring[ring_idx].fragments = new uint8_t*[fec_n];
for(int i=0; i < fec_n; i++) for(int i=0; i < fec_n; i++)
@ -319,7 +319,7 @@ int Aggregator::rx_ring_push(void)
fprintf(stderr, "override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments); fprintf(stderr, "override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments);
for(int f_idx=rx_ring[rx_ring_front].send_fragment_idx; f_idx < fec_k; f_idx++) for(int f_idx=rx_ring[rx_ring_front].fragment_to_send_idx; f_idx < fec_k; f_idx++)
{ {
if(rx_ring[rx_ring_front].fragment_map[f_idx]) if(rx_ring[rx_ring_front].fragment_map[f_idx])
{ {
@ -358,7 +358,7 @@ int Aggregator::get_block_ring_idx(uint64_t block_idx)
{ {
ring_idx = rx_ring_push(); ring_idx = rx_ring_push();
rx_ring[ring_idx].block_idx = block_idx + i + 1 - new_blocks; rx_ring[ring_idx].block_idx = block_idx + i + 1 - new_blocks;
rx_ring[ring_idx].send_fragment_idx = 0; rx_ring[ring_idx].fragment_to_send_idx = 0;
rx_ring[ring_idx].has_fragments = 0; rx_ring[ring_idx].has_fragments = 0;
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t)); memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
} }
@ -463,7 +463,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++) for(int ring_idx = 0; ring_idx < RX_RING_SIZE; ring_idx++)
{ {
rx_ring[ring_idx].block_idx = 0; rx_ring[ring_idx].block_idx = 0;
rx_ring[ring_idx].send_fragment_idx = 0; rx_ring[ring_idx].fragment_to_send_idx = 0;
rx_ring[ring_idx].has_fragments = 0; rx_ring[ring_idx].has_fragments = 0;
memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t)); memset(rx_ring[ring_idx].fragment_map, '\0', fec_n * sizeof(uint8_t));
} }
@ -537,14 +537,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
if(ring_idx == rx_ring_front) if(ring_idx == rx_ring_front)
{ {
// check if any packets without gaps // check if any packets without gaps
while(p->send_fragment_idx < fec_k && p->fragment_map[p->send_fragment_idx]) while(p->fragment_to_send_idx < fec_k && p->fragment_map[p->fragment_to_send_idx])
{ {
send_packet(ring_idx, p->send_fragment_idx); send_packet(ring_idx, p->fragment_to_send_idx);
p->send_fragment_idx += 1; p->fragment_to_send_idx += 1;
} }
// remove block if full // remove block if full
if(p->send_fragment_idx == fec_k) if(p->fragment_to_send_idx == fec_k)
{ {
rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE); rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE);
rx_ring_alloc -= 1; rx_ring_alloc -= 1;
@ -555,14 +555,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
// 1. This is not the oldest block but with sufficient number of fragments (K) to decode // 1. This is not the oldest block but with sufficient number of fragments (K) to decode
// 2. This is the oldest block but with gaps and total number of fragments is K // 2. This is the oldest block but with gaps and total number of fragments is K
if(p->send_fragment_idx < fec_k && p->has_fragments == fec_k) if(p->fragment_to_send_idx < fec_k && p->has_fragments == fec_k)
{ {
// send all queued packets in all unfinished blocks before and remove them // send all queued packets in all unfinished blocks before and remove them
int nrm = modN(ring_idx - rx_ring_front, RX_RING_SIZE); int nrm = modN(ring_idx - rx_ring_front, RX_RING_SIZE);
while(nrm > 0) while(nrm > 0)
{ {
for(int f_idx=rx_ring[rx_ring_front].send_fragment_idx; f_idx < fec_k; f_idx++) for(int f_idx=rx_ring[rx_ring_front].fragment_to_send_idx; f_idx < fec_k; f_idx++)
{ {
if(rx_ring[rx_ring_front].fragment_map[f_idx]) if(rx_ring[rx_ring_front].fragment_map[f_idx])
{ {
@ -577,10 +577,10 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
assert(rx_ring_alloc > 0); assert(rx_ring_alloc > 0);
assert(ring_idx == rx_ring_front); assert(ring_idx == rx_ring_front);
// Search for missed data fragments // Search for missed data fragments and apply FEC only if needed
for(int f_idx=p->send_fragment_idx; f_idx < fec_k; f_idx++) for(int f_idx=p->fragment_to_send_idx; f_idx < fec_k; f_idx++)
{ {
if(! p->fragment_map[p->send_fragment_idx]) if(! p->fragment_map[f_idx])
{ {
//Recover missed fragments using FEC //Recover missed fragments using FEC
apply_fec(ring_idx); apply_fec(ring_idx);
@ -588,7 +588,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
// Count total number of recovered fragments // Count total number of recovered fragments
for(; f_idx < fec_k; f_idx++) for(; f_idx < fec_k; f_idx++)
{ {
if(! p->fragment_map[p->send_fragment_idx]) if(! p->fragment_map[f_idx])
{ {
count_p_fec_recovered += 1; count_p_fec_recovered += 1;
} }
@ -597,15 +597,16 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
} }
} }
while(p->send_fragment_idx < fec_k) while(p->fragment_to_send_idx < fec_k)
{ {
send_packet(ring_idx, p->send_fragment_idx); send_packet(ring_idx, p->fragment_to_send_idx);
p->send_fragment_idx += 1; p->fragment_to_send_idx += 1;
} }
// remove block // remove block
rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE); rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE);
rx_ring_alloc -= 1; rx_ring_alloc -= 1;
assert(rx_ring_alloc >= 0);
} }
} }
@ -613,6 +614,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
{ {
wpacket_hdr_t* packet_hdr = (wpacket_hdr_t*)(rx_ring[ring_idx].fragments[fragment_idx]); wpacket_hdr_t* packet_hdr = (wpacket_hdr_t*)(rx_ring[ring_idx].fragments[fragment_idx]);
uint8_t *payload = (rx_ring[ring_idx].fragments[fragment_idx]) + sizeof(wpacket_hdr_t); uint8_t *payload = (rx_ring[ring_idx].fragments[fragment_idx]) + sizeof(wpacket_hdr_t);
uint8_t flags = packet_hdr->flags;
uint16_t packet_size = be16toh(packet_hdr->packet_size); uint16_t packet_size = be16toh(packet_hdr->packet_size);
uint32_t packet_seq = rx_ring[ring_idx].block_idx * fec_k + fragment_idx; uint32_t packet_seq = rx_ring[ring_idx].block_idx * fec_k + fragment_idx;
@ -628,7 +630,8 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
{ {
fprintf(stderr, "corrupted packet %u\n", seq); fprintf(stderr, "corrupted packet %u\n", seq);
count_p_bad += 1; count_p_bad += 1;
}else{ }else if(!(flags & WFB_PACKET_FEC_ONLY))
{
send(sockfd, payload, packet_size, MSG_DONTWAIT); send(sockfd, payload, packet_size, MSG_DONTWAIT);
} }
} }

View File

@ -78,7 +78,7 @@ typedef struct {
uint64_t block_idx; uint64_t block_idx;
uint8_t** fragments; uint8_t** fragments;
uint8_t *fragment_map; uint8_t *fragment_map;
uint8_t send_fragment_idx; uint8_t fragment_to_send_idx;
uint8_t has_fragments; uint8_t has_fragments;
} rx_ring_item_t; } rx_ring_item_t;

View File

@ -189,15 +189,23 @@ void Transmitter::send_session_key(void)
inject_packet((uint8_t*)&session_key_packet, sizeof(session_key_packet)); inject_packet((uint8_t*)&session_key_packet, sizeof(session_key_packet));
} }
void Transmitter::send_packet(const uint8_t *buf, size_t size) void Transmitter::send_packet(const uint8_t *buf, size_t size, uint8_t flags)
{ {
wpacket_hdr_t packet_hdr; wpacket_hdr_t packet_hdr;
assert(size <= MAX_PAYLOAD_SIZE); assert(size <= MAX_PAYLOAD_SIZE);
// FEC-only packets are only for closing already opened blocks
if(fragment_idx == 0 && flags & WFB_PACKET_FEC_ONLY)
{
return;
}
packet_hdr.packet_size = htobe16(size); packet_hdr.packet_size = htobe16(size);
packet_hdr.flags = flags;
memset(block[fragment_idx], '\0', MAX_FEC_PAYLOAD); memset(block[fragment_idx], '\0', MAX_FEC_PAYLOAD);
memcpy(block[fragment_idx], &packet_hdr, sizeof(packet_hdr)); memcpy(block[fragment_idx], &packet_hdr, sizeof(packet_hdr));
memcpy(block[fragment_idx] + sizeof(packet_hdr), buf, size); memcpy(block[fragment_idx] + sizeof(packet_hdr), buf, size);
send_block_fragment(sizeof(packet_hdr) + size); send_block_fragment(sizeof(packet_hdr) + size);
max_packet_size = max(max_packet_size, sizeof(packet_hdr) + size); max_packet_size = max(max_packet_size, sizeof(packet_hdr) + size);
fragment_idx += 1; fragment_idx += 1;
@ -223,14 +231,14 @@ void Transmitter::send_packet(const uint8_t *buf, size_t size)
} }
} }
void video_source(shared_ptr<Transmitter> &t, vector<int> &tx_fd) void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int poll_timeout)
{ {
int nfds = tx_fd.size(); int nfds = rx_fd.size();
struct pollfd fds[nfds]; struct pollfd fds[nfds];
memset(fds, '\0', sizeof(fds)); memset(fds, '\0', sizeof(fds));
int i = 0; int i = 0;
for(auto it=tx_fd.begin(); it != tx_fd.end(); it++, i++) for(auto it=rx_fd.begin(); it != rx_fd.end(); it++, i++)
{ {
int fd = *it; int fd = *it;
if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0) if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0)
@ -246,14 +254,18 @@ void video_source(shared_ptr<Transmitter> &t, vector<int> &tx_fd)
for(;;) for(;;)
{ {
int rc = poll(fds, nfds, -1); int rc = poll(fds, nfds, poll_timeout > 0 ? poll_timeout : -1);
if (rc < 0){ if (rc < 0){
if (errno == EINTR || errno == EAGAIN) continue; if (errno == EINTR || errno == EAGAIN) continue;
throw runtime_error(string_format("poll error: %s", strerror(errno))); throw runtime_error(string_format("poll error: %s", strerror(errno)));
} }
if (rc == 0) continue; // timeout expired if (rc == 0) // timeout expired
{
t->send_packet(NULL, 0, WFB_PACKET_FEC_ONLY);
continue;
}
for(i = 0; i < nfds; i++) for(i = 0; i < nfds; i++)
{ {
@ -267,7 +279,7 @@ void video_source(shared_ptr<Transmitter> &t, vector<int> &tx_fd)
{ {
uint8_t buf[MAX_PAYLOAD_SIZE]; uint8_t buf[MAX_PAYLOAD_SIZE];
ssize_t rsize; ssize_t rsize;
int fd = tx_fd[i]; int fd = rx_fd[i];
t->select_output(i); t->select_output(i);
while((rsize = recv(fd, buf, sizeof(buf), 0)) >= 0) while((rsize = recv(fd, buf, sizeof(buf), 0)) >= 0)
@ -279,7 +291,7 @@ void video_source(shared_ptr<Transmitter> &t, vector<int> &tx_fd)
t->send_session_key(); t->send_session_key();
session_key_announce_ts = cur_ts + SESSION_KEY_ANNOUNCE_MSEC; session_key_announce_ts = cur_ts + SESSION_KEY_ANNOUNCE_MSEC;
} }
t->send_packet(buf, rsize); t->send_packet(buf, rsize, 0);
} }
if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
} }
@ -299,10 +311,12 @@ int main(int argc, char * const *argv)
int stbc = 0; int stbc = 0;
int ldpc = 0; int ldpc = 0;
int mcs_index = 1; int mcs_index = 1;
int debug_port = 0;
int poll_timeout = 0;
string keypair = "tx.key"; string keypair = "tx.key";
while ((opt = getopt(argc, argv, "K:k:n:u:r:p:B:G:S:L:M:")) != -1) { while ((opt = getopt(argc, argv, "K:k:n:u:r:p:B:G:S:L:M:D:T:")) != -1) {
switch (opt) { switch (opt) {
case 'K': case 'K':
keypair = optarg; keypair = optarg;
@ -334,12 +348,18 @@ int main(int argc, char * const *argv)
case 'M': case 'M':
mcs_index = atoi(optarg); mcs_index = atoi(optarg);
break; break;
case 'D':
debug_port = atoi(optarg);
break;
case 'T':
poll_timeout = atoi(optarg);
break;
default: /* '?' */ default: /* '?' */
show_usage: show_usage:
fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-p radio_port] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] interface1 [interface2] ...\n", fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-p radio_port] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [ -T poll_timeout ] interface1 [interface2] ...\n",
argv[0]); argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, udp_port=%d, radio_port=%d bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d\n", fprintf(stderr, "Default: K='%s', k=%d, n=%d, udp_port=%d, radio_port=%d bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d, poll_timeout=%d\n",
keypair.c_str(), k, n, udp_port, radio_port, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index); keypair.c_str(), k, n, udp_port, radio_port, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, poll_timeout);
fprintf(stderr, "Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE); fprintf(stderr, "Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE);
fprintf(stderr, "WFB version " WFB_VERSION "\n"); fprintf(stderr, "WFB version " WFB_VERSION "\n");
exit(1); exit(1);
@ -397,23 +417,27 @@ int main(int argc, char * const *argv)
} }
try try
{ {
vector<int> tx_fd; vector<int> rx_fd;
vector<string> wlans; vector<string> wlans;
for(int i = 0; optind + i < argc; i++) for(int i = 0; optind + i < argc; i++)
{ {
int fd = open_udp_socket_for_rx(udp_port + i); int fd = open_udp_socket_for_rx(udp_port + i);
fprintf(stderr, "Listen on %d for %s\n", udp_port + i, argv[optind + i]); fprintf(stderr, "Listen on %d for %s\n", udp_port + i, argv[optind + i]);
tx_fd.push_back(fd); rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i])); wlans.push_back(string(argv[optind + i]));
} }
#ifdef DEBUG_TX shared_ptr<Transmitter> t;
shared_ptr<Transmitter>t = shared_ptr<UdpTransmitter>(new UdpTransmitter(k, n, keypair, "127.0.0.1", 5601 + i));
#else
shared_ptr<Transmitter>t = shared_ptr<PcapTransmitter>(new PcapTransmitter(k, n, keypair, radio_port, wlans));
#endif
video_source(t, tx_fd); if(debug_port)
{
fprintf(stderr, "Using %lu ports from %d for wlan emulation\n", wlans.size(), debug_port);
t = shared_ptr<UdpTransmitter>(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port));
} else {
t = shared_ptr<PcapTransmitter>(new PcapTransmitter(k, n, keypair, radio_port, wlans));
}
data_source(t, rx_fd, poll_timeout);
}catch(runtime_error &e) }catch(runtime_error &e)
{ {
fprintf(stderr, "Error: %s\n", e.what()); fprintf(stderr, "Error: %s\n", e.what());

View File

@ -34,7 +34,7 @@ class Transmitter
public: public:
Transmitter(int k, int m, const std::string &keypair); Transmitter(int k, int m, const std::string &keypair);
virtual ~Transmitter(); virtual ~Transmitter();
void send_packet(const uint8_t *buf, size_t size); void send_packet(const uint8_t *buf, size_t size, uint8_t flags);
void send_session_key(void); void send_session_key(void);
virtual void select_output(int idx) = 0; virtual void select_output(int idx) = 0;
protected: protected:
@ -78,9 +78,16 @@ private:
class UdpTransmitter : public Transmitter class UdpTransmitter : public Transmitter
{ {
public: public:
UdpTransmitter(int k, int m, const std::string &keypair, const std::string &client_addr, int client_port) : Transmitter(k, m, keypair) UdpTransmitter(int k, int m, const std::string &keypair, const std::string &client_addr, int base_port) : Transmitter(k, m, keypair),\
base_port(base_port)
{ {
sockfd = open_udp_socket(client_addr, client_port); sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno)));
bzero((char *) &saddr, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = inet_addr(client_addr.c_str());
saddr.sin_port = htons((unsigned short)base_port);
} }
virtual ~UdpTransmitter() virtual ~UdpTransmitter()
@ -88,7 +95,10 @@ public:
close(sockfd); close(sockfd);
} }
virtual void select_output(int /*idx*/){} virtual void select_output(int idx)
{
saddr.sin_port = htons((unsigned short)(base_port + idx));
}
private: private:
virtual void inject_packet(const uint8_t *buf, size_t size) virtual void inject_packet(const uint8_t *buf, size_t size)
@ -106,8 +116,8 @@ private:
{ .iov_base = (void*)buf, { .iov_base = (void*)buf,
.iov_len = size }}; .iov_len = size }};
struct msghdr msghdr = { .msg_name = NULL, struct msghdr msghdr = { .msg_name = &saddr,
.msg_namelen = 0, .msg_namelen = sizeof(saddr),
.msg_iov = iov, .msg_iov = iov,
.msg_iovlen = 2, .msg_iovlen = 2,
.msg_control = NULL, .msg_control = NULL,
@ -117,23 +127,7 @@ private:
sendmsg(sockfd, &msghdr, MSG_DONTWAIT); sendmsg(sockfd, &msghdr, MSG_DONTWAIT);
} }
int open_udp_socket(const std::string &client_addr, int client_port)
{
struct sockaddr_in saddr;
int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno)));
bzero((char *) &saddr, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = inet_addr(client_addr.c_str());
saddr.sin_port = htons((unsigned short)client_port);
if (connect(fd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
{
throw std::runtime_error(string_format("Connect error: %s", strerror(errno)));
}
return fd;
}
int sockfd; int sockfd;
int base_port;
struct sockaddr_in saddr;
}; };

View File

@ -102,9 +102,9 @@ static uint8_t ieee80211_header[] __attribute__((unused)) = {
radiotap_header radiotap_header
ieee_80211_header ieee_80211_header
wblock_hdr_t { packet_type, nonce = (block_idx << 8 + fragment_idx) } wblock_hdr_t { packet_type, nonce = (block_idx << 8 + fragment_idx) }
wpacket_hdr_t { packet_size } # wpacket_hdr_t { flags, packet_size } #
data # data #
+-- encrypted +-- encrypted
*/ */
@ -113,10 +113,13 @@ static uint8_t ieee80211_header[] __attribute__((unused)) = {
#define BLOCK_IDX_MASK ((1LLU << 56) - 1) #define BLOCK_IDX_MASK ((1LLU << 56) - 1)
#define MAX_BLOCK_IDX ((1LLU << 55) - 1) #define MAX_BLOCK_IDX ((1LLU << 55) - 1)
// packet types
#define WFB_PACKET_DATA 0x1 #define WFB_PACKET_DATA 0x1
#define WFB_PACKET_KEY 0x2 #define WFB_PACKET_KEY 0x2
// packet flags
#define WFB_PACKET_FEC_ONLY 0x1
#define SESSION_KEY_ANNOUNCE_MSEC 1000 #define SESSION_KEY_ANNOUNCE_MSEC 1000
#define RX_ANT_MAX 4 #define RX_ANT_MAX 4
@ -148,6 +151,7 @@ typedef struct {
// Plain data packet after FEC decode // Plain data packet after FEC decode
typedef struct { typedef struct {
uint8_t flags;
uint16_t packet_size; // big endian uint16_t packet_size; // big endian
} __attribute__ ((packed)) wpacket_hdr_t; } __attribute__ ((packed)) wpacket_hdr_t;

View File

@ -24,6 +24,8 @@ stream_tx = 1 # radio port for mavlink tx
stream_rx = 2 # radio port for mavlink rx stream_rx = 2 # radio port for mavlink rx
fec_k = 1 # FEC K fec_k = 1 # FEC K
fec_n = 2 # FEC N fec_n = 2 # FEC N
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
port_rx = 14600 # udp port for internal use port_rx = 14600 # udp port for internal use
port_tx = 14601 # udp port range (from port_tx to port_tx + number of wlans) for internal use port_tx = 14601 # udp port range (from port_tx to port_tx + number of wlans) for internal use
@ -65,6 +67,7 @@ port_rx = 14700
port_tx = 14701 port_tx = 14701
fec_k = 1 # FEC K fec_k = 1 # FEC K
fec_n = 2 # FEC N fec_n = 2 # FEC N
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
peer = 'listen://0.0.0.0:14560' # incoming connection peer = 'listen://0.0.0.0:14560' # incoming connection
#peer = 'connect://127.0.0.1:14560' # outgoing connection #peer = 'connect://127.0.0.1:14560' # outgoing connection
@ -88,6 +91,8 @@ stats_port = None
stream = 3 stream = 3
fec_k = 8 # FEC K fec_k = 8 # FEC K
fec_n = 12 # FEC N fec_n = 12 # FEC N
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
peer = 'listen://0.0.0.0:5602' # listen for video stream (drone) peer = 'listen://0.0.0.0:5602' # listen for video stream (drone)
# Radio settings for TX and RX # Radio settings for TX and RX
@ -109,6 +114,7 @@ port_rx = 14800 # udp port for internal use
port_tx = 14801 # udp port range (from port_tx to port_tx + number of wlans) for internal use port_tx = 14801 # udp port range (from port_tx to port_tx + number of wlans) for internal use
fec_k = 1 # FEC K fec_k = 1 # FEC K
fec_n = 2 # FEC N fec_n = 2 # FEC N
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
ifname = 'gs-wfb' ifname = 'gs-wfb'
ifaddr = '10.5.0.1/24' ifaddr = '10.5.0.1/24'
@ -132,6 +138,7 @@ port_rx = 14900
port_tx = 14901 port_tx = 14901
fec_k = 1 # FEC K fec_k = 1 # FEC K
fec_n = 2 # FEC N fec_n = 2 # FEC N
fec_timeout = 0 # [ms], 0 to disable. If no new packets during timeout, emit one empty packet if FEC block is open
ifname = 'drone-wfb' ifname = 'drone-wfb'
ifaddr = '10.5.0.2/24' ifaddr = '10.5.0.2/24'

View File

@ -227,7 +227,7 @@ class RXProtocol(ProcessProtocol):
def __init__(self, antenna_stat, cmd, rx_id): def __init__(self, antenna_stat, cmd, rx_id):
self.cmd = cmd self.cmd = cmd
self.rx_id = rx_id self.rx_id = rx_id
self.ant = AntennaProtocol(antenna_stat, rx_id) self.ant = AntennaProtocol(antenna_stat, rx_id) if antenna_stat is not None else None
self.dbg = DbgProtocol(rx_id) self.dbg = DbgProtocol(rx_id)
self.df = defer.Deferred() self.df = defer.Deferred()
@ -235,7 +235,8 @@ class RXProtocol(ProcessProtocol):
log.msg('Started %s' % (self.rx_id,)) log.msg('Started %s' % (self.rx_id,))
def outReceived(self, data): def outReceived(self, data):
self.ant.dataReceived(data) if self.ant is not None:
self.ant.dataReceived(data)
def errReceived(self, data): def errReceived(self, data):
self.dbg.dataReceived(data) self.dbg.dataReceived(data)
@ -250,7 +251,7 @@ class RXProtocol(ProcessProtocol):
self.df.errback(status) self.df.errback(status)
def start(self): def start(self):
df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=None, childFDs={0: "w", 1: "r", 2: "r"}) df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ, childFDs={0: "w", 1: "r", 2: "r"})
return df.addCallback(lambda _: self.df) return df.addCallback(lambda _: self.df)
@ -280,7 +281,7 @@ class TXProtocol(ProcessProtocol):
self.df.errback(status) self.df.errback(status)
def start(self): def start(self):
df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=None, df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ,
childFDs={0: "w", 1: "r", 2: "r"}) childFDs={0: "w", 1: "r", 2: "r"})
return df.addCallback(lambda _: self.df) return df.addCallback(lambda _: self.df)
@ -330,11 +331,11 @@ def init_mavlink(profile, wlans):
(os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx, (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n)).split() + wlans cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n)).split() + wlans
cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d' % \ cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'), (os.path.join(settings.path.bin_dir, 'wfb_tx'),
cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index, cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
cfg.fec_k, cfg.fec_n)).split() + wlans cfg.fec_k, cfg.fec_n, cfg.fec_timeout)).split() + wlans
listen = None listen = None
connect = None connect = None
@ -423,11 +424,11 @@ def init_video(profile, wlans):
log.msg('Listen for video stream %d on %s:%d' % (cfg.stream, listen[0], listen[1])) log.msg('Listen for video stream %d on %s:%d' % (cfg.stream, listen[0], listen[1]))
# We don't use TX diversity for video streaming due to only one transmitter on the vehichle # We don't use TX diversity for video streaming due to only one transmitter on the vehichle
cmd = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d %s' % \ cmd = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d %s' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'), cfg.stream, (os.path.join(settings.path.bin_dir, 'wfb_tx'), cfg.stream,
listen[1], os.path.join(settings.path.conf_dir, cfg.keypair), listen[1], os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index, cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
cfg.fec_k, cfg.fec_n, wlans[0])).split() cfg.fec_k, cfg.fec_n, cfg.fec_timeout, wlans[0])).split()
df = TXProtocol(cmd, 'video tx').start() df = TXProtocol(cmd, 'video tx').start()
elif connect_re.match(cfg.peer): elif connect_re.match(cfg.peer):
@ -459,11 +460,11 @@ def init_tunnel(profile, wlans):
(os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx, (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n)).split() + wlans cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n)).split() + wlans
cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d' % \ cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'), (os.path.join(settings.path.bin_dir, 'wfb_tx'),
cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index, cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
cfg.fec_k, cfg.fec_n)).split() + wlans cfg.fec_k, cfg.fec_n, cfg.fec_timeout)).split() + wlans
p_in = TUNTAPProtocol() p_in = TUNTAPProtocol()
p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + i)) for i, _ in enumerate(wlans)] p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + i)) for i, _ in enumerate(wlans)]

View File

@ -0,0 +1,94 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import os
from twisted.python import log
from twisted.trial import unittest
from twisted.internet import reactor, defer
from twisted.internet.protocol import DatagramProtocol
from ..common import df_sleep
from ..server import RXProtocol, TXProtocol, call_and_check_rc
class UDP_TXRX(DatagramProtocol):
def __init__(self, tx_addr):
self.rxq = []
self.tx_addr = tx_addr
def datagramReceived(self, data, addr):
log.msg("got %r from %s" % (data, addr))
self.rxq.append(data)
def send_msg(self, data):
self.transport.write(data, self.tx_addr)
class TXRXTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
bindir = os.path.join(os.path.dirname(__file__), '../..')
yield call_and_check_rc(os.path.join(bindir, 'wfb_keygen'))
self.rxp = UDP_TXRX(('127.0.0.1', 10001))
self.txp = UDP_TXRX(('127.0.0.1', 10003))
self.rx_ep = reactor.listenUDP(10002, self.rxp)
self.tx_ep = reactor.listenUDP(10004, self.txp)
cmd_rx = [os.path.join(bindir, 'wfb_rx'), '-K', 'drone.key', '-a', '10001', '-u', '10002', 'wlan0']
cmd_tx = [os.path.join(bindir, 'wfb_tx'), '-K', 'gs.key', '-u', '10003', '-D', '10004', '-T', '30', 'wlan0']
self.rx_pp = RXProtocol(None, cmd_rx, 'debug rx')
self.tx_pp = TXProtocol(cmd_tx, 'debug tx')
self.rx_pp.start().addErrback(lambda f: f.trap('twisted.internet.error.ProcessTerminated'))
self.tx_pp.start().addErrback(lambda f: f.trap('twisted.internet.error.ProcessTerminated'))
# Wait for tx/rx processes to initialize
yield df_sleep(0.1)
def tearDown(self):
self.rx_pp.transport.signalProcess('KILL')
self.tx_pp.transport.signalProcess('KILL')
self.rx_ep.stopListening()
self.tx_ep.stopListening()
@defer.inlineCallbacks
def test_txrx(self):
self.assertEqual(len(self.txp.rxq), 0)
for i in range(16):
self.txp.send_msg(b'm%d' % (i + 1,))
yield df_sleep(0.1)
self.assertEqual(len(self.txp.rxq), 25) # 1 session + (8 data packets + 4 fec packets) * 2
# Check FEC fail and recovery
# 1. Fail on block #1: lost 5 packets
# 2. Recover on block #2: lost 3 packets
for i, pkt in enumerate(self.txp.rxq):
if i not in (4, 9, 10, 11, 12, 11 + 4, 11 + 5, 11 + 6):
self.rxp.send_msg(pkt)
yield df_sleep(0.1)
self.assertEqual([b'm%d' % (i + 1,) for i in range(16) if i + 1 != 4], self.rxp.rxq)
@defer.inlineCallbacks
def test_fec_timeout(self):
self.assertEqual(len(self.txp.rxq), 0)
for i in range(6):
self.txp.send_msg(b'm%d' % (i + 1,))
yield df_sleep(0.1)
self.assertEqual(len(self.txp.rxq), 13) # 1 session + 8 data packets + 4 fec packets
# Check FEC recovery
for i, pkt in enumerate(self.txp.rxq):
if i not in (2, 4):
self.rxp.send_msg(pkt)
yield df_sleep(0.1)
self.assertEqual([b'm%d' % (i + 1,) for i in range(6)], self.rxp.rxq)