From 9d6ebbfb19a98a248854ffe545852dfec17c2c49 Mon Sep 17 00:00:00 2001 From: Vasily Evseenko Date: Thu, 28 Dec 2017 12:06:28 +0300 Subject: [PATCH] 1. Add link quality info to RX packets 2. Add telemetry logs to RX 3. Use monotonic clock for timeouts --- Makefile | 2 +- rx.cpp | 433 ++++++++++++++++++++++++++++++++-------------- rx.hpp | 49 +++++- tx.cpp | 21 +-- tx.hpp | 19 +- wifibroadcast.cpp | 8 + wifibroadcast.hpp | 6 + 7 files changed, 386 insertions(+), 152 deletions(-) diff --git a/Makefile b/Makefile index 6208a86..c7fd331 100644 --- a/Makefile +++ b/Makefile @@ -26,5 +26,5 @@ build_rpi: clean tar czf dist/wifibroadcast_rpi.tar.gz tx rx keygen -C scripts tx_standalone.sh rx_standalone.sh clean: - rm -f rx tx keygen dist *~ *.o + rm -rf rx tx keygen dist *~ *.o diff --git a/rx.cpp b/rx.cpp index 60490cb..54d1f1c 100644 --- a/rx.cpp +++ b/rx.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -44,7 +45,7 @@ extern "C" #include "rx.hpp" -Receiver::Receiver(const char *wlan, int radio_port, BaseAggregator *agg) : agg(agg) +Receiver::Receiver(const char *wlan, int wlan_idx, int radio_port, BaseAggregator *agg) : wlan_idx(wlan_idx), agg(agg) { char errbuf[PCAP_ERRBUF_SIZE]; @@ -104,92 +105,98 @@ Receiver::~Receiver() void Receiver::loop_iter(void) { - struct pcap_pkthdr hdr; - const uint8_t* pkt = pcap_next(ppcap, &hdr); + for(;;) // loop while incoming queue is not empty + { + struct pcap_pkthdr hdr; + const uint8_t* pkt = pcap_next(ppcap, &hdr); - if (pkt == NULL) { - return; - } - - int pktlen = hdr.caplen; - int pkt_rate = 0, antenna = 0, pwr = 0; - uint8_t flags = 0; - struct ieee80211_radiotap_iterator iterator; - int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL); - - while (ret == 0) { - ret = ieee80211_radiotap_iterator_next(&iterator); - - if (ret) - continue; - - /* see if this argument is something we can use */ - - switch (iterator.this_arg_index) - { - /* - * You must take care when dereferencing iterator.this_arg - * for multibyte types... the pointer is not aligned. Use - * get_unaligned((type *)iterator.this_arg) to dereference - * iterator.this_arg for type "type" safely on all arches. - */ - case IEEE80211_RADIOTAP_RATE: - /* radiotap "rate" u8 is in - * 500kbps units, eg, 0x02=1Mbps - */ - pkt_rate = (*(uint8_t*)(iterator.this_arg))/2; - break; - - case IEEE80211_RADIOTAP_ANTENNA: - antenna = *(uint8_t*)(iterator.this_arg); - break; - - case IEEE80211_RADIOTAP_DBM_ANTSIGNAL: - pwr = *(int8_t*)(iterator.this_arg); - break; - - case IEEE80211_RADIOTAP_FLAGS: - flags = *(uint8_t*)(iterator.this_arg); - break; - - default: + if (pkt == NULL) { break; } - } /* while more rt headers */ - if (ret != -ENOENT){ - fprintf(stderr, "Error parsing radiotap header!\n"); - return; - } + int pktlen = hdr.caplen; + // int pkt_rate = 0 + uint8_t antenna = 0; + uint8_t rssi = 0; + uint8_t flags = 0; + struct ieee80211_radiotap_iterator iterator; + int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL); - if (flags & IEEE80211_RADIOTAP_F_FCS) - { - pktlen -= 4; - } + while (ret == 0) { + ret = ieee80211_radiotap_iterator_next(&iterator); - if (flags & IEEE80211_RADIOTAP_F_BADFCS) - { - fprintf(stderr, "Got packet with bad fsc\n"); - return; - } + if (ret) + continue; - /* discard the radiotap header part */ - pkt += iterator._max_length; - pktlen -= iterator._max_length; + /* see if this argument is something we can use */ - //printf("%d mbit/s ant %d %ddBm size:%d\n", pkt_rate, antenna, pwr, pktlen); + switch (iterator.this_arg_index) + { + /* + * You must take care when dereferencing iterator.this_arg + * for multibyte types... the pointer is not aligned. Use + * get_unaligned((type *)iterator.this_arg) to dereference + * iterator.this_arg for type "type" safely on all arches. + */ - if (pktlen > sizeof(ieee80211_header)) - { - agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header)); - } else { - fprintf(stderr, "short packet (ieee header)\n"); - return; + // case IEEE80211_RADIOTAP_RATE: + // /* radiotap "rate" u8 is in + // * 500kbps units, eg, 0x02=1Mbps + // */ + // pkt_rate = (*(uint8_t*)(iterator.this_arg))/2; + // break; + + case IEEE80211_RADIOTAP_ANTENNA: + antenna = *(uint8_t*)(iterator.this_arg); + break; + + case IEEE80211_RADIOTAP_DBM_ANTSIGNAL: + rssi = *(int8_t*)(iterator.this_arg); + break; + + case IEEE80211_RADIOTAP_FLAGS: + flags = *(uint8_t*)(iterator.this_arg); + break; + + default: + break; + } + } /* while more rt headers */ + + if (ret != -ENOENT){ + fprintf(stderr, "Error parsing radiotap header!\n"); + continue; + } + + if (flags & IEEE80211_RADIOTAP_F_FCS) + { + pktlen -= 4; + } + + if (flags & IEEE80211_RADIOTAP_F_BADFCS) + { + fprintf(stderr, "Got packet with bad fsc\n"); + continue; + } + + /* discard the radiotap header part */ + pkt += iterator._max_length; + pktlen -= iterator._max_length; + + if (pktlen > sizeof(ieee80211_header)) + { + agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), wlan_idx, antenna, rssi, NULL); + } else { + fprintf(stderr, "short packet (ieee header)\n"); + continue; + } } } -Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1) +Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1), + count_p_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0), + count_p_lost(0), count_p_bad(0) { sockfd = open_udp_socket_for_tx(client_addr, client_port); fec_p = fec_new(fec_k, fec_n); @@ -242,9 +249,26 @@ Forwarder::Forwarder(const string &client_addr, int client_port) } -void Forwarder::process_packet(const uint8_t *buf, size_t size) +void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr) { - send(sockfd, buf, size, 0); + wrxfwd_t fwd_hdr = { .wlan_idx = wlan_idx, + .antenna = antenna, + .rssi = rssi }; + + struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr, + .iov_len = sizeof(fwd_hdr)}, + { .iov_base = (void*)buf, + .iov_len = size }}; + + struct msghdr msghdr = { .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iov, + .msg_iovlen = 2, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0}; + + sendmsg(sockfd, &msghdr, 0); } @@ -273,7 +297,7 @@ int Aggregator::rx_ring_push(void) 2. Reduce packet injection speed or try to unify RX hardware. */ - fprintf(stderr, "override block 0x%Lx with %d fragments\n", (long long unsigned int)(rx_ring[idx].block_idx), rx_ring[idx].has_fragments); + fprintf(stderr, "override block 0x%" PRIx64 " with %d fragments\n", rx_ring[idx].block_idx, rx_ring[idx].has_fragments); rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE); return idx; @@ -311,15 +335,53 @@ int Aggregator::get_block_ring_idx(uint64_t block_idx) return ring_idx; } +void Aggregator::dump_stats(FILE *fp) +{ + //timestamp in ms + uint64_t ts = get_time_ms(); -void Aggregator::process_packet(const uint8_t *buf, size_t size) + for(antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++) + { + fprintf(fp, "%" PRIu64 "\tANT\t%" PRIx64 "\t%d:%d:%d:%d\n", ts, it->first, it->second.count_all, it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max); + } + antenna_stat.clear(); + + fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_p_dec_err, count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad); + count_p_all = 0; + count_p_dec_err = 0; + count_p_dec_ok = 0; + count_p_fec_recovered = 0; + count_p_lost = 0; + count_p_bad = 0; +} + + +void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, uint8_t ant, uint8_t rssi) +{ + // key: addr + port + wlan_idx + ant + uint64_t key = 0; + if (sockaddr != NULL && sockaddr->sin_family == AF_INET) + { + key = ((uint64_t)ntohl(sockaddr->sin_addr.s_addr) << 32 | (uint64_t)ntohs(sockaddr->sin_port) << 16); + } + + key |= ((uint64_t)wlan_idx << 8 | (uint64_t)ant); + + antenna_stat[key].log_rssi(rssi); +} + + +void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr) { uint8_t new_session_key[sizeof(session_key)]; + count_p_all += 1; + + if(size == 0) return; - if(size == 0) return; if (size > MAX_FORWARDER_PACKET_SIZE) { fprintf(stderr, "long packet (fec payload)\n"); + count_p_bad += 1; return; } @@ -329,6 +391,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) if(size < sizeof(wblock_hdr_t) + sizeof(wpacket_hdr_t)) { fprintf(stderr, "short packet (fec header)\n"); + count_p_bad += 1; return; } break; @@ -337,6 +400,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) if(size != sizeof(wsession_key_t)) { fprintf(stderr, "invalid session key packet\n"); + count_p_bad += 1; return; } @@ -346,9 +410,12 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) tx_publickey, rx_secretkey) != 0) { fprintf(stderr, "unable to decrypt session key\n"); + count_p_dec_err += 1; return; } + count_p_dec_ok += 1; + if (memcmp(session_key, new_session_key, sizeof(session_key)) != 0) { fprintf(stderr, "New session detected\n"); @@ -370,6 +437,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) default: fprintf(stderr, "Unknown packet type 0x%x\n", buf[0]); + count_p_bad += 1; return; } @@ -384,10 +452,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) sizeof(wblock_hdr_t), (uint8_t*)(&(block_hdr->nonce)), session_key) != 0) { - fprintf(stderr, "unable to decrypt packet #0x%Lx\n", (long long unsigned int)(be64toh(block_hdr->nonce))); + fprintf(stderr, "unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->nonce)); + count_p_dec_err += 1; return; } + count_p_dec_ok += 1; + log_rssi(sockaddr, wlan_idx, antenna, rssi); + assert(decrypted_len <= MAX_FEC_PAYLOAD); uint64_t block_idx = be64toh(block_hdr->nonce) >> 8; @@ -397,12 +469,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) if (block_idx > MAX_BLOCK_IDX) { fprintf(stderr, "block_idx overflow\n"); + count_p_bad += 1; return; } if (fragment_idx >= fec_n) { fprintf(stderr, "invalid fragment_idx: %d\n", fragment_idx); + count_p_bad += 1; return; } @@ -441,6 +515,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size) apply_fec(ring_idx); while(p->send_fragment_idx < fec_k) { + count_p_fec_recovered += 1; send_packet(ring_idx, p->send_fragment_idx); p->send_fragment_idx += 1; } @@ -468,6 +543,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx) if (packet_seq > seq + 1) { fprintf(stderr, "%u packets lost\n", packet_seq - seq - 1); + count_p_lost += (packet_seq - seq - 1); } seq = packet_seq; @@ -475,6 +551,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx) if(packet_size > MAX_PAYLOAD_SIZE) { fprintf(stderr, "corrupted packet %u\n", seq); + count_p_bad += 1; }else{ send(sockfd, payload, packet_size, 0); } @@ -512,17 +589,150 @@ void Aggregator::apply_fec(int ring_idx) fec_decode(fec_p, (const uint8_t**)in_blocks, out_blocks, index, MAX_FEC_PAYLOAD); } +void radio_loop(int argc, char* const *argv, int optind, int radio_port, shared_ptr agg, int log_interval) +{ + int nfds = min(argc - optind, MAX_RX_INTERFACES); + uint64_t log_send_ts = 0; + struct pollfd fds[MAX_RX_INTERFACES]; + Receiver* rx[MAX_RX_INTERFACES]; + + memset(fds, '\0', sizeof(fds)); + + for(int i = 0; i < nfds; i++) + { + rx[i] = new Receiver(argv[optind + i], i, radio_port, agg.get()); + fds[i].fd = rx[i]->getfd(); + fds[i].events = POLLIN; + } + + for(;;) + { + uint64_t cur_ts = get_time_ms(); + int rc = poll(fds, nfds, log_send_ts > cur_ts ? log_send_ts - cur_ts : 0); + + if (rc < 0){ + if (errno == EINTR || errno == EAGAIN) continue; + throw runtime_error(string_format("Poll error: %s", strerror(errno))); + } + + cur_ts = get_time_ms(); + + if (cur_ts >= log_send_ts) + { + agg->dump_stats(stdout); + log_send_ts = get_time_ms() + log_interval; + } + + if (rc == 0) continue; // timeout expired + + for(int i = 0; rc > 0 && i < nfds; i++) + { + if (fds[i].revents & (POLLERR|POLLNVAL)) + { + throw runtime_error("socket error!"); + } + if (fds[i].revents & POLLIN){ + rx[i]->loop_iter(); + rc -= 1; + } + } + } +} + +void network_loop(int srv_port, Aggregator &agg, int log_interval) +{ + wrxfwd_t fwd_hdr; + struct sockaddr_in sockaddr; + uint8_t buf[MAX_FORWARDER_PACKET_SIZE]; + + uint64_t log_send_ts = 0; + struct pollfd fds[1]; + int fd = open_udp_socket_for_rx(srv_port); + + if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0) + { + throw runtime_error(string_format("Unable to set socket into nonblocked mode: %s", strerror(errno))); + } + + memset(fds, '\0', sizeof(fds)); + fds[0].fd = fd; + fds[0].events = POLLIN; + + for(;;) + { + uint64_t cur_ts = get_time_ms(); + int rc = poll(fds, 1, log_send_ts > cur_ts ? log_send_ts - cur_ts : 0); + + if (rc < 0){ + if (errno == EINTR || errno == EAGAIN) continue; + throw runtime_error(string_format("poll error: %s", strerror(errno))); + } + + cur_ts = get_time_ms(); + + if (cur_ts >= log_send_ts) + { + agg.dump_stats(stdout); + log_send_ts = get_time_ms() + log_interval; + } + + if (rc == 0) continue; // timeout expired + + // some events detected + if (fds[0].revents & (POLLERR | POLLNVAL)) + { + throw runtime_error(string_format("socket error: %s", strerror(errno))); + } + + if (fds[0].revents & POLLIN) + { + for(;;) // process pending rx + { + memset((void*)&sockaddr, '\0', sizeof(sockaddr)); + + struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr, + .iov_len = sizeof(fwd_hdr)}, + { .iov_base = (void*)buf, + .iov_len = sizeof(buf) }}; + + struct msghdr msghdr = { .msg_name = (void*)&sockaddr, + .msg_namelen = sizeof(sockaddr), + .msg_iov = iov, + .msg_iovlen = 2, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0}; + + ssize_t rsize = recvmsg(fd, &msghdr, 0); + if (rsize < 0) + { + break; + } + + if (rsize < sizeof(wrxfwd_t)) + { + fprintf(stderr, "short packet (rx fwd header)\n"); + continue; + } + agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna, fwd_hdr.rssi, &sockaddr); + } + if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); + } + } +} + int main(int argc, char* const *argv) { int opt; - uint8_t k=8, n=12, radio_port=1; - int client_port=5600; - int srv_port=0; - string client_addr="127.0.0.1"; + uint8_t k = 8, n = 12, radio_port = 1; + int log_interval = 1000; + int client_port = 5600; + int srv_port = 0; + string client_addr = "127.0.0.1"; rx_mode_t rx_mode = LOCAL; string keypair = "rx.key"; - while ((opt = getopt(argc, argv, "K:fa:k:n:c:u:p:")) != -1) { + while ((opt = getopt(argc, argv, "K:fa:k:n:c:u:p:l:")) != -1) { switch (opt) { case 'K': keypair = optarg; @@ -549,12 +759,15 @@ int main(int argc, char* const *argv) case 'p': radio_port = atoi(optarg); break; + case 'l': + log_interval = atoi(optarg); + break; default: /* '?' */ show_usage: - fprintf(stderr, "Local receiver: %s [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-p radio_port] interface1 [interface2] ...\n", argv[0]); + fprintf(stderr, "Local receiver: %s [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] interface1 [interface2] ...\n", argv[0]); fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] interface1 [interface2] ...\n", argv[0]); - fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port]\n", argv[0]); - fprintf(stderr, "Default: K='%s', k=%d, n=%d, connect=%s:%d, radio_port=%d\n", keypair.c_str(), k, n, client_addr.c_str(), client_port, radio_port); + fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-l log_interval]\n", argv[0]); + fprintf(stderr, "Default: K='%s', k=%d, n=%d, connect=%s:%d, radio_port=%d, log_interval=%d\n", keypair.c_str(), k, n, client_addr.c_str(), client_port, radio_port, log_interval); exit(1); } } @@ -565,10 +778,6 @@ int main(int argc, char* const *argv) { if (optind >= argc) goto show_usage; - int nfds = min(argc - optind, MAX_RX_INTERFACES); - struct pollfd fds[MAX_RX_INTERFACES]; - Receiver* rx[MAX_RX_INTERFACES]; - shared_ptr agg; if(rx_mode == LOCAL){ agg = shared_ptr(new Aggregator(client_addr, client_port, k, n, keypair)); @@ -576,49 +785,13 @@ int main(int argc, char* const *argv) agg = shared_ptr(new Forwarder(client_addr, client_port)); } - memset(fds, '\0', sizeof(fds)); - - for(int i = 0; i < nfds; i++) - { - rx[i] = new Receiver(argv[optind + i], radio_port, agg.get()); - fds[i].fd = rx[i]->getfd(); - fds[i].events = POLLIN; - } - - while(1) - { - int rc = poll(fds, nfds, 1000); - if (rc < 0){ - if (errno == EINTR || errno == EAGAIN) continue; - throw runtime_error(string_format("Poll error: %s", strerror(errno))); - } - - for(int i = 0; rc > 0 && i < nfds; i++) - { - if (fds[i].revents & POLLERR) - { - throw runtime_error("socket error!"); - } - if (fds[i].revents & POLLIN){ - rx[i]->loop_iter(); - rc -= 1; - } - } - } + radio_loop(argc, argv, optind, radio_port, agg, log_interval); }else if(rx_mode == AGGREGATOR) { if (optind > argc) goto show_usage; - - uint8_t buf[MAX_FORWARDER_PACKET_SIZE]; - int fd = open_udp_socket_for_rx(srv_port); Aggregator agg(client_addr, client_port, k, n, keypair); - for(;;) - { - ssize_t rsize = recv(fd, buf, sizeof(buf), 0); - if (rsize < 0) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); - agg.process_packet(buf, rsize); - } + network_loop(srv_port, agg, log_interval); }else{ throw runtime_error(string_format("Unknown rx_mode=%d", rx_mode)); } diff --git a/rx.hpp b/rx.hpp index 66e4bf2..6d57d11 100644 --- a/rx.hpp +++ b/rx.hpp @@ -17,6 +17,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#include typedef enum { LOCAL, @@ -27,8 +28,8 @@ typedef enum { class BaseAggregator { public: - virtual void process_packet(const uint8_t *buf, size_t size) = 0; - + virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr) = 0; + virtual void dump_stats(FILE *fp) = 0; protected: int open_udp_socket_for_tx(const string &client_addr, int client_port) { @@ -55,8 +56,8 @@ class Forwarder : public BaseAggregator public: Forwarder(const string &client_addr, int client_port); ~Forwarder(); - virtual void process_packet(const uint8_t *buf, size_t size); - + virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr); + virtual void dump_stats(FILE *fp) {} private: int sockfd; }; @@ -78,15 +79,42 @@ static inline int modN(int x, int base) return (base + (x % base)) % base; } +class antennaItem +{ +public: + antennaItem(void) : count_all(0), rssi_sum(0), rssi_min(0), rssi_max(0) {} + + void log_rssi(uint8_t rssi){ + if(count_all == 0){ + rssi_min = rssi; + rssi_max = rssi; + } else { + rssi_min = min(rssi, rssi_min); + rssi_max = max(rssi, rssi_max); + } + rssi_sum += rssi; + count_all += 1; + } + + uint32_t count_all; + uint32_t rssi_sum; + uint8_t rssi_min; + uint8_t rssi_max; +}; + +typedef std::unordered_map antenna_stat_t; + class Aggregator : public BaseAggregator { public: Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair); ~Aggregator(); - virtual void process_packet(const uint8_t *buf, size_t size); + virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr); + virtual void dump_stats(FILE *fp); private: void send_packet(int ring_idx, int fragment_idx); void apply_fec(int ring_idx); + void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, uint8_t ant, uint8_t rssi); int get_block_ring_idx(uint64_t block_idx); int rx_ring_push(void); fec_t* fec_p; @@ -103,16 +131,25 @@ private: uint8_t rx_secretkey[crypto_box_SECRETKEYBYTES]; uint8_t tx_publickey[crypto_box_PUBLICKEYBYTES]; uint8_t session_key[crypto_aead_chacha20poly1305_KEYBYTES]; + + antenna_stat_t antenna_stat; + uint32_t count_p_all; + uint32_t count_p_dec_err; + uint32_t count_p_dec_ok; + uint32_t count_p_fec_recovered; + uint32_t count_p_lost; + uint32_t count_p_bad; }; class Receiver { public: - Receiver(const char* wlan, int port, BaseAggregator* agg); + Receiver(const char* wlan, int wlan_idx, int port, BaseAggregator* agg); ~Receiver(); void loop_iter(void); int getfd(void){ return fd; } private: + int wlan_idx; BaseAggregator *agg; int fd; pcap_t *ppcap; diff --git a/tx.cpp b/tx.cpp index d11f849..d303ddb 100644 --- a/tx.cpp +++ b/tx.cpp @@ -204,13 +204,6 @@ void Transmitter::send_packet(const uint8_t *buf, size_t size) } } -uint64_t get_system_time(void) // in milliseconds -{ - struct timeval te; - gettimeofday(&te, NULL); - return te.tv_sec * 1000LL + te.tv_usec / 1000; -} - void video_source(Transmitter *t, int fd) { uint8_t buf[MAX_PAYLOAD_SIZE]; @@ -219,7 +212,7 @@ void video_source(Transmitter *t, int fd) { ssize_t rsize = recv(fd, buf, sizeof(buf), 0); if (rsize < 0) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno))); - uint64_t cur_ts = get_system_time(); + uint64_t cur_ts = get_time_ms(); if (cur_ts >= session_key_announce_ts) { // Announce session key @@ -246,12 +239,12 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency) size_t agg_size = 0; uint8_t agg_buf[MAX_PAYLOAD_SIZE]; - uint64_t expire_ts = get_system_time() + agg_latency; + uint64_t expire_ts = get_time_ms() + agg_latency; uint64_t session_key_announce_ts = 0; for(;;) { - uint64_t cur_ts = get_system_time(); + uint64_t cur_ts = get_time_ms(); int rc = poll(fds, 1, expire_ts > cur_ts ? expire_ts - cur_ts : 0); if (rc < 0){ @@ -263,7 +256,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency) { if(agg_size > 0) { - cur_ts = get_system_time(); + cur_ts = get_time_ms(); if (cur_ts >= session_key_announce_ts) { // Announce session key @@ -273,7 +266,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency) t->send_packet(agg_buf, agg_size); agg_size = 0; } - expire_ts = get_system_time() + agg_latency; + expire_ts = get_time_ms() + agg_latency; continue; } @@ -293,7 +286,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency) { if(agg_size > 0) { - cur_ts = get_system_time(); + cur_ts = get_time_ms(); if (cur_ts >= session_key_announce_ts) { // Announce session key @@ -303,7 +296,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency) t->send_packet(agg_buf, agg_size); agg_size = 0; } - expire_ts = get_system_time() + agg_latency; + expire_ts = get_time_ms() + agg_latency; } memcpy(agg_buf + agg_size, buf, rsize); agg_size += rsize; diff --git a/tx.hpp b/tx.hpp index fa9bd80..09a4503 100644 --- a/tx.hpp +++ b/tx.hpp @@ -85,7 +85,24 @@ public: private: virtual void inject_packet(const uint8_t *buf, size_t size) { - send(sockfd, buf, size, 0); + wrxfwd_t fwd_hdr = { .wlan_idx = (uint8_t)(rand() % 2), + .antenna = (uint8_t)(rand() % 2), + .rssi = (uint8_t)(rand() & 0xff) }; + + struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr, + .iov_len = sizeof(fwd_hdr)}, + { .iov_base = (void*)buf, + .iov_len = size }}; + + struct msghdr msghdr = { .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = iov, + .msg_iovlen = 2, + .msg_control = NULL, + .msg_controllen = 0, + .msg_flags = 0}; + + sendmsg(sockfd, &msghdr, 0); } int open_udp_socket(const string &client_addr, int client_port) diff --git a/wifibroadcast.cpp b/wifibroadcast.cpp index 7b3762c..6627aff 100644 --- a/wifibroadcast.cpp +++ b/wifibroadcast.cpp @@ -8,6 +8,14 @@ #include "wifibroadcast.hpp" +uint64_t get_time_ms(void) // in milliseconds +{ + struct timespec ts; + int rc = clock_gettime(CLOCK_MONOTONIC, &ts); + if (rc < 0) throw runtime_error(string_format("Error getting time: %s", strerror(errno))); + return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000; +} + int open_udp_socket_for_rx(int port) { struct sockaddr_in saddr; diff --git a/wifibroadcast.hpp b/wifibroadcast.hpp index 0b848c6..973d3bc 100644 --- a/wifibroadcast.hpp +++ b/wifibroadcast.hpp @@ -113,6 +113,11 @@ static uint8_t ieee80211_header[] = { #define SESSION_KEY_ANNOUNCE_MSEC 1000 +typedef struct { + uint8_t wlan_idx; + uint8_t antenna; //RADIOTAP_ANTENNA + uint8_t rssi; //RADIOTAP_DBM_ANTSIGNAL +} __attribute__ ((packed)) wrxfwd_t; // Network packet headers. All numbers are in network (big endian) format // Encrypted packets can be either session key or data packet. @@ -143,5 +148,6 @@ typedef struct { #define MAX_FORWARDER_PACKET_SIZE (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header)) int open_udp_socket_for_rx(int port); +uint64_t get_time_ms(void); #endif