Add distributed mode for control plane

This commit is contained in:
Vasily Evseenko 2024-09-13 22:32:09 +03:00
parent 557eef0c8a
commit a41cd378e7
17 changed files with 2148 additions and 1116 deletions

View File

@ -23,7 +23,7 @@ _CFLAGS := $(CFLAGS) -Wall -O2 -fno-strict-aliasing -DWFB_VERSION='"$(VERSION)-$
all: all_bin gs.key test
$(ENV):
virtualenv --python=$(PYTHON) $(ENV)
$(PYTHON) -m virtualenv $(ENV)
$$(PATH=$(ENV)/bin:$(ENV)/local/bin:$(PATH) which pip3) install --upgrade pip setuptools $(STDEB)
all_bin: wfb_rx wfb_tx wfb_keygen wfb_tx_cmd

View File

@ -15,7 +15,7 @@ apt update
apt upgrade
apt install python3-all python3-all-dev libpcap-dev libsodium-dev python3-pip python3-pyroute2 python3-msgpack \
python3-future python3-twisted python3-serial iw virtualenv debhelper dh-python fakeroot build-essential -y
python3-future python3-twisted python3-serial python3-jinja2 iw virtualenv debhelper dh-python fakeroot build-essential -y
# Build
make deb

View File

@ -9,7 +9,7 @@ Type=simple
EnvironmentFile=/etc/default/wifibroadcast
# per-profile environment
EnvironmentFile=-/etc/default/wifibroadcast.%i
ExecStart=/usr/bin/wfb-server %i ${WFB_NICS}
ExecStart=/bin/bash -c "exec /usr/bin/wfb-server --profiles $(echo %i | tr : ' ') --wlans ${WFB_NICS}"
TimeoutStopSec=5s
Restart=on-failure
RestartSec=5s

View File

