1. Add link quality info to RX packets

2. Add telemetry logs to RX
3. Use monotonic clock for timeouts
This commit is contained in:
Vasily Evseenko 2017-12-28 12:06:28 +03:00
parent d6781a3753
commit 9d6ebbfb19
7 changed files with 386 additions and 152 deletions

View File

@ -26,5 +26,5 @@ build_rpi: clean
tar czf dist/wifibroadcast_rpi.tar.gz tx rx keygen -C scripts tx_standalone.sh rx_standalone.sh
clean:
rm -f rx tx keygen dist *~ *.o
rm -rf rx tx keygen dist *~ *.o

433
rx.cpp
View File

@ -19,6 +19,7 @@
#include <assert.h>
#include <stdio.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
@ -44,7 +45,7 @@ extern "C"
#include "rx.hpp"
Receiver::Receiver(const char *wlan, int radio_port, BaseAggregator *agg) : agg(agg)
Receiver::Receiver(const char *wlan, int wlan_idx, int radio_port, BaseAggregator *agg) : wlan_idx(wlan_idx), agg(agg)
{
char errbuf[PCAP_ERRBUF_SIZE];
@ -104,92 +105,98 @@ Receiver::~Receiver()
void Receiver::loop_iter(void)
{
struct pcap_pkthdr hdr;
const uint8_t* pkt = pcap_next(ppcap, &hdr);
for(;;) // loop while incoming queue is not empty
{
struct pcap_pkthdr hdr;
const uint8_t* pkt = pcap_next(ppcap, &hdr);
if (pkt == NULL) {
return;
}
int pktlen = hdr.caplen;
int pkt_rate = 0, antenna = 0, pwr = 0;
uint8_t flags = 0;
struct ieee80211_radiotap_iterator iterator;
int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL);
while (ret == 0) {
ret = ieee80211_radiotap_iterator_next(&iterator);
if (ret)
continue;
/* see if this argument is something we can use */
switch (iterator.this_arg_index)
{
/*
* You must take care when dereferencing iterator.this_arg
* for multibyte types... the pointer is not aligned. Use
* get_unaligned((type *)iterator.this_arg) to dereference
* iterator.this_arg for type "type" safely on all arches.
*/
case IEEE80211_RADIOTAP_RATE:
/* radiotap "rate" u8 is in
* 500kbps units, eg, 0x02=1Mbps
*/
pkt_rate = (*(uint8_t*)(iterator.this_arg))/2;
break;
case IEEE80211_RADIOTAP_ANTENNA:
antenna = *(uint8_t*)(iterator.this_arg);
break;
case IEEE80211_RADIOTAP_DBM_ANTSIGNAL:
pwr = *(int8_t*)(iterator.this_arg);
break;
case IEEE80211_RADIOTAP_FLAGS:
flags = *(uint8_t*)(iterator.this_arg);
break;
default:
if (pkt == NULL) {
break;
}
} /* while more rt headers */
if (ret != -ENOENT){
fprintf(stderr, "Error parsing radiotap header!\n");
return;
}
int pktlen = hdr.caplen;
// int pkt_rate = 0
uint8_t antenna = 0;
uint8_t rssi = 0;
uint8_t flags = 0;
struct ieee80211_radiotap_iterator iterator;
int ret = ieee80211_radiotap_iterator_init(&iterator, (ieee80211_radiotap_header*)pkt, pktlen, NULL);
if (flags & IEEE80211_RADIOTAP_F_FCS)
{
pktlen -= 4;
}
while (ret == 0) {
ret = ieee80211_radiotap_iterator_next(&iterator);
if (flags & IEEE80211_RADIOTAP_F_BADFCS)
{
fprintf(stderr, "Got packet with bad fsc\n");
return;
}
if (ret)
continue;
/* discard the radiotap header part */
pkt += iterator._max_length;
pktlen -= iterator._max_length;
/* see if this argument is something we can use */
//printf("%d mbit/s ant %d %ddBm size:%d\n", pkt_rate, antenna, pwr, pktlen);
switch (iterator.this_arg_index)
{
/*
* You must take care when dereferencing iterator.this_arg
* for multibyte types... the pointer is not aligned. Use
* get_unaligned((type *)iterator.this_arg) to dereference
* iterator.this_arg for type "type" safely on all arches.
*/
if (pktlen > sizeof(ieee80211_header))
{
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header));
} else {
fprintf(stderr, "short packet (ieee header)\n");
return;
// case IEEE80211_RADIOTAP_RATE:
// /* radiotap "rate" u8 is in
// * 500kbps units, eg, 0x02=1Mbps
// */
// pkt_rate = (*(uint8_t*)(iterator.this_arg))/2;
// break;
case IEEE80211_RADIOTAP_ANTENNA:
antenna = *(uint8_t*)(iterator.this_arg);
break;
case IEEE80211_RADIOTAP_DBM_ANTSIGNAL:
rssi = *(int8_t*)(iterator.this_arg);
break;
case IEEE80211_RADIOTAP_FLAGS:
flags = *(uint8_t*)(iterator.this_arg);
break;
default:
break;
}
} /* while more rt headers */
if (ret != -ENOENT){
fprintf(stderr, "Error parsing radiotap header!\n");
continue;
}
if (flags & IEEE80211_RADIOTAP_F_FCS)
{
pktlen -= 4;
}
if (flags & IEEE80211_RADIOTAP_F_BADFCS)
{
fprintf(stderr, "Got packet with bad fsc\n");
continue;
}
/* discard the radiotap header part */
pkt += iterator._max_length;
pktlen -= iterator._max_length;
if (pktlen > sizeof(ieee80211_header))
{
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header), wlan_idx, antenna, rssi, NULL);
} else {
fprintf(stderr, "short packet (ieee header)\n");
continue;
}
}
}
Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1)
Aggregator::Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair) : fec_k(k), fec_n(n), seq(0), rx_ring_front(0), rx_ring_alloc(0), last_known_block((uint64_t)-1),
count_p_all(0), count_p_dec_err(0), count_p_dec_ok(0), count_p_fec_recovered(0),
count_p_lost(0), count_p_bad(0)
{
sockfd = open_udp_socket_for_tx(client_addr, client_port);
fec_p = fec_new(fec_k, fec_n);
@ -242,9 +249,26 @@ Forwarder::Forwarder(const string &client_addr, int client_port)
}
void Forwarder::process_packet(const uint8_t *buf, size_t size)
void Forwarder::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr)
{
send(sockfd, buf, size, 0);
wrxfwd_t fwd_hdr = { .wlan_idx = wlan_idx,
.antenna = antenna,
.rssi = rssi };
struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr,
.iov_len = sizeof(fwd_hdr)},
{ .iov_base = (void*)buf,
.iov_len = size }};
struct msghdr msghdr = { .msg_name = NULL,
.msg_namelen = 0,
.msg_iov = iov,
.msg_iovlen = 2,
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0};
sendmsg(sockfd, &msghdr, 0);
}
@ -273,7 +297,7 @@ int Aggregator::rx_ring_push(void)
2. Reduce packet injection speed or try to unify RX hardware.
*/
fprintf(stderr, "override block 0x%Lx with %d fragments\n", (long long unsigned int)(rx_ring[idx].block_idx), rx_ring[idx].has_fragments);
fprintf(stderr, "override block 0x%" PRIx64 " with %d fragments\n", rx_ring[idx].block_idx, rx_ring[idx].has_fragments);
rx_ring_front = modN(rx_ring_front + 1, RX_RING_SIZE);
return idx;
@ -311,15 +335,53 @@ int Aggregator::get_block_ring_idx(uint64_t block_idx)
return ring_idx;
}
void Aggregator::dump_stats(FILE *fp)
{
//timestamp in ms
uint64_t ts = get_time_ms();
void Aggregator::process_packet(const uint8_t *buf, size_t size)
for(antenna_stat_t::iterator it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tANT\t%" PRIx64 "\t%d:%d:%d:%d\n", ts, it->first, it->second.count_all, it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max);
}
antenna_stat.clear();
fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_p_dec_err, count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad);
count_p_all = 0;
count_p_dec_err = 0;
count_p_dec_ok = 0;
count_p_fec_recovered = 0;
count_p_lost = 0;
count_p_bad = 0;
}
void Aggregator::log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, uint8_t ant, uint8_t rssi)
{
// key: addr + port + wlan_idx + ant
uint64_t key = 0;
if (sockaddr != NULL && sockaddr->sin_family == AF_INET)
{
key = ((uint64_t)ntohl(sockaddr->sin_addr.s_addr) << 32 | (uint64_t)ntohs(sockaddr->sin_port) << 16);
}
key |= ((uint64_t)wlan_idx << 8 | (uint64_t)ant);
antenna_stat[key].log_rssi(rssi);
}
void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr)
{
uint8_t new_session_key[sizeof(session_key)];
count_p_all += 1;
if(size == 0) return;
if(size == 0) return;
if (size > MAX_FORWARDER_PACKET_SIZE)
{
fprintf(stderr, "long packet (fec payload)\n");
count_p_bad += 1;
return;
}
@ -329,6 +391,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
if(size < sizeof(wblock_hdr_t) + sizeof(wpacket_hdr_t))
{
fprintf(stderr, "short packet (fec header)\n");
count_p_bad += 1;
return;
}
break;
@ -337,6 +400,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
if(size != sizeof(wsession_key_t))
{
fprintf(stderr, "invalid session key packet\n");
count_p_bad += 1;
return;
}
@ -346,9 +410,12 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
tx_publickey, rx_secretkey) != 0)
{
fprintf(stderr, "unable to decrypt session key\n");
count_p_dec_err += 1;
return;
}
count_p_dec_ok += 1;
if (memcmp(session_key, new_session_key, sizeof(session_key)) != 0)
{
fprintf(stderr, "New session detected\n");
@ -370,6 +437,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
default:
fprintf(stderr, "Unknown packet type 0x%x\n", buf[0]);
count_p_bad += 1;
return;
}
@ -384,10 +452,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
sizeof(wblock_hdr_t),
(uint8_t*)(&(block_hdr->nonce)), session_key) != 0)
{
fprintf(stderr, "unable to decrypt packet #0x%Lx\n", (long long unsigned int)(be64toh(block_hdr->nonce)));
fprintf(stderr, "unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->nonce));
count_p_dec_err += 1;
return;
}
count_p_dec_ok += 1;
log_rssi(sockaddr, wlan_idx, antenna, rssi);
assert(decrypted_len <= MAX_FEC_PAYLOAD);
uint64_t block_idx = be64toh(block_hdr->nonce) >> 8;
@ -397,12 +469,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
if (block_idx > MAX_BLOCK_IDX)
{
fprintf(stderr, "block_idx overflow\n");
count_p_bad += 1;
return;
}
if (fragment_idx >= fec_n)
{
fprintf(stderr, "invalid fragment_idx: %d\n", fragment_idx);
count_p_bad += 1;
return;
}
@ -441,6 +515,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size)
apply_fec(ring_idx);
while(p->send_fragment_idx < fec_k)
{
count_p_fec_recovered += 1;
send_packet(ring_idx, p->send_fragment_idx);
p->send_fragment_idx += 1;
}
@ -468,6 +543,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
if (packet_seq > seq + 1)
{
fprintf(stderr, "%u packets lost\n", packet_seq - seq - 1);
count_p_lost += (packet_seq - seq - 1);
}
seq = packet_seq;
@ -475,6 +551,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
if(packet_size > MAX_PAYLOAD_SIZE)
{
fprintf(stderr, "corrupted packet %u\n", seq);
count_p_bad += 1;
}else{
send(sockfd, payload, packet_size, 0);
}
@ -512,17 +589,150 @@ void Aggregator::apply_fec(int ring_idx)
fec_decode(fec_p, (const uint8_t**)in_blocks, out_blocks, index, MAX_FEC_PAYLOAD);
}
void radio_loop(int argc, char* const *argv, int optind, int radio_port, shared_ptr<BaseAggregator> agg, int log_interval)
{
int nfds = min(argc - optind, MAX_RX_INTERFACES);
uint64_t log_send_ts = 0;
struct pollfd fds[MAX_RX_INTERFACES];
Receiver* rx[MAX_RX_INTERFACES];
memset(fds, '\0', sizeof(fds));
for(int i = 0; i < nfds; i++)
{
rx[i] = new Receiver(argv[optind + i], i, radio_port, agg.get());
fds[i].fd = rx[i]->getfd();
fds[i].events = POLLIN;
}
for(;;)
{
uint64_t cur_ts = get_time_ms();
int rc = poll(fds, nfds, log_send_ts > cur_ts ? log_send_ts - cur_ts : 0);
if (rc < 0){
if (errno == EINTR || errno == EAGAIN) continue;
throw runtime_error(string_format("Poll error: %s", strerror(errno)));
}
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts)
{
agg->dump_stats(stdout);
log_send_ts = get_time_ms() + log_interval;
}
if (rc == 0) continue; // timeout expired
for(int i = 0; rc > 0 && i < nfds; i++)
{
if (fds[i].revents & (POLLERR|POLLNVAL))
{
throw runtime_error("socket error!");
}
if (fds[i].revents & POLLIN){
rx[i]->loop_iter();
rc -= 1;
}
}
}
}
void network_loop(int srv_port, Aggregator &agg, int log_interval)
{
wrxfwd_t fwd_hdr;
struct sockaddr_in sockaddr;
uint8_t buf[MAX_FORWARDER_PACKET_SIZE];
uint64_t log_send_ts = 0;
struct pollfd fds[1];
int fd = open_udp_socket_for_rx(srv_port);
if(fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK) < 0)
{
throw runtime_error(string_format("Unable to set socket into nonblocked mode: %s", strerror(errno)));
}
memset(fds, '\0', sizeof(fds));
fds[0].fd = fd;
fds[0].events = POLLIN;
for(;;)
{
uint64_t cur_ts = get_time_ms();
int rc = poll(fds, 1, log_send_ts > cur_ts ? log_send_ts - cur_ts : 0);
if (rc < 0){
if (errno == EINTR || errno == EAGAIN) continue;
throw runtime_error(string_format("poll error: %s", strerror(errno)));
}
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts)
{
agg.dump_stats(stdout);
log_send_ts = get_time_ms() + log_interval;
}
if (rc == 0) continue; // timeout expired
// some events detected
if (fds[0].revents & (POLLERR | POLLNVAL))
{
throw runtime_error(string_format("socket error: %s", strerror(errno)));
}
if (fds[0].revents & POLLIN)
{
for(;;) // process pending rx
{
memset((void*)&sockaddr, '\0', sizeof(sockaddr));
struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr,
.iov_len = sizeof(fwd_hdr)},
{ .iov_base = (void*)buf,
.iov_len = sizeof(buf) }};
struct msghdr msghdr = { .msg_name = (void*)&sockaddr,
.msg_namelen = sizeof(sockaddr),
.msg_iov = iov,
.msg_iovlen = 2,
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0};
ssize_t rsize = recvmsg(fd, &msghdr, 0);
if (rsize < 0)
{
break;
}
if (rsize < sizeof(wrxfwd_t))
{
fprintf(stderr, "short packet (rx fwd header)\n");
continue;
}
agg.process_packet(buf, rsize - sizeof(wrxfwd_t), fwd_hdr.wlan_idx, fwd_hdr.antenna, fwd_hdr.rssi, &sockaddr);
}
if(errno != EWOULDBLOCK) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
}
}
}
int main(int argc, char* const *argv)
{
int opt;
uint8_t k=8, n=12, radio_port=1;
int client_port=5600;
int srv_port=0;
string client_addr="127.0.0.1";
uint8_t k = 8, n = 12, radio_port = 1;
int log_interval = 1000;
int client_port = 5600;
int srv_port = 0;
string client_addr = "127.0.0.1";
rx_mode_t rx_mode = LOCAL;
string keypair = "rx.key";
while ((opt = getopt(argc, argv, "K:fa:k:n:c:u:p:")) != -1) {
while ((opt = getopt(argc, argv, "K:fa:k:n:c:u:p:l:")) != -1) {
switch (opt) {
case 'K':
keypair = optarg;
@ -549,12 +759,15 @@ int main(int argc, char* const *argv)
case 'p':
radio_port = atoi(optarg);
break;
case 'l':
log_interval = atoi(optarg);
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Local receiver: %s [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-p radio_port] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Local receiver: %s [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-p radio_port] [-l log_interval] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port]\n", argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, connect=%s:%d, radio_port=%d\n", keypair.c_str(), k, n, client_addr.c_str(), client_port, radio_port);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-k RS_K] [-n RS_N] [-c client_addr] [-u client_port] [-l log_interval]\n", argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, connect=%s:%d, radio_port=%d, log_interval=%d\n", keypair.c_str(), k, n, client_addr.c_str(), client_port, radio_port, log_interval);
exit(1);
}
}
@ -565,10 +778,6 @@ int main(int argc, char* const *argv)
{
if (optind >= argc) goto show_usage;
int nfds = min(argc - optind, MAX_RX_INTERFACES);
struct pollfd fds[MAX_RX_INTERFACES];
Receiver* rx[MAX_RX_INTERFACES];
shared_ptr<BaseAggregator> agg;
if(rx_mode == LOCAL){
agg = shared_ptr<Aggregator>(new Aggregator(client_addr, client_port, k, n, keypair));
@ -576,49 +785,13 @@ int main(int argc, char* const *argv)
agg = shared_ptr<Forwarder>(new Forwarder(client_addr, client_port));
}
memset(fds, '\0', sizeof(fds));
for(int i = 0; i < nfds; i++)
{
rx[i] = new Receiver(argv[optind + i], radio_port, agg.get());
fds[i].fd = rx[i]->getfd();
fds[i].events = POLLIN;
}
while(1)
{
int rc = poll(fds, nfds, 1000);
if (rc < 0){
if (errno == EINTR || errno == EAGAIN) continue;
throw runtime_error(string_format("Poll error: %s", strerror(errno)));
}
for(int i = 0; rc > 0 && i < nfds; i++)
{
if (fds[i].revents & POLLERR)
{
throw runtime_error("socket error!");
}
if (fds[i].revents & POLLIN){
rx[i]->loop_iter();
rc -= 1;
}
}
}
radio_loop(argc, argv, optind, radio_port, agg, log_interval);
}else if(rx_mode == AGGREGATOR)
{
if (optind > argc) goto show_usage;
uint8_t buf[MAX_FORWARDER_PACKET_SIZE];
int fd = open_udp_socket_for_rx(srv_port);
Aggregator agg(client_addr, client_port, k, n, keypair);
for(;;)
{
ssize_t rsize = recv(fd, buf, sizeof(buf), 0);
if (rsize < 0) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
agg.process_packet(buf, rsize);
}
network_loop(srv_port, agg, log_interval);
}else{
throw runtime_error(string_format("Unknown rx_mode=%d", rx_mode));
}

