Improve udp socket buffer overflow handling

1. Add option for socket buffer size for incoming messages on the tx side.
2. Check for socket buffer overflow and show warnings.

You can set socket buffer size system-wide via net.core.rmem_default or
via -R option in wfb_tx.
This commit is contained in:
Vasily Evseenko 2023-03-21 12:43:23 +03:00
parent ea9200d4af
commit 10706c5b2b
7 changed files with 95 additions and 24 deletions

View File

@ -313,7 +313,7 @@ void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx
.msg_iovlen = 2, .msg_iovlen = 2,
.msg_control = NULL, .msg_control = NULL,
.msg_controllen = 0, .msg_controllen = 0,
.msg_flags = 0}; .msg_flags = 0 };
sendmsg(sockfd, &msghdr, MSG_DONTWAIT); sendmsg(sockfd, &msghdr, MSG_DONTWAIT);
} }
@ -796,7 +796,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, sh
} }
} }
void network_loop(int srv_port, Aggregator &agg, int log_interval) void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_size)
{ {
wrxfwd_t fwd_hdr; wrxfwd_t fwd_hdr;
struct sockaddr_in sockaddr; struct sockaddr_in sockaddr;
@ -804,7 +804,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval)
uint64_t log_send_ts = 0; uint64_t log_send_ts = 0;
struct pollfd fds[1]; struct pollfd fds[1];
int fd = open_udp_socket_for_rx(srv_port); int fd = open_udp_socket_for_rx(srv_port, rcv_buf_size);
if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0) if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0)
{ {
@ -889,9 +889,11 @@ int main(int argc, char* const *argv)
int srv_port = 0; int srv_port = 0;
string client_addr = "127.0.0.1"; string client_addr = "127.0.0.1";
rx_mode_t rx_mode = LOCAL; rx_mode_t rx_mode = LOCAL;
int rcv_buf = 0;
string keypair = "rx.key"; string keypair = "rx.key";
while ((opt = getopt(argc, argv, "K:fa:c:u:p:l:i:e:")) != -1) { while ((opt = getopt(argc, argv, "K:fa:c:u:p:l:i:e:R:")) != -1) {
switch (opt) { switch (opt) {
case 'K': case 'K':
keypair = optarg; keypair = optarg;
@ -912,6 +914,9 @@ int main(int argc, char* const *argv)
case 'p': case 'p':
radio_port = atoi(optarg); radio_port = atoi(optarg);
break; break;
case 'R':
rcv_buf = atoi(optarg);
break;
case 'l': case 'l':
log_interval = atoi(optarg); log_interval = atoi(optarg);
break; break;
@ -925,8 +930,8 @@ int main(int argc, char* const *argv)
show_usage: show_usage:
fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]); fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-i link_id] interface1 [interface2] ...\n", argv[0]); fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]); fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval); fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
fprintf(stderr, "WFB-ng version " WFB_VERSION "\n"); fprintf(stderr, "WFB-ng version " WFB_VERSION "\n");
fprintf(stderr, "WFB-ng home page: <http://wfb-ng.org>\n"); fprintf(stderr, "WFB-ng home page: <http://wfb-ng.org>\n");
exit(1); exit(1);
@ -974,7 +979,7 @@ int main(int argc, char* const *argv)
if (optind > argc) goto show_usage; if (optind > argc) goto show_usage;
Aggregator agg(client_addr, client_port, keypair, epoch, channel_id); Aggregator agg(client_addr, client_port, keypair, epoch, channel_id);
network_loop(srv_port, agg, log_interval); network_loop(srv_port, agg, log_interval, rcv_buf);
}else{ }else{
throw runtime_error(string_format("Unknown rx_mode=%d", rx_mode)); throw runtime_error(string_format("Unknown rx_mode=%d", rx_mode));
} }

View File

@ -28,6 +28,7 @@
#include <pcap/pcap.h> #include <pcap/pcap.h>
#include <assert.h> #include <assert.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/random.h> #include <linux/random.h>
#include <inttypes.h> #include <inttypes.h>
@ -263,6 +264,21 @@ void Transmitter::send_packet(const uint8_t *buf, size_t size, uint8_t flags)
} }
} }
// Extract SO_RXQ_OVFL counter
uint32_t extract_rxq_overflow(struct msghdr *msg)
{
struct cmsghdr *cmsg;
uint32_t rtn;
for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_RXQ_OVFL) {
memcpy(&rtn, CMSG_DATA(cmsg), sizeof(rtn));
return rtn;
}
}
return 0;
}
void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int poll_timeout) void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int poll_timeout)
{ {
int nfds = rx_fd.size(); int nfds = rx_fd.size();
@ -283,6 +299,7 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int poll_timeou
} }
uint64_t session_key_announce_ts = 0; uint64_t session_key_announce_ts = 0;
uint32_t rxq_overflow = 0;
for(;;) for(;;)
{ {
@ -311,11 +328,39 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int poll_timeou
{ {
uint8_t buf[MAX_PAYLOAD_SIZE]; uint8_t buf[MAX_PAYLOAD_SIZE];
ssize_t rsize; ssize_t rsize;
uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))];
int fd = rx_fd[i]; int fd = rx_fd[i];
t->select_output(i); t->select_output(i);
while((rsize = recv(fd, buf, sizeof(buf), 0)) >= 0)
for(;;)
{ {
struct iovec iov = { .iov_base = (void*)buf,
.iov_len = sizeof(buf) };
struct msghdr msghdr = { .msg_name = NULL,
.msg_namelen = 0,
.msg_iov = &iov,
.msg_iovlen = 1,
.msg_control = &cmsgbuf,
.msg_controllen = sizeof(cmsgbuf),
.msg_flags = 0 };
memset(cmsgbuf, '\0', sizeof(cmsgbuf));
if ((rsize = recvmsg(fd, &msghdr, 0)) < 0)
{
break;
}
uint32_t cur_rxq_overflow = extract_rxq_overflow(&msghdr);
if (cur_rxq_overflow != rxq_overflow)
{
fprintf(stderr, "UDP rxq overflow: %u packets dropped\n", cur_rxq_overflow - rxq_overflow);
rxq_overflow = cur_rxq_overflow;
}
uint64_t cur_ts = get_time_ms(); uint64_t cur_ts = get_time_ms();
if (cur_ts >= session_key_announce_ts) if (cur_ts >= session_key_announce_ts)
{ {
@ -347,10 +392,11 @@ int main(int argc, char * const *argv)
int mcs_index = 1; int mcs_index = 1;
int debug_port = 0; int debug_port = 0;
int poll_timeout = 0; int poll_timeout = 0;
int rcv_buf = 0;
string keypair = "tx.key"; string keypair = "tx.key";
while ((opt = getopt(argc, argv, "K:k:n:u:r:p:B:G:S:L:M:D:T:i:e:")) != -1) { while ((opt = getopt(argc, argv, "K:k:n:u:p:B:G:S:L:M:D:T:i:e:R:")) != -1) {
switch (opt) { switch (opt) {
case 'K': case 'K':
keypair = optarg; keypair = optarg;
@ -367,6 +413,9 @@ int main(int argc, char * const *argv)
case 'p': case 'p':
radio_port = atoi(optarg); radio_port = atoi(optarg);
break; break;
case 'R':
rcv_buf = atoi(optarg);
break;
case 'B': case 'B':
bandwidth = atoi(optarg); bandwidth = atoi(optarg);
break; break;
@ -396,9 +445,9 @@ int main(int argc, char * const *argv)
break; break;
default: /* '?' */ default: /* '?' */
show_usage: show_usage:
fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-p radio_port] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-T poll_timeout] [-e epoch] [-i link_id] interface1 [interface2] ...\n", fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-T poll_timeout] [-e epoch] [-i link_id] interface1 [interface2] ...\n",
argv[0]); argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d, poll_timeout=%d\n", fprintf(stderr, "Default: K='%s', k=%d, n=%d, udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d, poll_timeout=%d, rcv_buf=system_default\n",
keypair.c_str(), k, n, udp_port, link_id, radio_port, epoch, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, poll_timeout); keypair.c_str(), k, n, udp_port, link_id, radio_port, epoch, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, poll_timeout);
fprintf(stderr, "Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE); fprintf(stderr, "Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE);
fprintf(stderr, "WFB-ng version " WFB_VERSION "\n"); fprintf(stderr, "WFB-ng version " WFB_VERSION "\n");
@ -484,7 +533,7 @@ int main(int argc, char * const *argv)
vector<string> wlans; vector<string> wlans;
for(int i = 0; optind + i < argc; i++) for(int i = 0; optind + i < argc; i++)
{ {
int fd = open_udp_socket_for_rx(udp_port + i); int fd = open_udp_socket_for_rx(udp_port + i, rcv_buf);
fprintf(stderr, "Listen on %d for %s\n", udp_port + i, argv[optind + i]); fprintf(stderr, "Listen on %d for %s\n", udp_port + i, argv[optind + i]);
rx_fd.push_back(fd); rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i])); wlans.push_back(string(argv[optind + i]));

View File

@ -49,14 +49,30 @@ uint64_t get_time_ms(void) // in milliseconds
return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000; return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000;
} }
int open_udp_socket_for_rx(int port) int open_udp_socket_for_rx(int port, int rcv_buf_size)
{ {
struct sockaddr_in saddr; struct sockaddr_in saddr;
int fd = socket(AF_INET, SOCK_DGRAM, 0); int fd = socket(AF_INET, SOCK_DGRAM, 0);
if (fd < 0) throw runtime_error(string_format("Error opening socket: %s", strerror(errno))); if (fd < 0) throw runtime_error(string_format("Error opening socket: %s", strerror(errno)));
int optval = 1; const int optval = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(int)); if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&optval , sizeof(optval)) !=0)
{
throw runtime_error(string_format("Unable to set SO_REUSEADDR: %s", strerror(errno)));
}
if(setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, (const void *)&optval , sizeof(optval)) != 0)
{
throw runtime_error(string_format("Unable to set SO_RXQ_OVFL: %s", strerror(errno)));
}
if (rcv_buf_size > 0)
{
if(setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const void *)&rcv_buf_size , sizeof(rcv_buf_size)) !=0)
{
throw runtime_error(string_format("Unable to set SO_RCVBUF: %s", strerror(errno)));
}
}
bzero((char *) &saddr, sizeof(saddr)); bzero((char *) &saddr, sizeof(saddr));
saddr.sin_family = AF_INET; saddr.sin_family = AF_INET;