@ -50,7 +50,7 @@ extern "C"
using namespace std;
Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggregator *agg) : wlan_idx(wlan_idx), agg(agg)
Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggregator *agg, int rcv_buf_size) : wlan_idx(wlan_idx), agg(agg)
{
char errbuf[PCAP_ERRBUF_SIZE];
@ -60,9 +60,10 @@ Receiver::Receiver(const char *wlan, int wlan_idx, uint32_t channel_id, BaseAggr
throw runtime_error(string_format("Unable to open interface %s in pcap: %s", wlan, errbuf));
}
if (pcap_set_snaplen(ppcap, 4096) !=0) throw runtime_error("set_snaplen failed");
if (rcv_buf_size > 0 && pcap_set_buffer_size(ppcap, rcv_buf_size) != 0) throw runtime_error("set_buffer_size failed");
if (pcap_set_snaplen(ppcap, MAX_PCAP_PACKET_SIZE) != 0) throw runtime_error("set_snaplen failed");
if (pcap_set_promisc(ppcap, 1) != 0) throw runtime_error("set_promisc failed");
if (pcap_set_timeout(ppcap, -1) !=0) throw runtime_error("set_timeout failed");
if (pcap_set_timeout(ppcap, -1) != 0) throw runtime_error("set_timeout failed");
if (pcap_set_immediate_mode(ppcap, 1) != 0) throw runtime_error(string_format("pcap_set_immediate_mode failed: %s", pcap_geterr(ppcap)));
if (pcap_activate(ppcap) !=0) throw runtime_error(string_format("pcap_activate failed: %s", pcap_geterr(ppcap)));
if (pcap_setnonblock(ppcap, 1, errbuf) != 0) throw runtime_error(string_format("set_nonblock failed: %s", errbuf));
@ -850,7 +851,7 @@ void Aggregator::apply_fec(int ring_idx)
fec_decode(fec_p, (const uint8_t**)in_blocks, out_blocks, index, MAX_FEC_PAYLOAD);
}
void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, shared_ptr<BaseAggregator> agg, int log_interval)
void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, shared_ptr<BaseAggregator> agg, int log_interval, int rcv_buf_size)
{
int nfds = argc - optind;
uint64_t log_send_ts = 0;
@ -866,7 +867,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, sh
for(int i = 0; i < nfds; i++)
{
rx[i] = new Receiver(argv[optind + i], i, channel_id, agg.get());
rx[i] = new Receiver(argv[optind + i], i, channel_id, agg.get(), rcv_buf_size);
fds[i].fd = rx[i]->getfd();
fds[i].events = POLLIN;
}
@ -970,7 +971,6 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
if (rsize < (ssize_t)sizeof(wrxfwd_t))
{
fprintf(stderr, "Short packet (rx fwd header)\n");
continue;
}
agg.process_packet(buf, rsize - sizeof(wrxfwd_t),
@ -1035,8 +1035,8 @@ int main(int argc, char* const *argv)
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
fprintf(stderr, "WFB-ng version %s\n", WFB_VERSION);
@ -1080,7 +1080,7 @@ int main(int argc, char* const *argv)
agg = shared_ptr<Forwarder>(new Forwarder(client_addr, client_port));
}
radio_loop(argc, argv, optind, channel_id, agg, log_interval);
radio_loop(argc, argv, optind, channel_id, agg, log_interval, rcv_buf);
}else if(rx_mode == AGGREGATOR)
{
if (optind > argc) goto show_usage;

View File

@ -246,7 +246,7 @@ private:
class Receiver
{
public:
Receiver(const char* wlan, int wlan_idx, uint32_t channel_id, BaseAggregator* agg);
Receiver(const char* wlan, int wlan_idx, uint32_t channel_id, BaseAggregator* agg, int rcv_buf_size);
~Receiver();
void loop_iter(void);
int getfd(void){ return fd; }

View File

@ -28,6 +28,7 @@
#include <assert.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <net/if.h>
#include <linux/if_packet.h>
#include <linux/if_ether.h>
@ -37,6 +38,7 @@
#include <string>
#include <memory>
#include <vector>
#include <set>
extern "C"
{
@ -187,44 +189,10 @@ void Transmitter::init_session(int k, int n)
assert(session_packet_size <= MAX_SESSION_PACKET_SIZE);
}
void RawSocketTransmitter::set_mark(uint32_t idx)
{
if (!use_qdisc)
{
return;
}
if (current_output >= 0)
{
int fd = sockfds[current_output];
uint32_t sockopt = fwmark + idx;
if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0)
{
throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno)));
}
return;
}
// Handle mirror mode
for(auto it = sockfds.begin(); it != sockfds.end(); it++)
{
int fd = *it;
uint32_t sockopt = fwmark + idx;
if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0)
{
throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno)));
}
}
}
RawSocketTransmitter::RawSocketTransmitter(int k, int n, const string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay,
vector<tags_item_t> &tags, const vector<string> &wlans, radiotap_header_t &radiotap_header,
uint8_t frame_type, bool use_qdisc, uint32_t fwmark) : \
uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base) : \
Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags),
channel_id(channel_id),
current_output(0),
@ -232,7 +200,8 @@ RawSocketTransmitter::RawSocketTransmitter(int k, int n, const string &keypair,
radiotap_header(radiotap_header),
frame_type(frame_type),
use_qdisc(use_qdisc),
fwmark(fwmark)
fwmark_base(fwmark_base),
fwmark(fwmark_base)
{
for(auto it=wlans.begin(); it!=wlans.end(); it++)
{
@ -275,6 +244,7 @@ RawSocketTransmitter::RawSocketTransmitter(int k, int n, const string &keypair,
}
sockfds.push_back(fd);
fd_fwmarks[fd] = 0;
}
}
@ -328,7 +298,21 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
{
// Normal mode - only one card do packet transmission in a time
uint64_t start_us = get_time_us();
int rc = sendmsg(sockfds[current_output], &msghdr, 0);
int fd = sockfds[current_output];
if (use_qdisc && fd_fwmarks[fd] != fwmark)
{
uint32_t sockopt = fwmark;
if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0)
{
throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno)));
}
fd_fwmarks[fd] = fwmark;
}
int rc = sendmsg(fd, &msghdr, 0);
if (rc < 0 && errno != ENOBUFS)
{
@ -346,7 +330,21 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
for(auto it=sockfds.begin(); it != sockfds.end(); it++, i++)
{
uint64_t start_us = get_time_us();
int rc = sendmsg(*it, &msghdr, 0);
int fd = *it;
if (use_qdisc && fd_fwmarks[fd] != fwmark)
{
uint32_t sockopt = fwmark;
if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0)
{
throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno)));
}
fd_fwmarks[fd] = fwmark;
}
int rc = sendmsg(fd, &msghdr, 0);
if (rc < 0 && errno != ENOBUFS)
{
@ -387,6 +385,161 @@ RawSocketTransmitter::~RawSocketTransmitter()
}
RemoteTransmitter::RemoteTransmitter(int k, int n, const string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay,
vector<tags_item_t> &tags, const vector<pair<string, vector<uint16_t>>> &remote_hosts, radiotap_header_t &radiotap_header,
uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base) : \
Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags),
channel_id(channel_id),
current_output(0),
ieee80211_seq(0),
radiotap_header(radiotap_header),
frame_type(frame_type),
use_qdisc(use_qdisc),
fwmark_base(fwmark_base),
fwmark(fwmark_base)
{
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno)));
int output = 0;
for(auto h_it=remote_hosts.begin(); h_it!=remote_hosts.end(); h_it++)
{
uint8_t wlan_id = 0;
for(auto p_it=h_it->second.begin(); p_it != h_it->second.end(); p_it++, output++, wlan_id++)
{
struct sockaddr_in saddr;
memset(&saddr, '\0', sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = inet_addr(h_it->first.c_str());
saddr.sin_port = htons((unsigned short)*p_it);
sockaddrs.push_back(saddr);
output_to_ant_id[output] = ((uint64_t)ntohl(saddr.sin_addr.s_addr) << 32) | (uint64_t)(wlan_id) << 8 | (uint64_t)0xff;
}
}
}
void RemoteTransmitter::inject_packet(const uint8_t *buf, size_t size)
{
assert(size <= MAX_FORWARDER_PACKET_SIZE);
uint8_t ieee_hdr[sizeof(ieee80211_header)];
// fill default values
memcpy(ieee_hdr, ieee80211_header, sizeof(ieee80211_header));
// frame_type
ieee_hdr[0] = frame_type;
// channel_id
uint32_t channel_id_be = htobe32(channel_id);
memcpy(ieee_hdr + SRC_MAC_THIRD_BYTE, &channel_id_be, sizeof(uint32_t));
memcpy(ieee_hdr + DST_MAC_THIRD_BYTE, &channel_id_be, sizeof(uint32_t));
// sequence number
ieee_hdr[FRAME_SEQ_LB] = ieee80211_seq & 0xff;
ieee_hdr[FRAME_SEQ_HB] = (ieee80211_seq >> 8) & 0xff;
ieee80211_seq += 16;
uint32_t _fwmark = use_qdisc ? htonl(this->fwmark) : 0;
struct iovec iov[4] = \
{
// fwmark
{
.iov_base = (void*)&_fwmark,
.iov_len = sizeof(_fwmark),
},
// radiotap header
{ .iov_base = (void*)&radiotap_header.header[0],
.iov_len = radiotap_header.header.size()
},
// ieee80211 header
{ .iov_base = (void*)ieee_hdr,
.iov_len = sizeof(ieee_hdr)
},
// packet payload
{ .iov_base = (void*)buf,
.iov_len = size
}
};
struct msghdr msghdr = \
{ .msg_name = NULL,
.msg_namelen = 0,
.msg_iov = iov,
.msg_iovlen = 4,
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0};
struct sockaddr_in saddr;
if (current_output >= 0)
{
// Normal mode - only one card do packet transmission in a time
uint64_t start_us = get_time_us();
saddr = sockaddrs[current_output];
msghdr.msg_name = &saddr;
msghdr.msg_namelen = sizeof(saddr);
int rc = sendmsg(sockfd, &msghdr, 0);
if (rc < 0 && errno != ENOBUFS)
{
throw runtime_error(string_format("Unable to inject packet: %s", strerror(errno)));
}
uint64_t key = output_to_ant_id[current_output];
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size);
}
else
{
// Mirror mode - transmit packet via all cards
// Use only for different frequency channels
int i = 0;
for(auto it=sockaddrs.begin(); it != sockaddrs.end(); it++, i++)
{
uint64_t start_us = get_time_us();
saddr = *it;
msghdr.msg_name = &saddr;
msghdr.msg_namelen = sizeof(saddr);
int rc = sendmsg(sockfd, &msghdr, 0);
if (rc < 0 && errno != ENOBUFS)
{
throw runtime_error(string_format("Unable to inject packet: %s", strerror(errno)));
}
uint64_t key = output_to_ant_id[i];
antenna_stat[key].log_latency(get_time_us() - start_us, rc >= 0, size);
}
}
}
void RemoteTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
{
for(auto it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
ts, it->first,
it->second.count_p_injected, it->second.count_p_dropped,
it->second.latency_min,
it->second.latency_sum / (it->second.count_p_injected + it->second.count_p_dropped),
it->second.latency_max);
injected_packets += it->second.count_p_injected;
dropped_packets += it->second.count_p_dropped;
injected_bytes += it->second.count_b_injected;
}
antenna_stat.clear();
}
void Transmitter::send_block_fragment(size_t packet_size)
{
uint8_t ciphertext[MAX_FORWARDER_PACKET_SIZE];
@ -529,7 +682,7 @@ void data_source(shared_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0;
uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout
uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow)
uint32_t count_b_incoming = 0; // incoming udp bytes (received + dropped due to rxq overflow)
uint32_t count_b_incoming = 0; // incoming udp bytes (received only)
uint32_t count_p_injected = 0; // successfully injected packets (include additional fec packets)
uint32_t count_b_injected = 0; // successfully injected bytes (include additional fec packets)
uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout
@ -878,8 +1031,6 @@ radiotap_header_t init_radiotap_header(uint8_t stbc,
switch(bandwidth)
{
case 10:
flags |= IEEE80211_RADIOTAP_MCS_BW_20;
break;
case 20:
flags |= IEEE80211_RADIOTAP_MCS_BW_20;
break;
@ -942,8 +1093,6 @@ radiotap_header_t init_radiotap_header(uint8_t stbc,
switch(bandwidth)
{
case 10:
res.header[VHT_BW_OFF] = IEEE80211_RADIOTAP_VHT_BW_20M;
break;
case 20:
res.header[VHT_BW_OFF] = IEEE80211_RADIOTAP_VHT_BW_20M;
break;
@ -974,6 +1123,340 @@ radiotap_header_t init_radiotap_header(uint8_t stbc,
}
void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)
{
int nfds = rx_fd.size();
assert(nfds > 0);
struct pollfd fds[nfds];
memset(fds, '\0', sizeof(fds));
for(size_t i=0; i < rx_fd.size(); i++)
{
fds[i].fd = rx_fd[i];
fds[i].events = POLLIN;
}
uint32_t rxq_overflow = 0;
uint64_t log_send_ts = 0;
uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow)
uint32_t count_b_incoming = 0; // incoming udp bytes (received only)
uint32_t count_p_dropped = 0; // dropped due to rxq overflows or injection timeout
uint32_t count_p_bad = 0; // injected large packets that were bad
int start_fd_idx = 0;
for(;;)
{
uint64_t cur_ts = get_time_ms();
int poll_timeout = log_send_ts > cur_ts ? log_send_ts - cur_ts : 0;
int rc = poll(fds, nfds, poll_timeout);
if (rc < 0)
{
if (errno == EINTR || errno == EAGAIN) continue;
throw runtime_error(string_format("poll error: %s", strerror(errno)));
}
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts) // log timeout expired
{
if(count_p_dropped)
{
fprintf(stderr, "%u packets dropped\n", count_p_dropped);
}
if(count_p_bad)
{
fprintf(stderr, "%u packets bad\n", count_p_bad);
}
count_p_incoming = 0;
count_b_incoming = 0;
count_p_dropped = 0;
count_p_bad = 0;
log_send_ts = cur_ts + log_interval;
}
if (rc == 0) // poll timeout
{
continue;
}
// rc > 0: events detected
// start from last fd index and reset it to zero
int i = start_fd_idx;
for(start_fd_idx = 0; rc > 0; i++)
{
if (fds[i % nfds].revents & (POLLERR | POLLNVAL))
{
throw runtime_error(string_format("socket error: %s", strerror(errno)));
}
if (fds[i % nfds].revents & POLLIN)
{
uint8_t buf[MAX_DISTRIBUTION_PACKET_SIZE - sizeof(uint32_t) + 1];
uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))];
rc -= 1;
for(;;)
{
ssize_t rsize;
uint32_t _fwmark;
int fd = fds[i % nfds].fd;
struct iovec iov[2] = {
// fwmark
{
.iov_base = (void*)&_fwmark,
.iov_len = sizeof(_fwmark),
},
// packet with radiotap header
{
.iov_base = (void*)buf,
.iov_len = sizeof(buf),
}
};
struct msghdr msghdr = { .msg_name = NULL,
.msg_namelen = 0,
.msg_iov = iov,
.msg_iovlen = 2,
.msg_control = &cmsgbuf,
.msg_controllen = sizeof(cmsgbuf),
.msg_flags = 0 };
memset(cmsgbuf, '\0', sizeof(cmsgbuf));
if ((rsize = recvmsg(fd, &msghdr, MSG_DONTWAIT)) < 0)
{
if (errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
break;
}
if (rsize < (ssize_t)MIN_DISTRIBUTION_PACKET_SIZE || rsize > (ssize_t)MAX_DISTRIBUTION_PACKET_SIZE)
{
count_p_bad += 1;
continue;
}
rsize -= sizeof(uint32_t);
count_p_incoming += 1;
count_b_incoming += rsize;
uint32_t cur_rxq_overflow = extract_rxq_overflow(&msghdr);
if (cur_rxq_overflow != rxq_overflow)
{
// Count dropped packets as possible incoming
count_p_dropped += (cur_rxq_overflow - rxq_overflow);
count_p_incoming += (cur_rxq_overflow - rxq_overflow);
rxq_overflow = cur_rxq_overflow;
}
cur_ts = get_time_ms();
t.inject_packet(i % nfds, buf, rsize, ntohl(_fwmark));
if (cur_ts >= log_send_ts) // log timeout expired
{
// We need to transmit all packets from the queue before tx card switch
start_fd_idx = i % nfds;
break;
}
}
}
}
}
}
void injector_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_buf, bool use_qdisc, int log_interval)
{
vector<int> rx_fd;
vector<string> wlans;
for(int i = 0; optind + i < argc; i++)
{
int bind_port = srv_port != 0 ? srv_port + i : 0;
int fd = open_udp_socket_for_rx(bind_port, rcv_buf);
if (srv_port == 0)
{
struct sockaddr_in saddr;
socklen_t saddr_size = sizeof(saddr);
if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0)
{
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
bind_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i);
}
fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]);
rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i]));
}
if (srv_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
}
auto t = RawSocketInjector(wlans, use_qdisc);
packet_injector(t, rx_fd, log_interval);
}
int open_control_fd(int control_port)
{
int control_fd = open_udp_socket_for_rx(control_port, 0, 0x7f000001); // bind to 127.0.0.1 for security reasons
if (control_port == 0)
{
struct sockaddr_in saddr;
socklen_t saddr_size = sizeof(saddr);
if (getsockname(control_fd, (struct sockaddr *)&saddr, &saddr_size) != 0)
{
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
control_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP_CONTROL\t%d\n", get_time_ms(), control_port);
}
fprintf(stderr, "Listen on %d for management commands\n", control_port);
return control_fd;
}
void local_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_buf, int log_interval,
int udp_port, int debug_port, int k, int n, const string &keypair, int fec_timeout,
uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, bool use_qdisc, uint32_t fwmark,
radiotap_header_t &radiotap_header, uint8_t frame_type, int control_port, bool mirror)
{
vector<int> rx_fd;
vector<string> wlans;
vector<tags_item_t> tags;
shared_ptr<Transmitter> t;
for(int i = 0; optind + i < argc; i++)
{
int bind_port = udp_port != 0 ? udp_port + i : 0;
int fd = open_udp_socket_for_rx(bind_port, rcv_buf);
if (udp_port == 0)
{
struct sockaddr_in saddr;
socklen_t saddr_size = sizeof(saddr);
if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0)
{
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
bind_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i);
}
fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]);
rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i]));
}
if (udp_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
}
if (debug_port)
{
fprintf(stderr, "Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port);
t = shared_ptr<UdpTransmitter>(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port, epoch, channel_id,
fec_delay, tags, use_qdisc, fwmark));
}
else
{
t = shared_ptr<RawSocketTransmitter>(new RawSocketTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags,
wlans, radiotap_header, frame_type, use_qdisc, fwmark));
}
int control_fd = open_control_fd(control_port);
data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
}
void distributor_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_buf, int log_interval,
int udp_port, int k, int n, const string &keypair, int fec_timeout,
uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, bool use_qdisc, uint32_t fwmark,
radiotap_header_t &radiotap_header, uint8_t frame_type, int control_port, bool mirror)
{
vector<int> rx_fd;
vector<pair<string, vector<uint16_t>>> remote_hosts;
int port_idx = 0;
set<string> hosts;
for(int i = optind; i < argc; i++)
{
vector<uint16_t> remote_ports;
char *p = argv[i];
char *t = NULL;
t = strsep(&p, ":");
if (t == NULL) continue;
string remote_host = string(t);
if(hosts.count(remote_host))
{
throw runtime_error(string_format("Duplicate host %s", remote_host.c_str()));
}
hosts.insert(remote_host);
for(int j=0; (t=strsep(&p, ",")) != NULL; j++)
{
uint16_t remote_port = atoi(t);
int bind_port = (udp_port != 0) ? (udp_port + port_idx++) : 0;
int fd = open_udp_socket_for_rx(bind_port, rcv_buf);
if (udp_port == 0)
{
struct sockaddr_in saddr;
socklen_t saddr_size = sizeof(saddr);
if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0)
{
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
bind_port = ntohs(saddr.sin_port);
uint64_t wlan_id = (uint64_t)ntohl(inet_addr(remote_host.c_str())) << 24 | j;
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%" PRIx64 "\n", get_time_ms(), bind_port, wlan_id);
}
fprintf(stderr, "Listen on %d for %s:%d\n", bind_port, remote_host.c_str(), remote_port);
rx_fd.push_back(fd);
remote_ports.push_back(remote_port);
}
remote_hosts.push_back(pair<string, vector<uint16_t>>(remote_host, remote_ports));
}
if (udp_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
}
vector<tags_item_t> tags;
shared_ptr<Transmitter> t = shared_ptr<RemoteTransmitter>(new RemoteTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags,
remote_hosts, radiotap_header, frame_type, use_qdisc, fwmark));
int control_fd = open_control_fd(control_port);
data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
}
int main(int argc, char * const *argv)
{
int opt;
@ -981,6 +1464,7 @@ int main(int argc, char * const *argv)
uint32_t fec_delay = 0;
uint32_t link_id = 0x0;
uint64_t epoch = 0;
int srv_port = 10000;
int udp_port=5600;
int control_port=0;
int log_interval = 1000;
@ -1000,9 +1484,17 @@ int main(int argc, char * const *argv)
uint8_t frame_type = FRAME_TYPE_DATA;
bool use_qdisc = false;
uint32_t fwmark = 0;
tx_mode_t tx_mode = LOCAL;
while ((opt = getopt(argc, argv, "K:k:n:u:p:F:l:B:G:S:L:M:N:D:T:i:e:R:f:mVQP:C:")) != -1) {
while ((opt = getopt(argc, argv, "dI:K:k:n:u:p:F:l:B:G:S:L:M:N:D:T:i:e:R:f:mVQP:C:")) != -1) {
switch (opt) {
case 'I':
tx_mode = INJECTOR;
srv_port = atoi(optarg);
break;
case 'd':
tx_mode = DISTRIBUTOR;
break;
case 'K':
keypair = optarg;
break;
@ -1095,7 +1587,13 @@ int main(int argc, char * const *argv)
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS] [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] interface1 [interface2] ...\n",
fprintf(stderr, "Local TX: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n"
" [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] interface1 [interface2] ...\n",
argv[0]);
fprintf(stderr, "TX distributor: %s -d [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n"
" [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] host1:port1,port2,... [host2:port1,port2,...] ...\n",
argv[0]);
fprintf(stderr, "TX injector: %s -I port [-Q] [-R rcv_buf] [-l log_interval] interface1 [interface2] ...\n",
argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, fec_delay=%u [us], udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d vht_nss=%d, vht_mode=%d, fec_timeout=%d, log_interval=%d, rcv_buf=system_default, frame_type=data, mirror=false, use_qdisc=false, fwmark=%u, control_port=%d\n",
keypair.c_str(), k, n, fec_delay, udp_port, link_id, radio_port, epoch, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, vht_nss, vht_mode, fec_timeout, log_interval, fwmark, control_port);
@ -1134,70 +1632,34 @@ int main(int argc, char * const *argv)
try
{
auto radiotap_header = init_radiotap_header(stbc, ldpc, short_gi, bandwidth, mcs_index, vht_mode, vht_nss);
vector<int> rx_fd;
vector<string> wlans;
int control_fd = open_udp_socket_for_rx(control_port, 0, 0x7f000001); // bind to 127.0.0.1 for security reasons
if (control_port == 0)
{
struct sockaddr_in saddr;
socklen_t saddr_size = sizeof(saddr);
if (getsockname(control_fd, (struct sockaddr *)&saddr, &saddr_size) != 0)
{
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
control_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP_CONTROL\t%d\n", get_time_ms(), control_port);
}
fprintf(stderr, "Listen on %d for management commands\n", control_port);
for(int i = 0; optind + i < argc; i++)
{
int bind_port = udp_port != 0 ? udp_port + i : 0;
int fd = open_udp_socket_for_rx(bind_port, rcv_buf);
if (udp_port == 0)
{
struct sockaddr_in saddr;
socklen_t saddr_size = sizeof(saddr);
if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0)
{
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
bind_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%s\n", get_time_ms(), bind_port, argv[optind + i]);
}
fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]);
rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i]));
}
if (udp_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
}
vector<tags_item_t> tags;
shared_ptr<Transmitter> t;
uint32_t channel_id = (link_id << 8) + radio_port;
if (debug_port)
switch(tx_mode)
{
fprintf(stderr, "Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port);
t = shared_ptr<UdpTransmitter>(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port, epoch, channel_id,
fec_delay, tags, use_qdisc, fwmark));
} else {
t = shared_ptr<RawSocketTransmitter>(new RawSocketTransmitter(k, n, keypair, epoch, channel_id, fec_delay, tags,
wlans, radiotap_header, frame_type, use_qdisc, fwmark));
}
case INJECTOR:
injector_loop(argc, argv, optind, srv_port, rcv_buf, use_qdisc, log_interval);
break;
data_source(t, rx_fd, control_fd, fec_timeout, mirror, log_interval);
}catch(runtime_error &e)
case LOCAL:
local_loop(argc, argv, optind, srv_port, rcv_buf, log_interval,
udp_port, debug_port, k, n, keypair, fec_timeout,
epoch, channel_id, fec_delay, use_qdisc, fwmark,
radiotap_header, frame_type, control_port, mirror);
break;
case DISTRIBUTOR:
distributor_loop(argc, argv, optind, srv_port, rcv_buf, log_interval,
udp_port, k, n, keypair, fec_timeout,
epoch, channel_id, fec_delay, use_qdisc, fwmark,
radiotap_header, frame_type, control_port, mirror);
break;
default:
assert(0);
}
}
catch(runtime_error &e)
{
fprintf(stderr, "Error: %s\n", e.what());
exit(1);

View File

@ -25,6 +25,7 @@
#include <errno.h>
#include <string>
#include <vector>
#include <map>
#include <string.h>
#include <stdexcept>
@ -52,6 +53,21 @@ typedef struct {
uint8_t vht_nss;
} radiotap_header_t;
radiotap_header_t init_radiotap_header(uint8_t stbc,
bool ldpc,
bool short_gi,
uint8_t bandwidth,
uint8_t mcs_index,
bool vht_mode,
uint8_t vht_nss);
typedef enum {
LOCAL,
INJECTOR,
DISTRIBUTOR
} tx_mode_t;
class Transmitter
{
public:
@ -95,6 +111,7 @@ private:
std::vector<tags_item_t> tags;
};
class txAntennaItem
{
public:
@ -140,19 +157,14 @@ class RawSocketTransmitter : public Transmitter
public:
RawSocketTransmitter(int k, int n, const std::string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, std::vector<tags_item_t> &tags,
const std::vector<std::string> &wlans, radiotap_header_t &radiotap_header,
uint8_t frame_type, bool use_qdisc, uint32_t fwmark);
uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base);
virtual ~RawSocketTransmitter();
virtual void select_output(int idx)
{
bool sw = current_output != idx;
current_output = idx;
if (sw)
{
// select_output call should happend only between data packets
// All FEC packets issued after last data packet in block and will have set_mark(1)
set_mark(0);
}
}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
virtual void update_radiotap_header(radiotap_header_t &radiotap_header)
{
@ -166,16 +178,23 @@ public:
private:
virtual void inject_packet(const uint8_t *buf, size_t size);
virtual void set_mark(uint32_t idx);
virtual void set_mark(uint32_t idx)
{
fwmark = fwmark_base + idx;
}
const uint32_t channel_id;
int current_output;
uint16_t ieee80211_seq;
std::vector<int> sockfds;
std::map<int, uint32_t> fd_fwmarks;
tx_antenna_stat_t antenna_stat;
radiotap_header_t radiotap_header;
const uint8_t frame_type;
const bool use_qdisc;
const uint32_t fwmark;
const uint32_t fwmark_base;
uint32_t fwmark;
};
@ -183,8 +202,8 @@ class UdpTransmitter : public Transmitter
{
public:
UdpTransmitter(int k, int n, const std::string &keypair, const std::string &client_addr, int base_port, uint64_t epoch, uint32_t channel_id,
uint32_t fec_delay, std::vector<tags_item_t> &tags, bool use_qdisc, uint32_t fwmark): \
Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags), radiotap_header({}), base_port(base_port), use_qdisc(use_qdisc), fwmark(fwmark)
uint32_t fec_delay, std::vector<tags_item_t> &tags, bool use_qdisc, uint32_t fwmark_base): \
Transmitter(k, n, keypair, epoch, channel_id, fec_delay, tags), radiotap_header({}), base_port(base_port), use_qdisc(use_qdisc), fwmark_base(fwmark_base)
{
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) throw std::runtime_error(string_format("Error opening socket: %s", strerror(errno)));
@ -215,7 +234,7 @@ public:
return;
}
uint32_t sockopt = this->fwmark + idx;
uint32_t sockopt = this->fwmark_base + idx;
if(setsockopt(sockfd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0)
{
throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", sockfd, sockopt, strerror(errno)));
@ -271,13 +290,143 @@ private:
int base_port;
struct sockaddr_in saddr;
const bool use_qdisc;
const uint32_t fwmark;
const uint32_t fwmark_base;
};
radiotap_header_t init_radiotap_header(uint8_t stbc,
bool ldpc,
bool short_gi,
uint8_t bandwidth,
uint8_t mcs_index,
bool vht_mode,
uint8_t vht_nss);
class RemoteTransmitter : public Transmitter
{
public:
RemoteTransmitter(int k, int n, const std::string &keypair, uint64_t epoch, uint32_t channel_id, uint32_t fec_delay, std::vector<tags_item_t> &tags,
const std::vector<std::pair<std::string, std::vector<uint16_t>>> &remote_hosts, radiotap_header_t &radiotap_header,
uint8_t frame_type, bool use_qdisc, uint32_t fwmark_base);
virtual ~RemoteTransmitter()
{
close(sockfd);
}
virtual void select_output(int idx)
{
current_output = idx;
}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
virtual void update_radiotap_header(radiotap_header_t &radiotap_header)
{
this->radiotap_header = radiotap_header;
}
virtual radiotap_header_t get_radiotap_header(void)
{
return radiotap_header;
}
private:
virtual void inject_packet(const uint8_t *buf, size_t size);
virtual void set_mark(uint32_t idx)
{
fwmark = fwmark_base + idx;
}
const uint32_t channel_id;
int current_output;
uint16_t ieee80211_seq;
int sockfd;
std::vector<struct sockaddr_in> sockaddrs;
std::map<int, uint64_t> output_to_ant_id;
tx_antenna_stat_t antenna_stat;
radiotap_header_t radiotap_header;
const uint8_t frame_type;
const bool use_qdisc;
const uint32_t fwmark_base;
uint32_t fwmark;
};
class RawSocketInjector
{
public:
RawSocketInjector(const vector<string> &wlans, bool use_qdisc) : use_qdisc(use_qdisc)
{
for(auto it=wlans.begin(); it!=wlans.end(); it++)
{
int fd = socket(PF_PACKET, SOCK_RAW, 0);
if (fd < 0)
{
throw runtime_error(string_format("Unable to open PF_PACKET socket: %s", strerror(errno)));
}
if(!use_qdisc)
{
const int optval = 1;
if(setsockopt(fd, SOL_PACKET, PACKET_QDISC_BYPASS, (const void *)&optval , sizeof(optval)) !=0)
{
close(fd);
throw runtime_error(string_format("Unable to set PACKET_QDISC_BYPASS: %s", strerror(errno)));
}
}
struct ifreq ifr;
memset(&ifr, '\0', sizeof(ifr));
strncpy(ifr.ifr_name, it->c_str(), sizeof(ifr.ifr_name) - 1);
if (ioctl(fd, SIOCGIFINDEX, &ifr) < 0)
{
close(fd);
throw runtime_error(string_format("Unable to get interface index for %s: %s", it->c_str(), strerror(errno)));
}
struct sockaddr_ll sll;
memset(&sll, '\0', sizeof(sll));
sll.sll_family = AF_PACKET;
sll.sll_ifindex = ifr.ifr_ifindex;
sll.sll_protocol = 0;
if (::bind(fd, (struct sockaddr *) &sll, sizeof(sll)) < 0)
{
close(fd);
throw runtime_error(string_format("Unable to bind to %s: %s", it->c_str(), strerror(errno)));
}
sockfds.push_back(fd);
fd_fwmarks[fd] = 0;
}
}
void inject_packet(int wlan_idx, const uint8_t *buf, size_t size, uint32_t fwmark)
{
int fd = sockfds[wlan_idx];
if (use_qdisc && fd_fwmarks[fd] != fwmark)
{
uint32_t sockopt = fwmark;
if(setsockopt(fd, SOL_SOCKET, SO_MARK, (const void *)&sockopt , sizeof(sockopt)) !=0)
{
throw runtime_error(string_format("Unable to set SO_MARK fd(%d)=%u: %s", fd, sockopt, strerror(errno)));
}
fd_fwmarks[fd] = fwmark;
}
if (send(fd, buf, size, 0) < 0 && errno != ENOBUFS)
{
throw runtime_error(string_format("Unable to inject packet: %s", strerror(errno)));
}
}
~RawSocketInjector()
{
for(auto it=sockfds.begin(); it != sockfds.end(); it++)
{
close(*it);
}
}
private:
std::vector<int> sockfds;
std::map<int, uint32_t> fd_fwmarks;
const bool use_qdisc;
};

View File

@ -62,10 +62,10 @@ uint64_t get_time_us(void) // in microseconds
return ts.tv_sec * 1000000LL + ts.tv_nsec / 1000;
}
int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr)
int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr, int socket_type, int socket_protocol)
{
struct sockaddr_in saddr;
int fd = socket(AF_INET, SOCK_DGRAM, 0);
int fd = socket(AF_INET, socket_type, socket_protocol);
if (fd < 0) throw runtime_error(string_format("Error opening socket: %s", strerror(errno)));
const int optval = 1;

View File

@ -253,8 +253,11 @@ typedef struct {
#define MAX_FEC_PAYLOAD (WIFI_MTU - sizeof(ieee80211_header) - sizeof(wblock_hdr_t) - crypto_aead_chacha20poly1305_ABYTES)
#define MAX_FORWARDER_PACKET_SIZE (WIFI_MTU - sizeof(ieee80211_header))
#define MAX_SESSION_PACKET_SIZE (WIFI_MTU - sizeof(ieee80211_header))
#define MIN_DISTRIBUTION_PACKET_SIZE (sizeof(uint32_t) + sizeof(radiotap_header_ht) + sizeof(ieee80211_header)) // ht hdr < vht hdr
#define MAX_DISTRIBUTION_PACKET_SIZE (sizeof(uint32_t) + sizeof(radiotap_header_vht) + WIFI_MTU)
#define MAX_PCAP_PACKET_SIZE (WIFI_MTU + 256) // radiotap header is variable but 8812au/eu has max rtap buffer size 256
int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr = INADDR_ANY);
int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr = INADDR_ANY, int socket_type = SOCK_DGRAM, int socket_protocol = 0);
uint64_t get_time_ms(void);
uint64_t get_time_us(void);

