Refactoring logging and ipc subsystem

This commit is contained in:
Vasily Evseenko 2024-11-29 15:29:06 +03:00
parent fbd9a61d01
commit d1b36002cb
6 changed files with 140 additions and 105 deletions

View File

@ -227,7 +227,7 @@ void Receiver::loop_iter(void)
} /* while more rt headers */
if (ret != -ENOENT && ant_idx < RX_ANT_MAX){
fprintf(stderr, "Error parsing radiotap header!\n");
WFB_ERR("Error parsing radiotap header!\n");
continue;
}
@ -244,7 +244,7 @@ void Receiver::loop_iter(void)
if (flags & IEEE80211_RADIOTAP_F_BADFCS)
{
fprintf(stderr, "Got packet with bad fsc\n");
WFB_ERR("Got packet with bad fsc\n");
continue;
}
@ -252,13 +252,12 @@ void Receiver::loop_iter(void)
pkt += iterator._max_length;
pktlen -= iterator._max_length;
//fprintf(stderr, "CAPTURE: mcs: %u, bw: %u\n", mcs_index, bandwidth);
if (pktlen > (int)sizeof(ieee80211_header))
{
agg->process_packet(pkt + sizeof(ieee80211_header), pktlen - sizeof(ieee80211_header),
wlan_idx, antenna, rssi, noise, freq, mcs_index, bandwidth, NULL);
} else {
fprintf(stderr, "Short packet (ieee header)\n");
WFB_ERR("Short packet (ieee header)\n");
continue;
}
}
@ -428,9 +427,7 @@ int Aggregator::rx_ring_push(void)
2. Reduce packet injection speed or try to unify RX hardware.
*/
#if 0
fprintf(stderr, "Override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments);
#endif
WFB_DBG("AGG: Override block 0x%" PRIx64 " flush %d fragments\n", rx_ring[rx_ring_front].block_idx, rx_ring[rx_ring_front].has_fragments);
count_p_override += 1;
@ -480,31 +477,31 @@ int Aggregator::get_block_ring_idx(uint64_t block_idx)
return ring_idx;
}
void Aggregator::dump_stats(FILE *fp)
void Aggregator::dump_stats(void)
{
//timestamp in ms
uint64_t ts = get_time_ms();
for(auto it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
IPC_MSG("%" PRIu64 "\tRX_ANT\t%u:%u:%u\t%" PRIx64 "\t%d" ":%d:%d:%d" ":%d:%d:%d\n",
ts, it->first.freq, it->first.mcs_index, it->first.bandwidth, it->first.antenna_id, it->second.count_all,
it->second.rssi_min, it->second.rssi_sum / it->second.count_all, it->second.rssi_max,
it->second.snr_min, it->second.snr_sum / it->second.count_all, it->second.snr_max);
}
fprintf(fp, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err,
IPC_MSG("%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u:%u:%u\n", ts, count_p_all, count_b_all, count_p_dec_err,
count_p_dec_ok, count_p_fec_recovered, count_p_lost, count_p_bad, count_p_outgoing, count_b_outgoing);
fflush(fp);
IPC_MSG_SEND();
if(count_p_override)
{
fprintf(stderr, "%u block overrides\n", count_p_override);
WFB_ERR("%u block overrides\n", count_p_override);
}
if(count_p_lost)
{
fprintf(stderr, "%u packets lost\n", count_p_lost);
WFB_ERR("%u packets lost\n", count_p_lost);
}
clear_stats();
@ -571,7 +568,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
if (size > MAX_FORWARDER_PACKET_SIZE)
{
fprintf(stderr, "Long packet (fec payload)\n");
WFB_ERR("Long packet (fec payload)\n");
count_p_bad += 1;
return;
}
@ -581,7 +578,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
case WFB_PACKET_DATA:
if(size < sizeof(wblock_hdr_t) + crypto_aead_chacha20poly1305_ABYTES + sizeof(wpacket_hdr_t))
{
fprintf(stderr, "Short packet (fec header)\n");
WFB_ERR("Short packet (fec header)\n");
count_p_bad += 1;
return;
}
@ -593,7 +590,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
if(size < sizeof(wsession_hdr_t) + sizeof(wsession_data_t) + crypto_box_MACBYTES || \
size > MAX_SESSION_PACKET_SIZE)
{
fprintf(stderr, "Invalid session key packet\n");
WFB_ERR("Invalid session key packet\n");
count_p_bad += 1;
return;
}
@ -604,7 +601,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
((wsession_hdr_t*)buf)->session_nonce,
tx_publickey, rx_secretkey) != 0)
{
fprintf(stderr, "Unable to decrypt session key\n");
WFB_ERR("Unable to decrypt session key\n");
count_p_dec_err += 1;
return;
}
@ -613,35 +610,35 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
if (be64toh(new_session_data->epoch) < epoch)
{
fprintf(stderr, "Session epoch doesn't match: %" PRIu64 " < %" PRIu64 "\n", be64toh(new_session_data->epoch), epoch);
WFB_ERR("Session epoch doesn't match: %" PRIu64 " < %" PRIu64 "\n", be64toh(new_session_data->epoch), epoch);
count_p_dec_err += 1;
return;
}
if (be32toh(new_session_data->channel_id) != channel_id)
{
fprintf(stderr, "Session channel_id doesn't match: %u != %u\n", be32toh(new_session_data->channel_id), channel_id);
WFB_ERR("Session channel_id doesn't match: %u != %u\n", be32toh(new_session_data->channel_id), channel_id);
count_p_dec_err += 1;
return;
}
if (new_session_data->fec_type != WFB_FEC_VDM_RS)
{
fprintf(stderr, "Unsupported FEC codec type: %d\n", new_session_data->fec_type);
WFB_ERR("Unsupported FEC codec type: %d\n", new_session_data->fec_type);
count_p_dec_err += 1;
return;
}
if (new_session_data->n < 1)
{
fprintf(stderr, "Invalid FEC N: %d\n", new_session_data->n);
WFB_ERR("Invalid FEC N: %d\n", new_session_data->n);
count_p_dec_err += 1;
return;
}
if (new_session_data->k < 1 || new_session_data->k > new_session_data->n)
{
fprintf(stderr, "Invalid FEC K: %d\n", new_session_data->k);
WFB_ERR("Invalid FEC K: %d\n", new_session_data->k);
count_p_dec_err += 1;
return;
}
@ -661,15 +658,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
init_fec(new_session_data->k, new_session_data->n);
fprintf(stdout, "%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%d:%d\n", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n);
fflush(stdout);
IPC_MSG("%" PRIu64 "\tSESSION\t%" PRIu64 ":%u:%d:%d\n", get_time_ms(), epoch, WFB_FEC_VDM_RS, fec_k, fec_n);
IPC_MSG_SEND();
}
return;
default:
fprintf(stderr, "Unknown packet type 0x%x\n", buf[0]);
WFB_ERR("Unknown packet type 0x%x\n", buf[0]);
count_p_bad += 1;
return;
}
@ -685,7 +681,7 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
sizeof(wblock_hdr_t),
(uint8_t*)(&(block_hdr->data_nonce)), session_key) != 0)
{
fprintf(stderr, "Unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->data_nonce));
WFB_ERR("Unable to decrypt packet #0x%" PRIx64 "\n", be64toh(block_hdr->data_nonce));
count_p_dec_err += 1;
return;
}
@ -702,14 +698,14 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
// Should never happend due to generating new session key on tx side
if (block_idx > MAX_BLOCK_IDX)
{
fprintf(stderr, "block_idx overflow\n");
WFB_ERR("block_idx overflow\n");
count_p_bad += 1;
return;
}
if (fragment_idx >= fec_n)
{
fprintf(stderr, "Invalid fragment_idx: %d\n", fragment_idx);
WFB_ERR("Invalid fragment_idx: %d\n", fragment_idx);
count_p_bad += 1;
return;
}
@ -782,6 +778,8 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
{
if(! p->fragment_map[f_idx])
{
uint32_t fec_count = 0;
//Recover missed fragments using FEC
apply_fec(ring_idx);
@ -790,9 +788,15 @@ void Aggregator::process_packet(const uint8_t *buf, size_t size, uint8_t wlan_id
{
if(! p->fragment_map[f_idx])
{
count_p_fec_recovered += 1;
fec_count += 1;
}
}
if(fec_count)
{
count_p_fec_recovered += fec_count;
WFB_DBG("FEC recovered %d packets\n", fec_count);
}
break;
}
}
@ -820,6 +824,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
if (packet_seq > seq + 1 && seq > 0)
{
ANDROID_IPC_MSG("PKT_LOST\t%d", (packet_seq - seq - 1));
count_p_lost += (packet_seq - seq - 1);
}
@ -827,7 +832,7 @@ void Aggregator::send_packet(int ring_idx, int fragment_idx)
if(packet_size > MAX_PAYLOAD_SIZE)
{
fprintf(stderr, "Corrupted packet %u\n", seq);
WFB_ERR("Corrupted packet %u\n", seq);
count_p_bad += 1;
}
else if(!(flags & WFB_PACKET_FEC_ONLY))
@ -914,7 +919,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, un
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts)
{
agg->dump_stats(stdout);
agg->dump_stats();
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}
@ -961,7 +966,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s
cur_ts = get_time_ms();
if (cur_ts >= log_send_ts)
{
agg.dump_stats(stdout);
agg.dump_stats();
log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval);
}
@ -1064,12 +1069,12 @@ int main(int argc, char* const *argv)
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-i link_id] interface1 [interface2] ...\n", argv[0]);
fprintf(stderr, "Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
fprintf(stderr, "Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
fprintf(stderr, "WFB-ng version %s\n", WFB_VERSION);
fprintf(stderr, "WFB-ng home page: <http://wfb-ng.org>\n");
WFB_INFO("Local receiver: %s [-K rx_key] [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-l log_interval] [-e epoch] [-i link_id] interface1 [interface2] ...\n", argv[0]);
WFB_INFO("Remote (forwarder): %s -f [-c client_addr] [-u client_port] [-p radio_port] [-R rcv_buf] [-i link_id] interface1 [interface2] ...\n", argv[0]);
WFB_INFO("Remote (aggregator): %s -a server_port [-K rx_key] [-c client_addr] [-R rcv_buf] [-u client_port] [-l log_interval] [-p radio_port] [-e epoch] [-i link_id]\n", argv[0]);
WFB_INFO("Default: K='%s', connect=%s:%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", log_interval=%d, rcv_buf=system_default\n", keypair.c_str(), client_addr.c_str(), client_port, link_id, radio_port, epoch, log_interval);
WFB_INFO("WFB-ng version %s\n", WFB_VERSION);
WFB_INFO("WFB-ng home page: <http://wfb-ng.org>\n");
exit(1);
}
}
@ -1080,7 +1085,7 @@ int main(int argc, char* const *argv)
if ((fd = open("/dev/random", O_RDONLY)) != -1) {
if (ioctl(fd, RNDGETENTCNT, &c) == 0 && c < 160) {
fprintf(stderr, "This system doesn't provide enough entropy to quickly generate high-quality random numbers.\n"
WFB_ERR("This system doesn't provide enough entropy to quickly generate high-quality random numbers.\n"
"Installing the rng-utils/rng-tools, jitterentropy or haveged packages may help.\n"
"On virtualized Linux environments, also consider using virtio-rng.\n"
"The service will not start until enough entropy has been collected.\n");
@ -1091,7 +1096,7 @@ int main(int argc, char* const *argv)
if (sodium_init() < 0)
{
fprintf(stderr, "Libsodium init failed\n");
WFB_ERR("Libsodium init failed\n");
return 1;
}
@ -1121,7 +1126,7 @@ int main(int argc, char* const *argv)
}
}catch(runtime_error &e)
{
fprintf(stderr, "Error: %s\n", e.what());
WFB_ERR("Error: %s\n", e.what());
exit(1);
}
return 0;

View File

@ -45,7 +45,7 @@ public:
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr) = 0;
virtual void dump_stats(FILE *fp) = 0;
virtual void dump_stats(void) = 0;
};
@ -57,7 +57,7 @@ public:
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth,sockaddr_in *sockaddr);
virtual void dump_stats(FILE *) {}
virtual void dump_stats(void) {}
private:
int sockfd;
struct sockaddr_in saddr;
@ -163,7 +163,7 @@ public:
virtual void process_packet(const uint8_t *buf, size_t size, uint8_t wlan_idx, const uint8_t *antenna,
const int8_t *rssi, const int8_t *noise, uint16_t freq, uint8_t mcs_index,
uint8_t bandwidth, sockaddr_in *sockaddr);
virtual void dump_stats(FILE *fp);
virtual void dump_stats(void);
// Make stats public for android userspace receiver
void clear_stats(void)