View File

@ -176,7 +176,7 @@ typedef struct {
#define MAX_FEC_PAYLOAD (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header) - sizeof(wblock_hdr_t) - crypto_aead_chacha20poly1305_ABYTES) #define MAX_FEC_PAYLOAD (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header) - sizeof(wblock_hdr_t) - crypto_aead_chacha20poly1305_ABYTES)
#define MAX_FORWARDER_PACKET_SIZE (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header)) #define MAX_FORWARDER_PACKET_SIZE (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header))
int open_udp_socket_for_rx(int port); int open_udp_socket_for_rx(int port, int rcv_buf_size);
uint64_t get_time_ms(void); uint64_t get_time_ms(void);
#endif #endif

View File

@ -33,6 +33,7 @@ tunnel_agg_timeout= 0.005 # aggragate tuntap packets if less than radio_mtu but
mavlink_agg_timeout = 0.1 # aggragate mavlink packets if less than radio_mtu but no longer than 100ms mavlink_agg_timeout = 0.1 # aggragate mavlink packets if less than radio_mtu but no longer than 100ms
mavlink_err_rate = True # If true then inject RX error rate else absolute values mavlink_err_rate = True # If true then inject RX error rate else absolute values
tx_sel_delta = 3 # hysteresis for antenna selection, [dB] tx_sel_delta = 3 # hysteresis for antenna selection, [dB]
tx_rcv_buf_size = 524288 # UDP SO_RCVBUF. Set 0 to use net.core.rmem_default. Increase in case of non-cbr data stream
wifi_channel = 165 # radio channel @5825 MHz, range: 5815-5835 MHz, width 20MHz wifi_channel = 165 # radio channel @5825 MHz, range: 5815-5835 MHz, width 20MHz
wifi_region = 'BO' # Set CRDA region wifi_region = 'BO' # Set CRDA region
wifi_txpower = None # Some cards don't support tx power settings wifi_txpower = None # Some cards don't support tx power settings

View File

@ -325,11 +325,11 @@ def init_mavlink(profile, wlans, link_id):
(os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx, (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), link_id)).split() + wlans cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), link_id)).split() + wlans
cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' % \ cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d -R %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'), (os.path.join(settings.path.bin_dir, 'wfb_tx'),
cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index, cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id)).split() + wlans cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id, settings.common.tx_rcv_buf_size)).split() + wlans
listen = None listen = None
connect = None connect = None
@ -438,11 +438,11 @@ def init_video(profile, wlans, link_id):
log.msg('Listen for video stream %d on %s:%d' % (cfg.stream, listen[0], listen[1])) log.msg('Listen for video stream %d on %s:%d' % (cfg.stream, listen[0], listen[1]))
# We don't use TX diversity for video streaming due to only one transmitter on the vehichle # We don't use TX diversity for video streaming due to only one transmitter on the vehichle
cmd = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d %s' % \ cmd = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d -R %d %s' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'), cfg.stream, (os.path.join(settings.path.bin_dir, 'wfb_tx'), cfg.stream,
listen[1], os.path.join(settings.path.conf_dir, cfg.keypair), listen[1], os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index, cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id, wlans[0])).split() cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id, settings.common.tx_rcv_buf_size, wlans[0])).split()
df = TXProtocol(cmd, 'video tx').start() df = TXProtocol(cmd, 'video tx').start()
elif connect_re.match(cfg.peer): elif connect_re.match(cfg.peer):
@ -474,11 +474,11 @@ def init_tunnel(profile, wlans, link_id):
(os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx, (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), link_id)).split() + wlans cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), link_id)).split() + wlans
cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' % \ cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d -R %d' % \
(os.path.join(settings.path.bin_dir, 'wfb_tx'), (os.path.join(settings.path.bin_dir, 'wfb_tx'),
cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index, cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id)).split() + wlans cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id, settings.common.tx_rcv_buf_size)).split() + wlans
p_in = TUNTAPProtocol(mtu=settings.common.radio_mtu, p_in = TUNTAPProtocol(mtu=settings.common.radio_mtu,
agg_timeout=settings.common.tunnel_agg_timeout) agg_timeout=settings.common.tunnel_agg_timeout)