View File

@ -1,5 +1,5 @@
[DEFAULT]
Depends3: python3-twisted, libpcap-dev, libsodium-dev, python3-pyroute2, python3-future, python3-serial, python3-msgpack
Depends3: python3-twisted, libpcap-dev, libsodium-dev, python3-pyroute2, python3-future, python3-serial, python3-msgpack, python3-jinja2
Package3: wfb-ng
Replaces3: wifibroadcast
Maintainer: Vasily Evseenko <svpcom@p2ptech.org>

View File

@ -125,8 +125,22 @@ def human_rate(r):
return '%3d %s' % (rate, mod)
def format_ant(ant_id):
if ant_id < (1<<32):
if ant_id & 0xff == 0xff:
return '%2X:X ' % (ant_id >> 8)
else:
return '%2X:%X ' % (ant_id >> 8, ant_id & 0xff)
if ant_id & 0xff == 0xff:
return '%08X:%X:X' % (ant_id >> 32, (ant_id >> 8) & 0xff)
else:
return '%08X:%X:%X' % (ant_id >> 32, (ant_id >> 8) & 0xff, ant_id & 0xff)
class AntennaStat(Int32StringReceiver):
MAX_LENGTH = 1024 * 1024
is_cluster = False
def stringReceived(self, string):
attrs = msgpack.unpackb(string, strict_map_key=False, use_list=False)
@ -136,13 +150,14 @@ class AntennaStat(Int32StringReceiver):
elif attrs['type'] == 'tx':
self.draw_tx(attrs)
elif attrs['type'] == 'cli_title':
self.is_cluster = attrs['is_cluster']
set_window_title(attrs['cli_title'])
def draw_rx(self, attrs):
p = attrs['packets']
session_d = attrs['session']
stats_d = attrs['rx_ant_stats']
tx_ant = attrs.get('tx_ant')
tx_wlan = attrs.get('tx_wlan')
rx_id = attrs['id']
window = self.factory.windows.get(rx_id)
@ -175,14 +190,21 @@ class AntennaStat(Int32StringReceiver):
addstr_markup(window, 0, 20, flow_str)
if stats_d:
addstr_markup(window, 2, 20, '{Freq MCS BW [ANT] pkt/s} {RSSI} [dBm] {SNR} [dB]')
if self.is_cluster:
lpad = ' ' * 4
rpad = ' ' * 3
else:
lpad = ''
rpad = ''
addstr_markup(window, 2, 20, '{Freq MCS BW %s[ANT]%s pkt/s} {RSSI} [dBm] {SNR} [dB]' % (lpad, rpad))
for y, (((freq, mcs_index, bandwith), ant_id), v) in enumerate(sorted(stats_d.items()), 3):
pkt_s, rssi_min, rssi_avg, rssi_max, snr_min, snr_avg, snr_max = v
if y < ymax:
active_tx = (ant_id >> 8) == tx_ant
addstr_markup(window, y, 20, '%04d %3d %2d %s%04x%s %4d %3d < {%3d} < %3d %3d < {%3d} < %3d' % \
active_tx = ((ant_id >> 8) == tx_wlan)
addstr_markup(window, y, 20, '%04d %3d %2d %s%s%s %4d %3d < {%3d} < %3d %3d < {%3d} < %3d' % \
(freq, mcs_index, bandwith,
'{' if active_tx else '', ant_id, '}' if active_tx else '',
'{' if active_tx else '', format_ant(ant_id), '}' if active_tx else '',
pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max), 0 if active_tx else curses.A_DIM)
@ -221,7 +243,14 @@ class AntennaStat(Int32StringReceiver):
human_rate(p['injected_bytes'][0])))
if latency_d:
addstr_markup(window, 2, 20, '{[ANT] pkt/s} {\u00b0C} {Injection} [us]')
if self.is_cluster:
lpad = ' ' * 4
rpad = ' ' * 3
else:
lpad = ''
rpad = ''
addstr_markup(window, 2, 20, '{%s[ANT]%s pkt/s} {\u00b0C} {Injection} [us]' % (lpad, rpad))
for y, (k, v) in enumerate(sorted(latency_d.items()), 3):
k = int(k) # json doesn't support int keys
injected, dropped, lat_min, lat_avg, lat_max = v
@ -237,7 +266,7 @@ class AntennaStat(Int32StringReceiver):
temp = ' (--)'
if y < ymax:
addstr_markup(window, y, 21, '{%02x}(XX) %4d %3s %4d < {%4d} < %4d' % (k >> 8, injected, temp, lat_min, lat_avg, lat_max))
addstr_markup(window, y, 20, '{%s} %4d %3s %4d < {%4d} < %4d' % (format_ant(k), injected, temp, lat_min, lat_avg, lat_max))
else:
addstr_noerr(window, 2, 20, '[No data]', curses.A_REVERSE)
@ -262,7 +291,7 @@ class AntennaStatClientFactory(ReconnectingClientFactory):
curses.resize_term(height, width)
self.stdscr.clear()
service_list = list((s_name, cfg.stream_rx is not None, cfg.stream_tx is not None) for s_name, _, cfg in parse_services(self.profile))
service_list = list((s_name, cfg.stream_rx is not None, cfg.stream_tx is not None) for s_name, _, cfg in parse_services(self.profile, None))
if not service_list:
rectangle(self.stdscr, 0, 0, height - 1, width - 1)