49
rx.hpp
View File

@ -17,6 +17,7 @@
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <unordered_map>
typedef enum {
LOCAL,
@ -27,8 +28,8 @@ typedef enum {
class BaseAggregator
{
public:
virtual void process_packet(const uint8_t *buf, size_t size) = 0;
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr) = 0;
virtual void dump_stats(FILE *fp) = 0;
protected:
int open_udp_socket_for_tx(const string &client_addr, int client_port)
{
@ -55,8 +56,8 @@ class Forwarder : public BaseAggregator
public:
Forwarder(const string &client_addr, int client_port);
~Forwarder();
virtual void process_packet(const uint8_t *buf, size_t size);
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp) {}
private:
int sockfd;
};
@ -78,15 +79,42 @@ static inline int modN(int x, int base)
return (base + (x % base)) % base;
}
class antennaItem
{
public:
antennaItem(void) : count_all(0), rssi_sum(0), rssi_min(0), rssi_max(0) {}
void log_rssi(uint8_t rssi){
if(count_all == 0){
rssi_min = rssi;
rssi_max = rssi;
} else {
rssi_min = min(rssi, rssi_min);
rssi_max = max(rssi, rssi_max);
}
rssi_sum += rssi;
count_all += 1;
}
uint32_t count_all;
uint32_t rssi_sum;
uint8_t rssi_min;
uint8_t rssi_max;
};
typedef std::unordered_map<uint64_t, antennaItem> antenna_stat_t;
class Aggregator : public BaseAggregator
{
public:
Aggregator(const string &client_addr, int client_port, int k, int n, const string &keypair);
~Aggregator();
virtual void process_packet(const uint8_t *buf, size_t size);
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, uint8_t antenna, uint8_t rssi, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp);
private:
void send_packet(int ring_idx, int fragment_idx);
void apply_fec(int ring_idx);
void log_rssi(const sockaddr_in *sockaddr, uint8_t wlan_idx, uint8_t ant, uint8_t rssi);
int get_block_ring_idx(uint64_t block_idx);
int rx_ring_push(void);
fec_t* fec_p;
@ -103,16 +131,25 @@ private:
uint8_t rx_secretkey[crypto_box_SECRETKEYBYTES];
uint8_t tx_publickey[crypto_box_PUBLICKEYBYTES];
uint8_t session_key[crypto_aead_chacha20poly1305_KEYBYTES];
antenna_stat_t antenna_stat;
uint32_t count_p_all;
uint32_t count_p_dec_err;
uint32_t count_p_dec_ok;
uint32_t count_p_fec_recovered;
uint32_t count_p_lost;
uint32_t count_p_bad;
};
class Receiver
{
public:
Receiver(const char* wlan, int port, BaseAggregator* agg);
Receiver(const char* wlan, int wlan_idx, int port, BaseAggregator* agg);
~Receiver();
void loop_iter(void);
int getfd(void){ return fd; }
private:
int wlan_idx;
BaseAggregator *agg;
int fd;
pcap_t *ppcap;

21
tx.cpp
View File

@ -204,13 +204,6 @@ void Transmitter::send_packet(const uint8_t *buf, size_t size)
}
}
uint64_t get_system_time(void) // in milliseconds
{
struct timeval te;
gettimeofday(&te, NULL);
return te.tv_sec * 1000LL + te.tv_usec / 1000;
}
void video_source(Transmitter *t, int fd)
{
uint8_t buf[MAX_PAYLOAD_SIZE];
@ -219,7 +212,7 @@ void video_source(Transmitter *t, int fd)
{
ssize_t rsize = recv(fd, buf, sizeof(buf), 0);
if (rsize < 0) throw runtime_error(string_format("Error receiving packet: %s", strerror(errno)));
uint64_t cur_ts = get_system_time();
uint64_t cur_ts = get_time_ms();
if (cur_ts >= session_key_announce_ts)
{
// Announce session key
@ -246,12 +239,12 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency)
size_t agg_size = 0;
uint8_t agg_buf[MAX_PAYLOAD_SIZE];
uint64_t expire_ts = get_system_time() + agg_latency;
uint64_t expire_ts = get_time_ms() + agg_latency;
uint64_t session_key_announce_ts = 0;
for(;;)
{
uint64_t cur_ts = get_system_time();
uint64_t cur_ts = get_time_ms();
int rc = poll(fds, 1, expire_ts > cur_ts ? expire_ts - cur_ts : 0);
if (rc < 0){
@ -263,7 +256,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency)
{
if(agg_size > 0)
{
cur_ts = get_system_time();
cur_ts = get_time_ms();
if (cur_ts >= session_key_announce_ts)
{
// Announce session key
@ -273,7 +266,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency)
t->send_packet(agg_buf, agg_size);
agg_size = 0;
}
expire_ts = get_system_time() + agg_latency;
expire_ts = get_time_ms() + agg_latency;
continue;
}
@ -293,7 +286,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency)
{
if(agg_size > 0)
{
cur_ts = get_system_time();
cur_ts = get_time_ms();
if (cur_ts >= session_key_announce_ts)
{
// Announce session key
@ -303,7 +296,7 @@ void mavlink_source(Transmitter *t, int fd, int agg_latency)
t->send_packet(agg_buf, agg_size);
agg_size = 0;
}
expire_ts = get_system_time() + agg_latency;
expire_ts = get_time_ms() + agg_latency;
}
memcpy(agg_buf + agg_size, buf, rsize);
agg_size += rsize;

19
tx.hpp
View File

@ -85,7 +85,24 @@ public:
private:
virtual void inject_packet(const uint8_t *buf, size_t size)
{
send(sockfd, buf, size, 0);
wrxfwd_t fwd_hdr = { .wlan_idx = (uint8_t)(rand() % 2),
.antenna = (uint8_t)(rand() % 2),
.rssi = (uint8_t)(rand() & 0xff) };
struct iovec iov[2] = {{ .iov_base = (void*)&fwd_hdr,
.iov_len = sizeof(fwd_hdr)},
{ .iov_base = (void*)buf,
.iov_len = size }};
struct msghdr msghdr = { .msg_name = NULL,
.msg_namelen = 0,
.msg_iov = iov,
.msg_iovlen = 2,
.msg_control = NULL,
.msg_controllen = 0,
.msg_flags = 0};
sendmsg(sockfd, &msghdr, 0);
}
int open_udp_socket(const string &client_addr, int client_port)

View File

@ -8,6 +8,14 @@
#include "wifibroadcast.hpp"
uint64_t get_time_ms(void) // in milliseconds
{
struct timespec ts;
int rc = clock_gettime(CLOCK_MONOTONIC, &ts);
if (rc < 0) throw runtime_error(string_format("Error getting time: %s", strerror(errno)));
return ts.tv_sec * 1000LL + ts.tv_nsec / 1000000;
}
int open_udp_socket_for_rx(int port)
{
struct sockaddr_in saddr;

View File

@ -113,6 +113,11 @@ static uint8_t ieee80211_header[] = {
#define SESSION_KEY_ANNOUNCE_MSEC 1000
typedef struct {
uint8_t wlan_idx;
uint8_t antenna; //RADIOTAP_ANTENNA
uint8_t rssi; //RADIOTAP_DBM_ANTSIGNAL
} __attribute__ ((packed)) wrxfwd_t;
// Network packet headers. All numbers are in network (big endian) format
// Encrypted packets can be either session key or data packet.
@ -143,5 +148,6 @@ typedef struct {
#define MAX_FORWARDER_PACKET_SIZE (MAX_PACKET_SIZE - sizeof(radiotap_header) - sizeof(ieee80211_header))
int open_udp_socket_for_rx(int port);
uint64_t get_time_ms(void);
#endif