View File

@ -40,9 +40,9 @@ class TXRXTestCase(unittest.TestCase):
link_id = int.from_bytes(os.urandom(3), 'big') link_id = int.from_bytes(os.urandom(3), 'big')
epoch = int(time.time()) epoch = int(time.time())
cmd_rx = [os.path.join(bindir, 'wfb_rx'), '-K', 'drone.key', '-a', '10001', '-u', '10002', cmd_rx = [os.path.join(bindir, 'wfb_rx'), '-K', 'drone.key', '-a', '10001', '-u', '10002',
'-i', str(link_id), '-e', str(epoch), 'wlan0'] '-i', str(link_id), '-e', str(epoch), '-R', str(512 * 1024), 'wlan0']
cmd_tx = [os.path.join(bindir, 'wfb_tx'), '-K', 'gs.key', '-u', '10003', '-D', '10004', '-T', '30', cmd_tx = [os.path.join(bindir, 'wfb_tx'), '-K', 'gs.key', '-u', '10003', '-D', '10004', '-T', '30',
'-i', str(link_id), '-e', str(epoch), 'wlan0'] '-i', str(link_id), '-e', str(epoch), '-R', str(512 * 1024), 'wlan0']
self.rx_pp = RXProtocol(None, cmd_rx, 'debug rx') self.rx_pp = RXProtocol(None, cmd_rx, 'debug rx')
self.tx_pp = TXProtocol(cmd_tx, 'debug tx') self.tx_pp = TXProtocol(cmd_tx, 'debug tx')