149
wfb_ng/cluster.py Normal file
View File

@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2024 Vasily Evseenko <svpcom@p2ptech.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import itertools
from jinja2 import Environment, StrictUndefined
from .services import parse_services, hash_link_domain, bandwidth_map
from .conf import settings
def parse_cluster_services(profiles):
if not settings.cluster.nodes:
raise Exception('Cluster is empty!')
if not settings.cluster.server_address:
raise Exception('Server IP address is not set!')
udp_port_allocator = itertools.count(settings.cluster.base_port_server)
services = list((profile, parse_services(profile, udp_port_allocator)) for profile in profiles)
port_allocators = {}
cluster_nodes = {}
def update_node(node, profile, service_name, link_id, tx_port_base, wlans, srv_cfg):
d = dict(wlans = wlans,
link_id = link_id,
bandwidth = srv_cfg.bandwidth,
stream_tx = srv_cfg.stream_tx,
stream_rx = srv_cfg.stream_rx,
tx_port_base = tx_port_base,
rx_fwd = (settings.cluster.server_address, srv_cfg.udp_port_auto))
if node not in cluster_nodes:
cluster_nodes[node] = {}
cluster_nodes[node]['%s_%s' % (profile, service_name)] = d
def get_allocator(node):
alloc = port_allocators.get(node)
if alloc is not None:
return alloc
alloc = itertools.count(settings.cluster.base_port_node)
port_allocators[node] = alloc
return alloc
for profile, service_list in services:
link_id = hash_link_domain(getattr(settings, profile).link_domain)
for service_name, service_type, srv_cfg in service_list:
auto_peers = []
# Sort cluster nodes for stable result
for node, attrs in sorted(settings.cluster.nodes.items(), key=lambda x: x[0]):
ports = list(next(get_allocator(node)) for wlan in attrs['wlans'])
if not ports:
raise Exception('WiFi interface list is empty for node %s!' % (node,))
auto_peers.append('%s:%s' % (node, ','.join(map(str, ports))))
update_node(node, profile, service_name, link_id, min(ports), attrs['wlans'], srv_cfg)
srv_cfg.udp_peers_auto = auto_peers
return services, cluster_nodes
env = Environment(autoescape=False, undefined=StrictUndefined, trim_blocks=True, lstrip_blocks=True)
env.globals.update({'sorted': sorted, 'repr': repr, 'max': max,
'min': min, 'None': None, 'settings': settings})
script_template = '''\
#!/bin/bash
set -em
export LC_ALL=C
_cleanup()
{
plist=$(jobs -p)
if [ -n "$plist" ]
then
kill -TERM $plist || true
fi
exit 1
}
trap _cleanup EXIT
iw reg set {{ settings.common.wifi_region }}
for wlan in {{ wlans|join(' ') }}
do
if which nmcli > /dev/null && ! nmcli device show $wlan | grep -q '(unmanaged)'
then
nmcli device set $wlan managed no
fi
ip link set $wlan down
iw dev $wlan set monitor otherbss
ip link set $wlan up
iw dev $wlan set channel {{ settings.common.wifi_channel }} {{ ht_mode }}
{% if settings.common.wifi_txpower != None %}
iw dev $wlan set txpower fixed {{ settings.common.wifi_txpower }}
{% endif %}
done
{% for service, attrs in services.items() %}
# {{ service }}
{% if attrs['stream_rx'] != None %}
wfb_rx -f -c {{ attrs['rx_fwd'][0] }} -u {{ attrs['rx_fwd'][1] }} -p {{ attrs['stream_rx'] }} -i {{ attrs['link_id'] }} -R {{ settings.common.tx_rcv_buf_size }} {{ attrs['wlans']|join(' ') }} &
{% endif %}
{% if attrs['stream_tx'] != None %}
wfb_tx -I {{ attrs['tx_port_base'] }} -R {{ settings.common.tx_rcv_buf_size }} {{ attrs['wlans']|join(' ') }} &
{% endif %}
{% endfor %}
# Fail in case of connection loss
(set +x; sleep 1; exec cat > /dev/null) &
wait -n
'''
script_template = env.from_string(script_template)
def gen_cluster_scripts(cluster_nodes):
res = {}
for node, node_attrs in cluster_nodes.items():
wlans = sorted(set().union(*[srv_attrs['wlans'] for srv_attrs in node_attrs.values()]))
max_bw = max(srv_attrs['bandwidth'] for srv_attrs in node_attrs.values())
res[node] = script_template.render(wlans=wlans, ht_mode=bandwidth_map[max_bw], services=node_attrs)
return res

