diff --git a/src/tx.cpp b/src/tx.cpp index 89b246f..09dbd2f 100644 --- a/src/tx.cpp +++ b/src/tx.cpp @@ -62,7 +62,7 @@ Transmitter::Transmitter(int k, int n, const string &keypair, uint64_t epoch, ui } FILE *fp; - if((fp = fopen(keypair.c_str(), "r")) == NULL) + if ((fp = fopen(keypair.c_str(), "r")) == NULL) { throw runtime_error(string_format("Unable to open %s: %s", keypair.c_str(), strerror(errno))); } @@ -179,9 +179,25 @@ void PcapTransmitter::inject_packet(const uint8_t *buf, size_t size) memcpy(p, buf, size); p += size; - if (pcap_inject(ppcap[current_output], txbuf, p - txbuf) != p - txbuf) + if (current_output >= 0) { - throw runtime_error(string_format("Unable to inject packet")); + // Normal mode + if (pcap_inject(ppcap[current_output], txbuf, p - txbuf) != p - txbuf) + { + throw runtime_error(string_format("Unable to inject packet")); + } + } + else + { + // Mirror mode - transmit packet via all cards + // Use only for different frequency channels + for(auto it=ppcap.begin(); it != ppcap.end(); it++) + { + if (pcap_inject(*it, txbuf, p - txbuf) != p - txbuf) + { + throw runtime_error(string_format("Unable to inject packet")); + } + } } } @@ -228,7 +244,7 @@ void Transmitter::send_packet(const uint8_t *buf, size_t size, uint8_t flags) assert(size <= MAX_PAYLOAD_SIZE); // FEC-only packets are only for closing already opened blocks - if(fragment_idx == 0 && flags & WFB_PACKET_FEC_ONLY) + if (fragment_idx == 0 && flags & WFB_PACKET_FEC_ONLY) { return; } @@ -279,7 +295,7 @@ uint32_t extract_rxq_overflow(struct msghdr *msg) return 0; } -void data_source(shared_ptr &t, vector &rx_fd, int poll_timeout) +void data_source(shared_ptr &t, vector &rx_fd, int poll_timeout, bool mirror) { int nfds = rx_fd.size(); struct pollfd fds[nfds]; @@ -289,7 +305,7 @@ void data_source(shared_ptr &t, vector &rx_fd, int poll_timeou for(auto it=rx_fd.begin(); it != rx_fd.end(); it++, i++) { int fd = *it; - if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0) + if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0) { throw runtime_error(string_format("Unable to set socket into nonblocked mode: %s", strerror(errno))); } @@ -332,7 +348,7 @@ void data_source(shared_ptr &t, vector &rx_fd, int poll_timeou int fd = rx_fd[i]; - t->select_output(i); + t->select_output(mirror ? -1 : i); for(;;) { @@ -376,7 +392,7 @@ void data_source(shared_ptr &t, vector &rx_fd, int poll_timeou } t->send_packet(buf, rsize, 0); } - if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); + if (errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); } } } @@ -399,10 +415,10 @@ int main(int argc, char * const *argv) int debug_port = 0; int poll_timeout = 0; int rcv_buf = 0; - + bool mirror = false; string keypair = "tx.key"; - while ((opt = getopt(argc, argv, "K:k:n:u:p:B:G:S:L:M:D:T:i:e:R:f:")) != -1) { + while ((opt = getopt(argc, argv, "K:k:n:u:p:B:G:S:L:M:D:T:i:e:R:f:m")) != -1) { switch (opt) { case 'K': keypair = optarg; @@ -449,6 +465,9 @@ int main(int argc, char * const *argv) case 'e': epoch = atoll(optarg); break; + case 'm': + mirror = true; + break; case 'f': if (strcmp(optarg, "data") == 0) { @@ -468,9 +487,9 @@ int main(int argc, char * const *argv) break; default: /* '?' */ show_usage: - fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-T poll_timeout] [-e epoch] [-i link_id] [-f { data | rts }] interface1 [interface2] ...\n", + fprintf(stderr, "Usage: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-T poll_timeout] [-e epoch] [-i link_id] [-f { data | rts }] [ -m ] interface1 [interface2] ...\n", argv[0]); - fprintf(stderr, "Default: K='%s', k=%d, n=%d, udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d, poll_timeout=%d, rcv_buf=system_default, frame_type=data\n", + fprintf(stderr, "Default: K='%s', k=%d, n=%d, udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d, poll_timeout=%d, rcv_buf=system_default, frame_type=data, mirror=false\n", keypair.c_str(), k, n, udp_port, link_id, radio_port, epoch, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, poll_timeout); fprintf(stderr, "Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE); fprintf(stderr, "WFB-ng version " WFB_VERSION "\n"); @@ -498,7 +517,7 @@ int main(int argc, char * const *argv) exit(1); } - if(short_gi) + if (short_gi) { flags |= IEEE80211_RADIOTAP_MCS_SGI; } @@ -520,7 +539,7 @@ int main(int argc, char * const *argv) exit(1); } - if(ldpc) + if (ldpc) { flags |= IEEE80211_RADIOTAP_MCS_FEC_LDPC; } @@ -564,7 +583,7 @@ int main(int argc, char * const *argv) struct sockaddr_in saddr; socklen_t saddr_size = sizeof(saddr); - if(getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0) + if (getsockname(fd, (struct sockaddr *)&saddr, &saddr_size) != 0) { throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno))); } @@ -576,7 +595,7 @@ int main(int argc, char * const *argv) wlans.push_back(string(argv[optind + i])); } - if(udp_port == 0) + if (udp_port == 0) { printf("LISTEN_UDP_END\n"); fflush(stdout); @@ -586,7 +605,7 @@ int main(int argc, char * const *argv) uint32_t channel_id = (link_id << 8) + radio_port; - if(debug_port) + if (debug_port) { fprintf(stderr, "Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port); t = shared_ptr(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port, epoch, channel_id)); @@ -594,7 +613,7 @@ int main(int argc, char * const *argv) t = shared_ptr(new PcapTransmitter(k, n, keypair, epoch, channel_id, wlans)); } - data_source(t, rx_fd, poll_timeout); + data_source(t, rx_fd, poll_timeout, mirror); }catch(runtime_error &e) { fprintf(stderr, "Error: %s\n", e.what()); diff --git a/src/tx.hpp b/src/tx.hpp index b12571c..7ced3ed 100644 --- a/src/tx.hpp +++ b/src/tx.hpp @@ -100,6 +100,7 @@ public: virtual void select_output(int idx) { + assert(idx >= 0); saddr.sin_port = htons((unsigned short)(base_port + idx)); } diff --git a/wfb_ng/conf/master.cfg b/wfb_ng/conf/master.cfg index 8e9f8e4..208752a 100644 --- a/wfb_ng/conf/master.cfg +++ b/wfb_ng/conf/master.cfg @@ -113,6 +113,7 @@ stream_rx = None stream_tx = None keypair = None show_stats = True +mirror = False # Set to true if you want to mirror packet via all cards for redundancy. Not recommended if cards are on one frequency channel. # Radio settings for TX and RX bandwidth = 20 # bandwidth 20 or 40 MHz diff --git a/wfb_ng/server.py b/wfb_ng/server.py index 0c27a83..a8bf3b2 100644 --- a/wfb_ng/server.py +++ b/wfb_ng/server.py @@ -444,6 +444,7 @@ def init_udp_direct_tx(service_name, cfg, wlans, link_id, ant_sel_f): cmd = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s '\ '-B %(bw)d -G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d '\ + '%(mirror)s'\ '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -i %(link_id)d -R %(rcv_buf_size)d' % \ dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), frame_type=cfg.frame_type, @@ -455,6 +456,7 @@ def init_udp_direct_tx(service_name, cfg, wlans, link_id, ant_sel_f): stbc=cfg.stbc, ldpc=cfg.ldpc, mcs=cfg.mcs_index, + mirror='-m ' if cfg.mirror else '', fec_k=cfg.fec_k, fec_n=cfg.fec_n, fec_timeout=cfg.fec_timeout, @@ -560,6 +562,7 @@ def init_mavlink(service_name, cfg, wlans, link_id, ant_sel_f): cmd_tx = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\ '-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d '\ + '%(mirror)s'\ '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -i %(link_id)d -R %(rcv_buf_size)d' % \ dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), frame_type=cfg.frame_type, @@ -571,6 +574,7 @@ def init_mavlink(service_name, cfg, wlans, link_id, ant_sel_f): stbc=cfg.stbc, ldpc=cfg.ldpc, mcs=cfg.mcs_index, + mirror='-m ' if cfg.mirror else '', fec_k=cfg.fec_k, fec_n=cfg.fec_n, fec_timeout=cfg.fec_timeout, @@ -649,6 +653,7 @@ def init_tunnel(service_name, cfg, wlans, link_id, ant_sel_f): cmd_tx = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d -G %(gi)s '\ '-S %(stbc)d -L %(ldpc)d -M %(mcs)d '\ + '%(mirror)s'\ '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -i %(link_id)d -R %(rcv_buf_size)d' % \ dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), frame_type=cfg.frame_type, @@ -660,6 +665,7 @@ def init_tunnel(service_name, cfg, wlans, link_id, ant_sel_f): stbc=cfg.stbc, ldpc=cfg.ldpc, mcs=cfg.mcs_index, + mirror='-m ' if cfg.mirror else '', fec_k=cfg.fec_k, fec_n=cfg.fec_n, fec_timeout=cfg.fec_timeout, @@ -687,8 +693,13 @@ def init_tunnel(service_name, cfg, wlans, link_id, ant_sel_f): # Broadcast keepalive message to all cards, not to active one # This allow to use direct antennas on both ends and/or differenct frequencies. + # But when mirroring enabled it will be done by wfb_tx itself + + if cfg.mirror: + p_in.all_peers = [p_tx_l[0]] + else: + p_in.all_peers = p_tx_l - p_in.all_peers = p_tx_l ant_sel_f.add_ant_sel_cb(ant_sel_cb) dl.append(RXProtocol(ant_sel_f, cmd_rx, '%s rx' % (service_name,)).start()) @@ -743,6 +754,7 @@ def init_udp_proxy(service_name, cfg, wlans, link_id, ant_sel_f): if cfg.stream_tx is not None: cmd_tx = ('%(cmd)s -f %(frame_type)s -p %(stream)d -u %(port)d -K %(key)s -B %(bw)d '\ '-G %(gi)s -S %(stbc)d -L %(ldpc)d -M %(mcs)d '\ + '%(mirror)s'\ '-k %(fec_k)d -n %(fec_n)d -T %(fec_timeout)d -i %(link_id)d -R %(rcv_buf_size)d' % \ dict(cmd=os.path.join(settings.path.bin_dir, 'wfb_tx'), frame_type=cfg.frame_type, @@ -754,6 +766,7 @@ def init_udp_proxy(service_name, cfg, wlans, link_id, ant_sel_f): stbc=cfg.stbc, ldpc=cfg.ldpc, mcs=cfg.mcs_index, + mirror='-m ' if cfg.mirror else '', fec_k=cfg.fec_k, fec_n=cfg.fec_n, fec_timeout=cfg.fec_timeout,