View File

@ -358,11 +358,11 @@ void RawSocketTransmitter::inject_packet(const uint8_t *buf, size_t size)
}
void RawSocketTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
void RawSocketTransmitter::dump_stats(uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
{
for(auto it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
IPC_MSG("%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
ts, it->first,
it->second.count_p_injected, it->second.count_p_dropped,
it->second.latency_min,
@ -520,11 +520,11 @@ void RemoteTransmitter::inject_packet(const uint8_t *buf, size_t size)
}
void RemoteTransmitter::dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
void RemoteTransmitter::dump_stats(uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes)
{
for(auto it = antenna_stat.begin(); it != antenna_stat.end(); it++)
{
fprintf(fp, "%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
IPC_MSG("%" PRIu64 "\tTX_ANT\t%" PRIx64 "\t%u:%u:%" PRIu64 ":%" PRIu64 ":%" PRIu64 "\n",
ts, it->first,
it->second.count_p_injected, it->second.count_p_dropped,
it->second.latency_min,
@ -565,7 +565,7 @@ void Transmitter::send_block_fragment(size_t packet_size)
void Transmitter::send_session_key(void)
{
//fprintf(stderr, "Announce session key\n");
WFB_DBG("Announce session key\n");
inject_packet((uint8_t*)session_packet, session_packet_size);
}
@ -711,20 +711,20 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
if (cur_ts >= log_send_ts) // log timeout expired
{
t->dump_stats(stdout, cur_ts, count_p_injected, count_p_dropped, count_b_injected);
t->dump_stats(cur_ts, count_p_injected, count_p_dropped, count_b_injected);
fprintf(stdout, "%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n",
IPC_MSG("%" PRIu64 "\tPKT\t%u:%u:%u:%u:%u:%u:%u\n",
cur_ts, count_p_fec_timeouts, count_p_incoming, count_b_incoming, count_p_injected, count_b_injected, count_p_dropped, count_p_truncated);
fflush(stdout);
IPC_MSG_SEND();
if(count_p_dropped)
{
fprintf(stderr, "%u packets dropped\n", count_p_dropped);
WFB_ERR("%u packets dropped\n", count_p_dropped);
}
if(count_p_truncated)
{
fprintf(stderr, "%u packets truncated\n", count_p_truncated);
WFB_ERR("%u packets truncated\n", count_p_truncated);
}
count_p_fec_timeouts = 0;
@ -786,7 +786,7 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
{
resp.rc = htonl(EINVAL);
sendto(fd, &resp, offsetof(cmd_resp_t, u), MSG_DONTWAIT, (sockaddr*)&from_addr, addr_size);
fprintf(stderr, "Rejecting new FEC settings");
WFB_ERR("Rejecting new FEC settings");
continue;
}
@ -802,7 +802,7 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
}
sendto(fd, &resp, offsetof(cmd_resp_t, u), MSG_DONTWAIT, (sockaddr*)&from_addr, addr_size);
fprintf(stderr, "Session restarted with FEC %d/%d\n", fec_k, fec_n);
WFB_INFO("Session restarted with FEC %d/%d\n", fec_k, fec_n);
}
break;
@ -830,13 +830,12 @@ void data_source(unique_ptr<Transmitter> &t, vector<int> &rx_fd, int control_fd,
{
resp.rc = htonl(EINVAL);
sendto(fd, &resp, offsetof(cmd_resp_t, u), MSG_DONTWAIT, (sockaddr*)&from_addr, addr_size);
fprintf(stderr, "Rejecting new radiotap header: %s\n", e.what());
WFB_ERR("Rejecting new radiotap header: %s\n", e.what());
continue;
}
sendto(fd, &resp, offsetof(cmd_resp_t, u), MSG_DONTWAIT, (sockaddr*)&from_addr, addr_size);
fprintf(stderr,
"Radiotap updated with stbc=%d, ldpc=%d, short_gi=%d, bandwidth=%d, mcs_index=%d, vht_mode=%d, vht_nss=%d\n",
WFB_INFO("Radiotap updated with stbc=%d, ldpc=%d, short_gi=%d, bandwidth=%d, mcs_index=%d, vht_mode=%d, vht_nss=%d\n",
req.u.cmd_set_radio.stbc,
req.u.cmd_set_radio.ldpc,
req.u.cmd_set_radio.short_gi,
@ -1174,12 +1173,12 @@ void packet_injector(RawSocketInjector &t, vector<int> &rx_fd, int log_interval)
{
if(count_p_dropped)
{
fprintf(stderr, "%u packets dropped\n", count_p_dropped);
WFB_ERR("%u packets dropped\n", count_p_dropped);
}
if(count_p_bad)
{
fprintf(stderr, "%u packets bad\n", count_p_bad);
WFB_ERR("%u packets bad\n", count_p_bad);
}
count_p_incoming = 0;
@ -1306,17 +1305,17 @@ void injector_loop(int argc, char* const* argv, int optind, int srv_port, int rc
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
bind_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i);
}
fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]);
WFB_INFO("Listen on %d for %s\n", bind_port, argv[optind + i]);
rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i]));
}
if (srv_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
IPC_MSG_SEND();
}
auto t = RawSocketInjector(wlans, use_qdisc);
@ -1338,10 +1337,10 @@ int open_control_fd(int control_port)
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
control_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP_CONTROL\t%d\n", get_time_ms(), control_port);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP_CONTROL\t%d\n", get_time_ms(), control_port);
}
fprintf(stderr, "Listen on %d for management commands\n", control_port);
WFB_INFO("Listen on %d for management commands\n", control_port);
return control_fd;
}
@ -1370,22 +1369,22 @@ void local_loop(int argc, char* const* argv, int optind, int srv_port, int rcv_b
throw runtime_error(string_format("Unable to get socket info: %s", strerror(errno)));
}
bind_port = ntohs(saddr.sin_port);
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP\t%d:%x\n", get_time_ms(), bind_port, i);
}
fprintf(stderr, "Listen on %d for %s\n", bind_port, argv[optind + i]);
WFB_INFO("Listen on %d for %s\n", bind_port, argv[optind + i]);
rx_fd.push_back(fd);
wlans.push_back(string(argv[optind + i]));
}
if (udp_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
IPC_MSG_SEND();
}
if (debug_port)
{
fprintf(stderr, "Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port);
WFB_INFO("Using %zu ports from %d for wlan emulation\n", wlans.size(), debug_port);
t = unique_ptr<UdpTransmitter>(new UdpTransmitter(k, n, keypair, "127.0.0.1", debug_port, epoch, channel_id,
fec_delay, tags, use_qdisc, fwmark));
}
@ -1446,10 +1445,10 @@ void distributor_loop(int argc, char* const* argv, int optind, int srv_port, int
bind_port = ntohs(saddr.sin_port);
uint64_t wlan_id = (uint64_t)ntohl(inet_addr(remote_host.c_str())) << 24 | j;
printf("%" PRIu64 "\tLISTEN_UDP\t%d:%" PRIx64 "\n", get_time_ms(), bind_port, wlan_id);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP\t%d:%" PRIx64 "\n", get_time_ms(), bind_port, wlan_id);
}
fprintf(stderr, "Listen on %d for %s:%d\n", bind_port, remote_host.c_str(), remote_port);
WFB_INFO("Listen on %d for %s:%d\n", bind_port, remote_host.c_str(), remote_port);
rx_fd.push_back(fd);
remote_ports.push_back(remote_port);
@ -1460,8 +1459,8 @@ void distributor_loop(int argc, char* const* argv, int optind, int srv_port, int
if (udp_port == 0)
{
printf("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
fflush(stdout);
IPC_MSG("%" PRIu64 "\tLISTEN_UDP_END\n", get_time_ms());
IPC_MSG_SEND();
}
vector<tags_item_t> tags;
@ -1577,17 +1576,17 @@ int main(int argc, char * const *argv)
case 'f':
if (strcmp(optarg, "data") == 0)
{
fprintf(stderr, "Using data frames\n");
WFB_INFO("Using data frames\n");
frame_type = FRAME_TYPE_DATA;
}
else if (strcmp(optarg, "rts") == 0)
{
fprintf(stderr, "Using rts frames\n");
WFB_INFO("Using rts frames\n");
frame_type = FRAME_TYPE_RTS;
}
else
{
fprintf(stderr, "Invalid frame type: %s\n", optarg);
WFB_ERR("Invalid frame type: %s\n", optarg);
exit(1);
}
break;
@ -1602,19 +1601,19 @@ int main(int argc, char * const *argv)
break;
default: /* '?' */
show_usage:
fprintf(stderr, "Local TX: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n"
WFB_INFO("Local TX: %s [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n"
" [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] interface1 [interface2] ...\n",
argv[0]);
fprintf(stderr, "TX distributor: %s -d [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n"
WFB_INFO("TX distributor: %s -d [-K tx_key] [-k RS_K] [-n RS_N] [-u udp_port] [-R rcv_buf] [-p radio_port] [-F fec_delay] [-B bandwidth] [-G guard_interval] [-S stbc] [-L ldpc] [-M mcs_index] [-N VHT_NSS]\n"
" [-T fec_timeout] [-l log_interval] [-e epoch] [-i link_id] [-f { data | rts }] [-m] [-V] [-Q] [-P fwmark] [-C control_port] host1:port1,port2,... [host2:port1,port2,...] ...\n",
argv[0]);
fprintf(stderr, "TX injector: %s -I port [-Q] [-R rcv_buf] [-l log_interval] interface1 [interface2] ...\n",
WFB_INFO("TX injector: %s -I port [-Q] [-R rcv_buf] [-l log_interval] interface1 [interface2] ...\n",
argv[0]);
fprintf(stderr, "Default: K='%s', k=%d, n=%d, fec_delay=%u [us], udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d vht_nss=%d, vht_mode=%d, fec_timeout=%d, log_interval=%d, rcv_buf=system_default, frame_type=data, mirror=false, use_qdisc=false, fwmark=%u, control_port=%d\n",
WFB_INFO("Default: K='%s', k=%d, n=%d, fec_delay=%u [us], udp_port=%d, link_id=0x%06x, radio_port=%u, epoch=%" PRIu64 ", bandwidth=%d guard_interval=%s stbc=%d ldpc=%d mcs_index=%d vht_nss=%d, vht_mode=%d, fec_timeout=%d, log_interval=%d, rcv_buf=system_default, frame_type=data, mirror=false, use_qdisc=false, fwmark=%u, control_port=%d\n",
keypair.c_str(), k, n, fec_delay, udp_port, link_id, radio_port, epoch, bandwidth, short_gi ? "short" : "long", stbc, ldpc, mcs_index, vht_nss, vht_mode, fec_timeout, log_interval, fwmark, control_port);
fprintf(stderr, "Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE);
fprintf(stderr, "WFB-ng version %s\n", WFB_VERSION);
fprintf(stderr, "WFB-ng home page: <http://wfb-ng.org>\n");
WFB_INFO("Radio MTU: %lu\n", (unsigned long)MAX_PAYLOAD_SIZE);
WFB_INFO("WFB-ng version %s\n", WFB_VERSION);
WFB_INFO("WFB-ng home page: <http://wfb-ng.org>\n");
exit(1);
}
}
@ -1629,7 +1628,7 @@ int main(int argc, char * const *argv)
if ((fd = open("/dev/random", O_RDONLY)) != -1) {
if (ioctl(fd, RNDGETENTCNT, &c) == 0 && c < 160) {
fprintf(stderr, "This system doesn't provide enough entropy to quickly generate high-quality random numbers.\n"
WFB_ERR("This system doesn't provide enough entropy to quickly generate high-quality random numbers.\n"
"Installing the rng-utils/rng-tools, jitterentropy or haveged packages may help.\n"
"On virtualized Linux environments, also consider using virtio-rng.\n"
"The service will not start until enough entropy has been collected.\n");
@ -1640,7 +1639,7 @@ int main(int argc, char * const *argv)
if (sodium_init() < 0)
{
fprintf(stderr, "Libsodium init failed\n");
WFB_ERR("Libsodium init failed\n");
return 1;
}
@ -1676,7 +1675,7 @@ int main(int argc, char * const *argv)
}
catch(runtime_error &e)
{
fprintf(stderr, "Error: %s\n", e.what());
WFB_ERR("Error: %s\n", e.what());
exit(1);
}
return 0;

View File

@ -78,7 +78,7 @@ public:
void init_session(int k, int n);
void get_fec(int &k, int &n) { k = fec_k; n = fec_n; }
virtual void select_output(int idx) = 0;
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) = 0;
virtual void dump_stats(uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) = 0;
virtual void update_radiotap_header(radiotap_header_t &radiotap_header) = 0;
virtual radiotap_header_t get_radiotap_header(void) = 0;
protected:
@ -165,7 +165,7 @@ public:
current_output = idx;
}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
virtual void dump_stats(uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
virtual void update_radiotap_header(radiotap_header_t &radiotap_header)
{
this->radiotap_header = radiotap_header;
@ -214,7 +214,7 @@ public:
saddr.sin_port = htons((unsigned short)base_port);
}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) {}
virtual void dump_stats(uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes) {}
virtual ~UdpTransmitter()
{
@ -310,7 +310,7 @@ public:
current_output = idx;
}
virtual void dump_stats(FILE *fp, uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
virtual void dump_stats(uint64_t ts, uint32_t &injected_packets, uint32_t &dropped_packets, uint32_t &injected_bytes);
virtual void update_radiotap_header(radiotap_header_t &radiotap_header)
{
this->radiotap_header = radiotap_header;

View File

@ -70,12 +70,18 @@ typedef struct {
} __attribute__ ((packed)) tun_packet_hdr_t;
#ifdef __DEBUG__
#define dbg_log(...) fprintf(stderr, __VA_ARGS__)
#else
#define dbg_log(...) ((void)0)
// Don't use possible C++ loggers
#ifdef WFB_DBG
#undef WFB_DBG
#endif
#ifdef __DEBUG__
#define WFB_DBG(...) fprintf(stderr, __VA_ARGS__)
#else
#define WFB_DBG(...) ((void)0)
#endif
void event_sig_cb(evutil_socket_t sig, short flags, void *arg)
{
switch (sig)
@ -88,7 +94,7 @@ void event_sig_cb(evutil_socket_t sig, short flags, void *arg)
assert(0);
}
dbg_log("Exiting...\n");
WFB_DBG("Exiting...\n");
event_base_loopexit (ev_base, NULL);
}
@ -99,7 +105,7 @@ void ev_ping_cb(evutil_socket_t fd, short flags, void *arg)
if(pkt_sem == 0)
{
dbg_log("send ping\n");
WFB_DBG("send ping\n");
sendto(fd, "", 0, MSG_DONTWAIT, (struct sockaddr*)&peer_addr, sizeof(peer_addr));
}
@ -134,7 +140,7 @@ void ev_tun_read_cb(evutil_socket_t fd, short flags, void *arg)
buf->batch_size = buf->data_size;
}
dbg_log("tun_read: packet_size=%d, batch_size=%zu, data_size=%zu\n", nread, buf->batch_size, buf->data_size);
WFB_DBG("tun_read: packet_size=%d, batch_size=%zu, data_size=%zu\n", nread, buf->batch_size, buf->data_size);
if(buf->data_size >= MTU || agg_timeout_ms == 0)
{
@ -183,7 +189,7 @@ void ev_socket_write_cb(evutil_socket_t fd, short flags, void *arg)
assert(buf->batch_size <= MTU);
sendto(fd, buf->data, buf->batch_size, MSG_DONTWAIT, (struct sockaddr*)&peer_addr, sizeof(peer_addr));
dbg_log("socket_write: batch_size=%zu, data_size=%zu\n", buf->batch_size, buf->data_size);
WFB_DBG("socket_write: batch_size=%zu, data_size=%zu\n", buf->batch_size, buf->data_size);
if(buf->data_size > buf->batch_size)
{
@ -232,7 +238,7 @@ void ev_tun_write_cb(evutil_socket_t fd, short flags, void *arg)
assert(buf->offset + sizeof(tun_packet_hdr_t) <= buf->data_size);
uint16_t packet_size = ntohs(((tun_packet_hdr_t*)(buf->data + buf->offset))->packet_size);
dbg_log("tun_write: off=%zu, psize=%zu + %d, data_size=%zu\n", buf->offset, sizeof(tun_packet_hdr_t), packet_size, buf->data_size);
WFB_DBG("tun_write: off=%zu, psize=%zu + %d, data_size=%zu\n", buf->offset, sizeof(tun_packet_hdr_t), packet_size, buf->data_size);
assert(buf->offset + sizeof(tun_packet_hdr_t) + packet_size <= buf->data_size);
nwrote = write(fd, buf->data + buf->offset + sizeof(tun_packet_hdr_t), packet_size);
@ -275,14 +281,14 @@ void ev_socket_read_cb(evutil_socket_t fd, short flags, void *arg)
{
// skip ping packet
event_add (ev_socket_read, NULL);
dbg_log("got ping\n");
WFB_DBG("got ping\n");
return;
}
buf->offset = 0;
buf->data_size = nread;
dbg_log("socket_read: off=%zu, data_size=%zu\n", buf->offset, buf->data_size);
WFB_DBG("socket_read: off=%zu, data_size=%zu\n", buf->offset, buf->data_size);
event_add(ev_tun_write, NULL);
}

View File

@ -257,6 +257,31 @@ typedef struct {
#define MAX_DISTRIBUTION_PACKET_SIZE (sizeof(uint32_t) + sizeof(radiotap_header_vht) + WIFI_MTU)
#define MAX_PCAP_PACKET_SIZE (WIFI_MTU + 256) // radiotap header is variable but 8812au/eu has max rtap buffer size 256
#ifndef WFB_DBG
#ifdef __DEBUG__
#define WFB_DBG(...) fprintf(stderr, __VA_ARGS__)
#else
#define WFB_DBG(...) ((void)0)
#endif
#endif
#ifndef WFB_ERR
#define WFB_ERR(...) fprintf(stderr, __VA_ARGS__)
#endif
#ifndef WFB_INFO
#define WFB_INFO(...) fprintf(stderr, __VA_ARGS__)
#endif
#ifndef ANDROID_IPC_MSG
#define ANDROID_IPC_MSG(...) ((void)0)
#endif
#ifndef IPC_MSG
#define IPC_MSG(...) fprintf(stdout, __VA_ARGS__)
#define IPC_MSG_SEND() fflush(stdout)
#endif
int open_udp_socket_for_rx(int port, int rcv_buf_size, uint32_t bind_addr = INADDR_ANY, int socket_type = SOCK_DGRAM, int socket_protocol = 0);
uint64_t get_time_ms(void);
uint64_t get_time_us(void);