View File

@ -43,6 +43,29 @@ temp_measurement_interval = 10 # [s] (8812eu only) Internal RF path temp measur
temp_overheat_warning = 60 # [*C] (8812eu only) Overheat warning threshold.
# Cluster definition for distributed mode
[cluster]
nodes = {
#'127.0.0.1': { 'wlans': ['wlan1', 'wlan2'] },
#'10.5.1.1' : { 'wlans': ['wlan0', 'wlan1'] },
}
# Cluster init can be auto (--cluster ssh) or manual (--cluster manual)
# In second case you need to generate node setup scripts via (--gen-init)
# and run them on cluster nodes yourself
ssh_user = 'root'
ssh_port = 22
ssh_key = None # Path to ssh private key. If None then it will try to use ssh-agent
server_address = None # Set to IP address which is reachable from all cluster nodes
base_port_server = 10000 # UDP ports allocated on server
base_port_node = 11000 # UDP ports allocated on nodes
## Stream allocation scheme:
# Down streams (vehicle to GS): 0 - 127

577
wfb_ng/protocols.py Normal file
View File

@ -0,0 +1,577 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2024 Vasily Evseenko <svpcom@p2ptech.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import sys
import msgpack
import os
import time
from itertools import groupby
from twisted.python import log, failure
from twisted.internet import reactor, defer, threads, task
from twisted.internet.protocol import ProcessProtocol, Factory
from twisted.protocols.basic import LineReceiver, Int32StringReceiver
from .conf import settings
class BadTelemetry(Exception):
pass
class WFBFlags(object):
LINK_LOST = 1
LINK_JAMMED = 2
fec_types = {1: 'VDM_RS'}
class StatisticsProtocol(Int32StringReceiver):
MAX_LENGTH = 1024 * 1024
def connectionMade(self):
self.sendString(msgpack.packb(dict(type='cli_title',
cli_title=self.factory.cli_title or "",
is_cluster=self.factory.is_cluster)))
self.factory.ui_sessions.append(self)
def stringReceived(self, string):
pass
def connectionLost(self, reason):
self.factory.ui_sessions.remove(self)
def send_stats(self, data):
self.sendString(msgpack.packb(data))
class RFTempMeter(object):
def __init__(self, wlans, measurement_interval):
# RF module temperature by rf_path
self.wlans = tuple(wlans)
self.rf_temperature = {}
self.lc = task.LoopingCall(self.read_temperature)
self.lc.start(measurement_interval, now=True)
def _cleanup(self):
self.lc.stop()
def read_temperature(self):
def _read_temperature():
res = {}
for idx, wlan in enumerate(self.wlans):
fname = '/proc/net/rtl88x2eu/%s/thermal_state' % (wlan,)
try:
with open(fname) as fd:
for line in fd:
line = line.strip()
if not line:
continue
d = {}
for f in line.split(','):
k, v = f.split(':', 1)
d[k.strip()] = int(v.strip())
ant_id = (idx << 8) + d['rf_path']
res[ant_id] = d['temperature']
except FileNotFoundError:
pass
except Exception as v:
reactor.callFromThread(log.err, v, 'Unable to parse %s:' % (fname,))
return res
def _got_temp(temp_d):
self.rf_temperature = temp_d
return threads.deferToThread(_read_temperature).addCallback(_got_temp)
class StatsAndSelectorFactory(Factory):
noisy = False
protocol = StatisticsProtocol
"""
Aggregate RX stats and select TX antenna
"""
def __init__(self, logger, cli_title=None, rf_temp_meter=None, is_cluster=False):
self.is_cluster = is_cluster
self.ant_sel_cb_list = []
self.rssi_cb_l = []
self.tx_sel = None
self.tx_sel_rssi_delta = settings.common.tx_sel_rssi_delta
self.tx_sel_counter_rel_delta = settings.common.tx_sel_counter_rel_delta
self.tx_sel_counter_abs_delta = settings.common.tx_sel_counter_abs_delta
# tcp sockets for UI
self.ui_sessions = []
# machine-readable logger
self.logger = logger
if logger is not None:
self.ui_sessions.append(logger)
self.cli_title = cli_title
self.rf_temp_meter = rf_temp_meter
def _cleanup(self):
pass
def add_ant_sel_cb(self, ant_sel_cb):
self.ant_sel_cb_list.append(ant_sel_cb)
ant_sel_cb(self.tx_sel)
def add_rssi_cb(self, rssi_cb):
self.rssi_cb_l.append(rssi_cb)
def _stats_agg_by_freq(self, ant_stats):
stats_agg = {}
for (((freq, mcs_index, bandwidth), ant_id),
(pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max)) in ant_stats.items():
if ant_id not in stats_agg:
stats_agg[ant_id] = (pkt_s,
rssi_min, rssi_avg * pkt_s, rssi_max,
snr_min, snr_avg * pkt_s, snr_max)
else:
tmp = stats_agg[ant_id]
stats_agg[ant_id] = (pkt_s + tmp[0],
min(rssi_min, tmp[1]),
rssi_avg * pkt_s + tmp[2],
max(rssi_max, tmp[3]),
min(snr_min, tmp[4]),
snr_avg * pkt_s + tmp[5],
max(snr_max, tmp[6]))
return dict((ant_id, (pkt_s,
rssi_min, rssi_avg // pkt_s, rssi_max,
snr_min, snr_avg // pkt_s, snr_max)) \
for ant_id, (pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max) in stats_agg.items())
def select_tx_antenna(self, stats_agg):
wlan_rssi_and_pkts = {}
max_pkts = 0
for wlan_id, grp in groupby(sorted(((ant_id >> 8), pkt_s, rssi_avg) \
for ant_id, (pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max) in stats_agg.items()),
lambda x: x[0]):
grp = list(grp)
# Use max average rssi [dBm] from all wlan's antennas
# Use max packet counter per antenna from all wlan's antennas
rssi = max(rssi for _, pkt_s, rssi in grp)
pkts = max(pkt_s for _, pkt_s, rssi in grp)
max_pkts = max(pkts, max_pkts)
wlan_rssi_and_pkts[wlan_id] = (rssi, pkts)
if not wlan_rssi_and_pkts:
return
# Select antennas with near-maximum RX packet counters only
tx_sel_counter_thr = max_pkts - max(self.tx_sel_counter_abs_delta, max_pkts * self.tx_sel_counter_rel_delta)
wlans_with_max_pkts = set(wlan_id for wlan_id, (rssi, pkt_s) in wlan_rssi_and_pkts.items() if pkt_s >= tx_sel_counter_thr)
if not wlans_with_max_pkts:
return
new_max_rssi, new_tx_wlan = max((rssi, wlan_id) for wlan_id, (rssi, pkt_s) in wlan_rssi_and_pkts.items() if wlan_id in wlans_with_max_pkts)
cur_max_rssi = wlan_rssi_and_pkts.get(self.tx_sel, (-1000, 0))[0]
if new_tx_wlan == self.tx_sel:
return
if self.tx_sel in wlans_with_max_pkts and new_max_rssi - cur_max_rssi < self.tx_sel_rssi_delta:
# Already selected antenna with near-maximum RX packets counter
# and other antennas doesn't have significally large RSSI
return
log.msg('Switch TX wlan %x -> %x, RSSI %d -> %d[dB]' % (self.tx_sel if self.tx_sel is not None else -1,
new_tx_wlan, cur_max_rssi, new_max_rssi))
for ant_sel_cb in self.ant_sel_cb_list:
try:
ant_sel_cb(new_tx_wlan)
except Exception:
log.err()
self.tx_sel = new_tx_wlan
def process_new_session(self, rx_id, session):
if self.logger is not None:
self.logger.send_stats(dict(type='new_session',
timestamp = time.time(),
id=rx_id,
**session))
def update_rx_stats(self, rx_id, packet_stats, ant_stats, session):
stats_agg = self._stats_agg_by_freq(ant_stats)
card_rssi_l = list(rssi_avg
for pkt_s,
rssi_min, rssi_avg, rssi_max,
snr_min, snr_avg, snr_max
in stats_agg.values())
if stats_agg and self.ant_sel_cb_list:
self.select_tx_antenna(stats_agg)
if self.rssi_cb_l:
_idx = 0 if settings.common.mavlink_err_rate else 1
flags = 0
if not card_rssi_l:
flags |= WFBFlags.LINK_LOST
elif packet_stats['dec_err'][0] + packet_stats['bad'][0] > 0:
flags |= WFBFlags.LINK_JAMMED
rx_errors = min(packet_stats['dec_err'][_idx] + packet_stats['bad'][_idx] + packet_stats['lost'][_idx], 65535)
rx_fec = min(packet_stats['fec_rec'][_idx], 65535)
mav_rssi = (max(card_rssi_l) if card_rssi_l else -128) % 256
for rssi_cb in self.rssi_cb_l:
try:
rssi_cb(rx_id, mav_rssi, rx_errors, rx_fec, flags)
except Exception:
log.err()
if settings.common.debug:
log.msg('%s rssi %s TX %x %s %s' % (rx_id, max(card_rssi_l) if card_rssi_l else 'N/A',
self.tx_sel if self.tx_sel is not None else -1, packet_stats, ant_stats))
# Send stats to CLI sessions and logger
for s in self.ui_sessions:
s.send_stats(dict(type='rx',
timestamp = time.time(),
id=rx_id, tx_wlan=self.tx_sel,
packets=packet_stats, rx_ant_stats=ant_stats,
session=session))
def update_tx_stats(self, tx_id, packet_stats, ant_latency):
if settings.common.debug:
log.msg("%s %r %r" % (tx_id, packet_stats, ant_latency))
# Send stats to CLI sessions and logger
for s in self.ui_sessions:
rf_temperature = dict(self.rf_temp_meter.rf_temperature) if self.rf_temp_meter is not None else {}
s.send_stats(dict(type='tx',
timestamp = time.time(),
id=tx_id,
packets=packet_stats,
latency=ant_latency,
rf_temperature=rf_temperature))
class RXAntennaProtocol(LineReceiver):
delimiter = b'\n'
"""
wfb_rx log parser
"""
def __init__(self, ant_stat_cb, rx_id):
self.ant_stat_cb = ant_stat_cb
self.rx_id = rx_id
self.ant = {}
self.count_all = None
self.session = None
def lineReceived(self, line):
line = line.decode('utf-8').strip()
cols = line.split('\t')
try:
if len(cols) < 2:
raise BadTelemetry()
#ts = int(cols[0])
cmd = cols[1]
if cmd == 'RX_ANT':
if len(cols) != 5:
raise BadTelemetry()
self.ant[(tuple(int(i) for i in cols[2].split(':')), int(cols[3], 16))] = tuple(int(i) for i in cols[4].split(':'))
elif cmd == 'PKT':
if len(cols) != 3:
raise BadTelemetry()
p_all, b_all, p_dec_err, p_dec_ok, p_fec_rec, p_lost, p_bad, p_outgoing, b_outgoing = list(int(i) for i in cols[2].split(':'))
if not self.count_all:
self.count_all = (p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing)
else:
self.count_all = tuple((a + b) for a, b in zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing),
self.count_all))
stats = dict(zip(('all', 'all_bytes', 'dec_ok', 'fec_rec', 'lost', 'dec_err', 'bad', 'out', 'out_bytes'),
zip((p_all, b_all, p_dec_ok, p_fec_rec, p_lost, p_dec_err, p_bad, p_outgoing, b_outgoing),
self.count_all)))
# Send stats to aggregators
if self.ant_stat_cb is not None:
self.ant_stat_cb.update_rx_stats(self.rx_id, stats, dict(self.ant), self.session)
self.ant.clear()
elif cmd == 'SESSION':
if len(cols) != 3:
raise BadTelemetry()
epoch, fec_type, fec_k, fec_n = list(int(i) for i in cols[2].split(':'))
self.session = dict(fec_type=fec_types.get(fec_type, 'Unknown'), fec_k=fec_k, fec_n=fec_n, epoch=epoch)
log.msg('New session detected [%s]: FEC=%s K=%d, N=%d, epoch=%d' % (self.rx_id, fec_types.get(fec_type, 'Unknown'), fec_k, fec_n, epoch))
if self.ant_stat_cb is not None:
self.ant_stat_cb.process_new_session(self.rx_id, self.session)
else:
raise BadTelemetry()
except BadTelemetry:
log.msg('Bad telemetry [%s]: %s' % (self.rx_id, line), isError=1)
class DbgProtocol(LineReceiver):
delimiter = b'\n'
"""
stderr parser
"""
def __init__(self, rx_id):
self.rx_id = rx_id
def lineReceived(self, line):
log.msg('%s: %s' % (self.rx_id, line.decode('utf-8')))
class TXAntennaProtocol(LineReceiver):
delimiter = b'\n'
def __init__(self, ant_stat_cb, tx_id, ports_df, control_port_df):
self.ant_stat_cb = ant_stat_cb
self.tx_id = tx_id
self.ports_df = ports_df
self.control_port_df = control_port_df
self.ports = {}
self.control_port = None
self.ant = {}
self.count_all = None
def lineReceived(self, line):
cols = line.decode('utf-8').strip().split('\t')
if len(cols) < 2:
return
#ts = int(cols[0])
cmd = cols[1]
if cmd == 'LISTEN_UDP' and len(cols) == 3:
port, wlan_id = cols[2].split(':', 1)
self.ports[int(wlan_id, 16)] = int(port)
elif cmd == 'LISTEN_UDP_END' and self.ports_df is not None:
self.ports_df.callback(self.ports)
elif cmd == 'LISTEN_UDP_CONTROL' and len(cols) == 3 and self.control_port_df is not None:
port = cols[2]
self.control_port = int(port)
self.control_port_df.callback(self.control_port)
elif cmd == 'TX_ANT':
if len(cols) != 4:
raise BadTelemetry()
self.ant[int(cols[2], 16)] = tuple(int(i) for i in cols[3].split(':'))
elif cmd == 'PKT':
if len(cols) != 3:
raise BadTelemetry()
p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated = list(int(i) for i in cols[2].split(':'))
if not self.count_all:
self.count_all = (p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated)
else:
self.count_all = tuple((a + b) for a, b in zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated),
self.count_all))
stats = dict(zip(('fec_timeouts', 'incoming', 'incoming_bytes', 'injected', 'injected_bytes', 'dropped', 'truncated'),
zip((p_fec_timeouts, p_incoming, b_incoming, p_injected, b_injected, p_dropped, p_truncated),
self.count_all)))
# Send stats to aggregators
if self.ant_stat_cb is not None:
self.ant_stat_cb.update_tx_stats(self.tx_id, stats, dict(self.ant))
self.ant.clear()
class RXProtocol(ProcessProtocol):
"""
manager for wfb_rx process
"""
def __init__(self, ant_stat_cb, cmd, rx_id):
self.cmd = cmd
self.rx_id = rx_id
self.ant = RXAntennaProtocol(ant_stat_cb, rx_id) if ant_stat_cb else None
self.dbg = DbgProtocol(rx_id)
self.df = defer.Deferred()
def connectionMade(self):
log.msg('Started %s' % (self.rx_id,))
def outReceived(self, data):
if self.ant is not None:
self.ant.dataReceived(data)
def errReceived(self, data):
self.dbg.dataReceived(data)
def processEnded(self, status):
rc = status.value.exitCode
log.msg('Stopped RX %s with code %s' % (self.rx_id, rc))
if rc == 0:
self.df.callback(str(status.value))
else:
self.df.errback(status)
def start(self):
df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ, childFDs={0: "w", 1: "r", 2: "r"})
return df.addCallback(lambda _: self.df)
class TXProtocol(ProcessProtocol):
"""
manager for wfb_tx process
"""
def __init__(self, ant_stat_cb, cmd, tx_id, ports_df=None, control_port_df=None):
self.cmd = cmd
self.tx_id = tx_id
self.dbg = DbgProtocol(tx_id)
self.ports_df = ports_df
self.control_port_df = control_port_df
self.port_parser = TXAntennaProtocol(ant_stat_cb, tx_id, ports_df, control_port_df)
self.df = defer.Deferred()
def connectionMade(self):
log.msg('Started %s' % (self.tx_id,))
def outReceived(self, data):
self.port_parser.dataReceived(data)
def errReceived(self, data):
self.dbg.dataReceived(data)
def processEnded(self, status):
rc = status.value.exitCode
log.msg('Stopped TX %s with code %s' % (self.tx_id, rc))
if self.ports_df is not None:
self.ports_df.cancel()
if self.control_port_df is not None:
self.control_port_df.cancel()
if rc == 0:
self.df.callback(str(status.value))
else:
self.df.errback(status)
def start(self):
df = defer.maybeDeferred(reactor.spawnProcess, self, self.cmd[0], self.cmd, env=os.environ,
childFDs={0: "w", 1: "r", 2: "r"})
return df.addCallback(lambda _: self.df)
class SSHClientProtocol(ProcessProtocol):
"""
manager for wfb_tx process
"""
def __init__(self, host, username, cmd, *cmd_args, stdin=None, key=None, port=22, use_agent=True):
self.host = host
self.username = username
self.cmd = cmd
self.cmd_args = cmd_args
self.stdin = stdin
self.key = key
self.port = 22
self.use_agent = use_agent
self.dbg = DbgProtocol('ssh %s' % (host,))
self.df = defer.Deferred()
def connectionMade(self):
log.msg('Started ssh %s' % (self.host,))
if self.stdin is not None:
self.transport.write(self.stdin.encode('utf-8'))
def outReceived(self, data):
self.dbg.dataReceived(data)
def errReceived(self, data):
self.dbg.dataReceived(data)
def processEnded(self, status):
rc = status.value.exitCode
log.msg('Stopped ssh %s with code %s' % (self.host, rc))
if rc == 0:
self.df.callback(str(status.value))
else:
self.df.errback(status)
def start(self):
args = ['ssh',
'-o', 'StrictHostKeyChecking=no',
'-o', 'KbdInteractiveAuthentication=no',
'-o', 'PasswordAuthentication=no']
if self.stdin is None:
args += ['-n']
if self.key:
args += ['-i', self.key,
'-o', 'IdentitiesOnly=yes']
args += ['%s@%s' % (self.username, self.host), self.cmd] + list(self.cmd_args)
env = dict(os.environ)
if not self.use_agent:
env.pop('SSH_AUTH_SOCK', None)
df = defer.maybeDeferred(reactor.spawnProcess, self,
args[0], args, env=env,
childFDs={0: "w", 1: "r", 2: "r"})
return df.addCallback(lambda _: self.df)

