From 549fcda0acad2a30a2bd138829f45e83e93d77f6 Mon Sep 17 00:00:00 2001 From: Vasily Evseenko Date: Sun, 22 Sep 2024 15:33:58 +0300 Subject: [PATCH] Fix logic of periodic events --- src/rx.cpp | 8 +++--- src/tx.cpp | 53 +++++++++++++++++++++++------------- wfb_ng/__init__.py | 4 +++ wfb_ng/cluster.py | 1 + wfb_ng/tests/test_twisted.py | 7 +++++ wfb_ng/tuntap.py | 29 ++++++++++++++++---- 6 files changed, 74 insertions(+), 28 deletions(-) create mode 100644 wfb_ng/tests/test_twisted.py diff --git a/src/rx.cpp b/src/rx.cpp index 1e645c3..bdae84b 100644 --- a/src/rx.cpp +++ b/src/rx.cpp @@ -877,7 +877,7 @@ void Aggregator::apply_fec(int ring_idx) void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, unique_ptr &agg, int log_interval, int rcv_buf_size) { int nfds = argc - optind; - uint64_t log_send_ts = 0; + uint64_t log_send_ts = get_time_ms(); struct pollfd fds[MAX_RX_INTERFACES]; unique_ptr rx[MAX_RX_INTERFACES]; @@ -909,7 +909,7 @@ void radio_loop(int argc, char* const *argv, int optind, uint32_t channel_id, un if (cur_ts >= log_send_ts) { agg->dump_stats(stdout); - log_send_ts = cur_ts + log_interval; + log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval); } if (rc == 0) continue; // timeout expired @@ -934,7 +934,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s struct sockaddr_in sockaddr; uint8_t buf[MAX_FORWARDER_PACKET_SIZE]; - uint64_t log_send_ts = 0; + uint64_t log_send_ts = get_time_ms(); struct pollfd fds[1]; int fd = open_udp_socket_for_rx(srv_port, rcv_buf_size); @@ -956,7 +956,7 @@ void network_loop(int srv_port, Aggregator &agg, int log_interval, int rcv_buf_s if (cur_ts >= log_send_ts) { agg.dump_stats(stdout); - log_send_ts = cur_ts + log_interval; + log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval); } if (rc == 0) continue; // timeout expired diff --git a/src/tx.cpp b/src/tx.cpp index e5fde57..e8852a5 100644 --- a/src/tx.cpp +++ b/src/tx.cpp @@ -676,9 +676,9 @@ void data_source(unique_ptr &t, vector &rx_fd, int control_fd, fds[nfds].fd = control_fd; fds[nfds].events = POLLIN; - uint64_t session_key_announce_ts = 0; + uint64_t session_key_announce_ts = get_time_ms(); uint32_t rxq_overflow = 0; - uint64_t log_send_ts = 0; + uint64_t log_send_ts = get_time_ms(); uint64_t fec_close_ts = fec_timeout > 0 ? get_time_ms() + fec_timeout : 0; uint32_t count_p_fec_timeouts = 0; // empty packets sent to close fec block due to timeout uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow) @@ -735,7 +735,7 @@ void data_source(unique_ptr &t, vector &rx_fd, int control_fd, count_p_dropped = 0; count_p_truncated = 0; - log_send_ts = cur_ts + log_interval; + log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval); } // Check control socket first @@ -917,26 +917,30 @@ void data_source(unique_ptr &t, vector &rx_fd, int control_fd, // rc > 0: events detected // start from last fd index and reset it to zero - int i = start_fd_idx; - for(start_fd_idx = 0; rc > 0; i++) + int _tmp = start_fd_idx; + start_fd_idx = 0; + + for(int i = _tmp; rc > 0; i = (i + 1) % nfds) { - if (fds[i % nfds].revents & (POLLERR | POLLNVAL)) + assert(i < nfds); + + if (fds[i].revents & (POLLERR | POLLNVAL)) { throw runtime_error(string_format("socket error: %s", strerror(errno))); } - if (fds[i % nfds].revents & POLLIN) + if (fds[i].revents & POLLIN) { uint8_t buf[MAX_PAYLOAD_SIZE + 1]; uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))]; rc -= 1; - t->select_output(mirror ? -1 : (i % nfds)); + t->select_output(mirror ? -1 : (i)); for(;;) { ssize_t rsize; - int fd = fds[i % nfds].fd; + int fd = fds[i].fd; struct iovec iov = { .iov_base = (void*)buf, .iov_len = sizeof(buf) }; @@ -980,6 +984,9 @@ void data_source(unique_ptr &t, vector &rx_fd, int control_fd, { // Announce session key t->send_session_key(); + + // Session packet interval is not in fixed grid because + // we yield session packets only if there are data packets session_key_announce_ts = cur_ts + SESSION_KEY_ANNOUNCE_MSEC; } @@ -987,8 +994,10 @@ void data_source(unique_ptr &t, vector &rx_fd, int control_fd, if (cur_ts >= log_send_ts) // log timeout expired { + // Save current index and go to outer loop // We need to transmit all packets from the queue before tx card switch - start_fd_idx = i % nfds; + start_fd_idx = i; + rc = 0; break; } } @@ -1138,7 +1147,7 @@ void packet_injector(RawSocketInjector &t, vector &rx_fd, int log_interval) } uint32_t rxq_overflow = 0; - uint64_t log_send_ts = 0; + uint64_t log_send_ts = get_time_ms(); uint32_t count_p_incoming = 0; // incoming udp packets (received + dropped due to rxq overflow) uint32_t count_b_incoming = 0; // incoming udp bytes (received only) @@ -1178,7 +1187,7 @@ void packet_injector(RawSocketInjector &t, vector &rx_fd, int log_interval) count_p_dropped = 0; count_p_bad = 0; - log_send_ts = cur_ts + log_interval; + log_send_ts = cur_ts + log_interval - ((cur_ts - log_send_ts) % log_interval); } if (rc == 0) // poll timeout @@ -1188,15 +1197,19 @@ void packet_injector(RawSocketInjector &t, vector &rx_fd, int log_interval) // rc > 0: events detected // start from last fd index and reset it to zero - int i = start_fd_idx; - for(start_fd_idx = 0; rc > 0; i++) + int _tmp = start_fd_idx; + start_fd_idx = 0; + + for(int i = _tmp; rc > 0; i = (i + 1) % nfds) { - if (fds[i % nfds].revents & (POLLERR | POLLNVAL)) + assert(i < nfds); + + if (fds[i].revents & (POLLERR | POLLNVAL)) { throw runtime_error(string_format("socket error: %s", strerror(errno))); } - if (fds[i % nfds].revents & POLLIN) + if (fds[i].revents & POLLIN) { uint8_t buf[MAX_DISTRIBUTION_PACKET_SIZE - sizeof(uint32_t) + 1]; uint8_t cmsgbuf[CMSG_SPACE(sizeof(uint32_t))]; @@ -1206,7 +1219,7 @@ void packet_injector(RawSocketInjector &t, vector &rx_fd, int log_interval) { ssize_t rsize; uint32_t _fwmark; - int fd = fds[i % nfds].fd; + int fd = fds[i].fd; struct iovec iov[2] = { // fwmark @@ -1258,12 +1271,14 @@ void packet_injector(RawSocketInjector &t, vector &rx_fd, int log_interval) cur_ts = get_time_ms(); - t.inject_packet(i % nfds, buf, rsize, ntohl(_fwmark)); + t.inject_packet(i, buf, rsize, ntohl(_fwmark)); if (cur_ts >= log_send_ts) // log timeout expired { + // Save current index and go to outer loop // We need to transmit all packets from the queue before tx card switch - start_fd_idx = i % nfds; + start_fd_idx = i; + rc = 0; break; } } diff --git a/wfb_ng/__init__.py b/wfb_ng/__init__.py index 6595d51..9e8e2c8 100644 --- a/wfb_ng/__init__.py +++ b/wfb_ng/__init__.py @@ -25,6 +25,10 @@ import threading import atexit import time +from twisted.python import runtime +runtime.seconds = time.monotonic +runtime.Platform.seconds = staticmethod(time.monotonic) + from twisted.internet import utils, reactor from logging import currentframe from twisted.python import log diff --git a/wfb_ng/cluster.py b/wfb_ng/cluster.py index 5b7f0b7..d47c7bf 100644 --- a/wfb_ng/cluster.py +++ b/wfb_ng/cluster.py @@ -119,6 +119,7 @@ iw reg set {{ settings.common.wifi_region }} if which nmcli > /dev/null && ! nmcli device show {{ wlan }} | grep -q '(unmanaged)' then nmcli device set {{ wlan }} managed no + sleep 1 fi ip link set {{ wlan }} down diff --git a/wfb_ng/tests/test_twisted.py b/wfb_ng/tests/test_twisted.py new file mode 100644 index 0000000..783e021 --- /dev/null +++ b/wfb_ng/tests/test_twisted.py @@ -0,0 +1,7 @@ +from twisted.trial import unittest +from twisted.internet import reactor + + +class ClockTestCase(unittest.TestCase): + def test_reactor_has_monitonic_clock(self): + self.assertLess(reactor.seconds(), 1000000000) diff --git a/wfb_ng/tuntap.py b/wfb_ng/tuntap.py index 76ad160..206494c 100644 --- a/wfb_ng/tuntap.py +++ b/wfb_ng/tuntap.py @@ -29,6 +29,7 @@ from twisted.internet.protocol import Protocol, connectionDone from pyroute2 import IPRoute from contextlib import closing +from .conf import settings from .proxy import ProxyProtocol @@ -129,7 +130,7 @@ class TUNTAPTransport(abstract.FileDescriptor): class TUNTAPProtocol(Protocol, ProxyProtocol): noisy = False - keepalive_interval = 0.9 + keepalive_interval = 0.5 * settings.common.log_interval / 1000.0 def __init__(self, mtu, agg_timeout=None): self.all_peers = [] @@ -140,6 +141,8 @@ class TUNTAPProtocol(Protocol, ProxyProtocol): # Sent keepalive packets self.lc = task.LoopingCall(self.send_keepalive) self.lc.start(self.keepalive_interval, now=False) + self.pkt_in_sem = 0 + self.pkt_out_sem = 0 def _send_to_all_peers(self, data): for peer in self.all_peers: @@ -151,7 +154,9 @@ class TUNTAPProtocol(Protocol, ProxyProtocol): # call from peer only! def write(self, msg): - # Remove keepalive messages + self.pkt_in_sem = 2 + + # Ignore incoming keepalive messages if self.transport is None or not msg: return @@ -173,11 +178,25 @@ class TUNTAPProtocol(Protocol, ProxyProtocol): i += pkt_size def send_keepalive(self): - # Send keepalive message via all antennas. + # Send keepalive messages: + # 1. via all antennas if no RX from peer during 2 keepalive intervals + # 2. via current antenna if no TX to peer during one keepalive interval + # This allow to use multiple directed antennas on the both ends # and/or use different frequency channels on different cards. - self._send_to_all_peers(b'') + + if self.pkt_in_sem == 0: + self._send_to_all_peers(b'') + + elif self.pkt_out_sem == 0: + self._send_to_peer(b'') + + if self.pkt_in_sem > 0: + self.pkt_in_sem -= 1 + + if self.pkt_out_sem > 0: + self.pkt_out_sem -= 1 def dataReceived(self, data): - self.lc.reset() # reset keepalive timer + self.pkt_out_sem = 1 return self.messageReceived(struct.pack('!H', len(data)) + data)