File diff suppressed because it is too large Load Diff

512
wfb_ng/services.py Normal file
View File

@ -0,0 +1,512 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2024 Vasily Evseenko <svpcom@p2ptech.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import sys
import os
import re
import hashlib
from twisted.python import log, failure
from twisted.internet import reactor, defer, main as ti_main, threads
from twisted.internet.serialport import SerialPort
from .protocols import RXProtocol, TXProtocol
from .proxy import UDPProxyProtocol, MavlinkSerialProxyProtocol, MavlinkUDPProxyProtocol
from .mavlink_protocol import MavlinkARMProtocol, MavlinkTCPFactory, MavlinkLoggerProtocol
from .tuntap import TUNTAPProtocol, TUNTAPTransport
from .config_parser import Section
from .conf import settings
connect_re = re.compile(r'^connect://(?P<addr>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P<port>[0-9]+)$', re.IGNORECASE)
listen_re = re.compile(r'^listen://(?P<addr>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P<port>[0-9]+)$', re.IGNORECASE)
serial_re = re.compile(r'^serial:(?P<dev>[a-z0-9\-\_/]+):(?P<baud>[0-9]+)$', re.IGNORECASE)
bandwidth_map = {
5: '5MHz',
10: '10MHz',
20: 'HT20',
40: 'HT40+',
80: '80MHz',
160: '160MHz',
}
def hash_link_domain(link_domain):
return int.from_bytes(hashlib.sha1(link_domain.encode('utf-8')).digest()[:3], 'big')
def parse_services(profile_name, udp_port_allocator):
res = []
for stream in getattr(settings, profile_name).streams:
cfg = Section()
stream = dict(stream)
name = stream.pop('name')
service_type = stream.pop('service_type')
for profile in stream.pop('profiles'):
cfg.__dict__.update(getattr(settings, profile).__dict__)
cfg.__dict__.update(stream)
# Allocate udp port for cluster aggregator and services
if udp_port_allocator is not None:
cfg.udp_port_auto = next(udp_port_allocator)
res.append((name, service_type, cfg))
return res
@defer.inlineCallbacks
def init_udp_direct_tx(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster):
# Direct udp doesn't support TX diversity - only first card will be used.
# But if mirror mode is enabled it will use all cards.
if not cfg.mirror and (len(wlans) > 1 or ',' in wlans[0]):
raise Exception("udp_direct_tx doesn't supports diversity but multiple cards selected. Use udp_proxy for such case.")
if not listen_re.match(cfg.peer):
raise Exception('%s: unsupported peer address: %s' % (service_name, cfg.peer))
m = listen_re.match(cfg.peer)
listen = m.group('addr'), int(m.group('port'))
log.msg('Listen for %s stream %d on %s:%d' % (service_name, cfg.stream_tx, listen[0], listen[1]))
cmd = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s '\
'-B %(bw)d -G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\
'%(mirror)s%(force_vht)s%(qdisc)s '\
'-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'),
cluster=' -d' if is_cluster else '',
frame_type=cfg.frame_type,
stream=cfg.stream_tx,
port=listen[1],
control_port = cfg.control_port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
bw=cfg.bandwidth,
force_vht=' -V' if cfg.force_vht else '',
qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '',
gi="short" if cfg.short_gi else "long",
stbc=cfg.stbc,
ldpc=cfg.ldpc,
mcs=cfg.mcs_index,
mirror=' -m' if cfg.mirror else '',
fec_k=cfg.fec_k,
fec_n=cfg.fec_n,
fec_timeout=cfg.fec_timeout,
fec_delay=cfg.fec_delay,
link_id=link_id,
rcv_buf_size=settings.common.tx_rcv_buf_size)
).split() + wlans
control_port_df = defer.Deferred() if cfg.control_port == 0 else None
df = TXProtocol(ant_sel_f, cmd, 'video tx', control_port_df=control_port_df).start()
log.msg('%s: %s' % (service_name, ' '.join(cmd),))
control_port = cfg.control_port
if control_port == 0:
control_port = yield control_port_df
log.msg('%s use wfb_tx control_port %d' % (service_name, control_port))
yield df
def init_udp_direct_rx(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster):
if not connect_re.match(cfg.peer):
raise Exception('%s: unsupported peer address: %s' % (service_name, cfg.peer))
m = connect_re.match(cfg.peer)
connect = m.group('addr'), int(m.group('port'))
log.msg('Send %s stream %d to %s:%d' % (service_name, cfg.stream_rx, connect[0], connect[1]))
cmd = ('%(cmd)s%(cluster)s -p %(stream)d -c %(ip_addr)s -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'),
cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '',
stream=cfg.stream_rx,
ip_addr=connect[0],
port=connect[1],
key=os.path.join(settings.path.conf_dir, cfg.keypair),
rcv_buf_size=settings.common.tx_rcv_buf_size,
link_id=link_id)).split() + (wlans if not is_cluster else [])
df = RXProtocol(ant_sel_f, cmd, '%s rx' % (service_name,)).start()
log.msg('%s: %s' % (service_name, ' '.join(cmd),))
return df
@defer.inlineCallbacks
def init_mavlink(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster):
listen = None
connect = None
serial = None
osd_peer = None
if connect_re.match(cfg.peer):
m = connect_re.match(cfg.peer)
connect = m.group('addr'), int(m.group('port'))
log.msg('Connect %s stream %d(RX), %d(TX) to %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, connect[0], connect[1]))
elif listen_re.match(cfg.peer):
m = listen_re.match(cfg.peer)
listen = m.group('addr'), int(m.group('port'))
log.msg('Listen for %s stream %d(RX), %d(TX) on %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, listen[0], listen[1]))
elif serial_re.match(cfg.peer):
m = serial_re.match(cfg.peer)
serial = m.group('dev'), int(m.group('baud'))
log.msg('Open serial port %s on speed %d' % (serial[0], serial[1]))
else:
raise Exception('Unsupported peer address: %s' % (cfg.peer,))
if cfg.osd is not None and connect_re.match(cfg.osd):
m = connect_re.match(cfg.osd)
osd_peer = m.group('addr'), int(m.group('port'))
log.msg('Mirror %s stream to OSD at %s:%d' % (service_name, osd_peer[0], osd_peer[1]))
rx_hooks = []
tx_hooks = []
if cfg.call_on_arm or cfg.call_on_disarm:
arm_proto = MavlinkARMProtocol(cfg.call_on_arm, cfg.call_on_disarm)
rx_hooks.append(arm_proto.dataReceived)
tx_hooks.append(arm_proto.dataReceived)
if cfg.log_messages and ant_sel_f.logger is not None:
mav_log_proto = MavlinkLoggerProtocol(ant_sel_f.logger)
rx_hooks.append(mav_log_proto.dataReceived)
tx_hooks.append(mav_log_proto.dataReceived)
if serial:
p_in = MavlinkSerialProxyProtocol(agg_max_size=settings.common.radio_mtu,
agg_timeout=settings.common.mavlink_agg_timeout,
inject_rssi=cfg.inject_rssi,
mavlink_sys_id=cfg.mavlink_sys_id,
mavlink_comp_id=cfg.mavlink_comp_id,
rx_hooks=rx_hooks, tx_hooks=tx_hooks)
else:
# The first argument is not None only if we initiate mavlink connection
p_in = MavlinkUDPProxyProtocol(connect, agg_max_size=settings.common.radio_mtu,
agg_timeout=settings.common.mavlink_agg_timeout,
inject_rssi=cfg.inject_rssi,
mirror=osd_peer,
mavlink_sys_id=cfg.mavlink_sys_id,
mavlink_comp_id=cfg.mavlink_comp_id,
rx_hooks=rx_hooks, tx_hooks=tx_hooks)
p_rx = UDPProxyProtocol()
p_rx.peer = p_in
rx_socket = reactor.listenUDP(0, p_rx)
sockets = [rx_socket]
cmd_rx = ('%(cmd)s%(cluster)s -p %(stream)d -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'),
cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '',
stream=cfg.stream_rx,
port=rx_socket.getHost().port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
rcv_buf_size=settings.common.tx_rcv_buf_size,
link_id=link_id)).split() + (wlans if not is_cluster else [])
cmd_tx = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\
'-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\
'%(mirror)s%(force_vht)s%(qdisc)s '\
'-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'),
cluster=' -d' if is_cluster else '',
frame_type=cfg.frame_type,
stream=cfg.stream_tx,
port=0,
control_port=cfg.control_port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
bw=cfg.bandwidth,
force_vht=' -V' if cfg.force_vht else '',
qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '',
gi="short" if cfg.short_gi else "long",
stbc=cfg.stbc,
ldpc=cfg.ldpc,
mcs=cfg.mcs_index,
mirror=' -m' if cfg.mirror else '',
fec_k=cfg.fec_k,
fec_n=cfg.fec_n,
fec_timeout=cfg.fec_timeout,
fec_delay=cfg.fec_delay,
link_id=link_id,
rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans
log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx)))
log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx)))
# Setup mavlink TCP proxy
if cfg.mavlink_tcp_port:
mav_tcp_f = MavlinkTCPFactory(p_in)
p_in.rx_hooks.append(mav_tcp_f.write)
reactor.listenTCP(cfg.mavlink_tcp_port, mav_tcp_f)
tx_ports_df = defer.Deferred()
control_port_df = defer.Deferred() if cfg.control_port == 0 else None
dl = [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()]
# Wait while wfb_tx allocates ephemeral udp ports and reports them back
tx_ports = yield tx_ports_df
control_port = cfg.control_port
if control_port == 0:
control_port = yield control_port_df
log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port))
p_tx_map = dict((wlan_id, UDPProxyProtocol(('127.0.0.1', port))) for wlan_id, port in tx_ports.items())
if serial:
serial_port = SerialPort(p_in, os.path.join('/dev', serial[0]), reactor, baudrate=serial[1])
serial_port._serial.exclusive = True
else:
serial_port = None
sockets += [ reactor.listenUDP(listen[1] if listen else 0, p_in) ]
sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_map.values() ]
def ant_sel_cb(wlan_id):
p_in.peer = p_tx_map[wlan_id] \
if wlan_id is not None \
else list(p_tx_map.values())[0]
ant_sel_f.add_ant_sel_cb(ant_sel_cb)
# Report RSSI to OSD
ant_sel_f.add_rssi_cb(p_in.send_rssi)
dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start())
def _cleanup(x):
if serial_port is not None:
serial_port.loseConnection()
serial_port.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))
for s in sockets:
s.stopListening()
return x
yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\
.addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure)
@defer.inlineCallbacks
def init_tunnel(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster):
p_in = TUNTAPProtocol(mtu=settings.common.radio_mtu,
agg_timeout=settings.common.tunnel_agg_timeout)
p_rx = UDPProxyProtocol()
p_rx.peer = p_in
rx_socket = reactor.listenUDP(0, p_rx)
sockets = [rx_socket]
cmd_rx = ('%(cmd)s%(cluster)s -p %(stream)d -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'),
cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '',
stream=cfg.stream_rx,
port=rx_socket.getHost().port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
rcv_buf_size=settings.common.tx_rcv_buf_size,
link_id=link_id)).split() + (wlans if not is_cluster else [])
cmd_tx = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d -G %(gi)s '\
'-S %(stbc)d -L %(ldpc)d -M %(mcs)d'\
'%(mirror)s%(force_vht)s%(qdisc)s '\
'-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'),
cluster=' -d' if is_cluster else '',
frame_type=cfg.frame_type,
stream=cfg.stream_tx,
port=0,
control_port=cfg.control_port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
bw=cfg.bandwidth,
force_vht=' -V' if cfg.force_vht else '',
qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '',
gi="short" if cfg.short_gi else "long",
stbc=cfg.stbc,
ldpc=cfg.ldpc,
mcs=cfg.mcs_index,
mirror=' -m' if cfg.mirror else '',
fec_k=cfg.fec_k,
fec_n=cfg.fec_n,
fec_timeout=cfg.fec_timeout,
fec_delay=cfg.fec_delay,
link_id=link_id,
rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans
log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx)))
log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx),))
tx_ports_df = defer.Deferred()
control_port_df = defer.Deferred() if cfg.control_port == 0 else None
dl = [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()]
# Wait while wfb_tx allocates ephemeral udp ports and reports them back
tx_ports = yield tx_ports_df
control_port = cfg.control_port
if control_port == 0:
control_port = yield control_port_df
log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port))
p_tx_map = dict((wlan_id, UDPProxyProtocol(('127.0.0.1', port))) for wlan_id, port in tx_ports.items())
tun_ep = TUNTAPTransport(reactor, p_in, cfg.ifname, cfg.ifaddr, mtu=settings.common.radio_mtu, default_route=cfg.default_route)
sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_map.values() ]
def ant_sel_cb(wlan_id):
p_in.peer = p_tx_map[wlan_id] \
if wlan_id is not None \
else list(p_tx_map.values())[0]
# Broadcast keepalive message to all cards, not to active one
# This allow to use direct antennas on both ends and/or differenct frequencies.
# But when mirroring enabled it will be done by wfb_tx itself
if cfg.mirror:
p_in.all_peers = list(p_tx_map.values())[0:1]
else:
p_in.all_peers = list(p_tx_map.values())
ant_sel_f.add_ant_sel_cb(ant_sel_cb)
dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start())
def _cleanup(x):
tun_ep.loseConnection()
for s in sockets:
s.stopListening()
return x
yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\
.addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure)
@defer.inlineCallbacks
def init_udp_proxy(service_name, cfg, wlans, link_id, ant_sel_f, is_cluster):
listen = None
connect = None
if connect_re.match(cfg.peer):
m = connect_re.match(cfg.peer)
connect = m.group('addr'), int(m.group('port'))
log.msg('Connect %s stream %s(RX), %s(TX) to %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, connect[0], connect[1]))
elif listen_re.match(cfg.peer):
m = listen_re.match(cfg.peer)
listen = m.group('addr'), int(m.group('port'))
log.msg('Listen for %s stream %s(RX), %s(TX) on %s:%d' % (service_name, cfg.stream_rx, cfg.stream_tx, listen[0], listen[1]))
else:
raise Exception('Unsupported peer address: %s' % (cfg.peer,))
# The first argument is not None only if we initiate mavlink connection
p_in = UDPProxyProtocol(connect)
sockets = [reactor.listenUDP(listen[1] if listen else 0, p_in)]
dl = []
if cfg.stream_rx is not None:
p_rx = UDPProxyProtocol()
p_rx.peer = p_in
rx_socket = reactor.listenUDP(0, p_rx)
sockets = [rx_socket]
cmd_rx = ('%(cmd)s%(cluster)s -p %(stream)d -u %(port)d -K %(key)s -R %(rcv_buf_size)d -i %(link_id)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_rx'),
cluster=' -a %d' % (cfg.udp_port_auto,) if is_cluster else '',
stream=cfg.stream_rx,
port=rx_socket.getHost().port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
rcv_buf_size=settings.common.tx_rcv_buf_size,
link_id=link_id)).split() + (wlans if not is_cluster else [])
log.msg('%s RX: %s' % (service_name, ' '.join(cmd_rx)))
dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start())
if cfg.stream_tx is not None:
cmd_tx = ('%(cmd)s%(cluster)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\
'-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d'\
'%(mirror)s%(force_vht)s%(qdisc)s '\
'-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -F %(fec_delay)d -i %(link_id)d -R %(rcv_buf_size)d -C %(control_port)d' % \
dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'),
cluster=' -d' if is_cluster else '',
frame_type=cfg.frame_type,
stream=cfg.stream_tx,
port=0,
control_port=cfg.control_port,
key=os.path.join(settings.path.conf_dir, cfg.keypair),
bw=cfg.bandwidth,
force_vht=' -V' if cfg.force_vht else '',
qdisc=' -Q -P %d' % (cfg.fwmark,) if cfg.use_qdisc else '',
gi="short" if cfg.short_gi else "long",
stbc=cfg.stbc,
ldpc=cfg.ldpc,
mcs=cfg.mcs_index,
mirror=' -m' if cfg.mirror else '',
fec_k=cfg.fec_k,
fec_n=cfg.fec_n,
fec_timeout=cfg.fec_timeout,
fec_delay=cfg.fec_delay,
link_id=link_id,
rcv_buf_size=settings.common.tx_rcv_buf_size)).split() + wlans
log.msg('%s TX: %s' % (service_name, ' '.join(cmd_tx)))
tx_ports_df = defer.Deferred()
control_port_df = defer.Deferred() if cfg.control_port == 0 else None
dl += [TXProtocol(ant_sel_f, cmd_tx, '%s tx' % (service_name,), tx_ports_df, control_port_df).start()]
# Wait while wfb_tx allocates ephemeral udp ports and reports them back
tx_ports = yield tx_ports_df
control_port = cfg.control_port
if control_port == 0:
control_port = yield control_port_df
log.msg('%s use wfb_tx ports %s, control_port %d' % (service_name, tx_ports, control_port))
p_tx_map = dict((wlan_id, UDPProxyProtocol(('127.0.0.1', port))) for wlan_id, port in tx_ports.items())
sockets += [ reactor.listenUDP(0, p_tx) for p_tx in p_tx_map.values() ]
def ant_sel_cb(wlan_id):
p_in.peer = p_tx_map[wlan_id] \
if wlan_id is not None \
else list(p_tx_map.values())[0]
ant_sel_f.add_ant_sel_cb(ant_sel_cb)
def _cleanup(x):
for s in sockets:
s.stopListening()
return x
yield defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup)\
.addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure)

View File

@ -12,7 +12,9 @@ from twisted.internet import reactor, defer
from twisted.internet.protocol import DatagramProtocol
from ..common import df_sleep
from ..server import RXProtocol, TXProtocol, call_and_check_rc
from ..protocols import RXProtocol, TXProtocol
from .. import call_and_check_rc
class UDP_TXRX(DatagramProtocol):
def __init__(self, tx_addr):