From 7d1f048ca70912812976d2c87cc6bf767f513d07 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sun, 26 Nov 2023 11:46:07 +1100 Subject: [PATCH] AP_Networking: support UDP server, TCP client and TCP server and implement mavlink packetisation and flow control return --- libraries/AP_Networking/AP_Networking.cpp | 2 +- libraries/AP_Networking/AP_Networking.h | 31 +- .../AP_Networking/AP_Networking_address.cpp | 7 +- .../AP_Networking/AP_Networking_address.h | 5 +- .../AP_Networking/AP_Networking_port.cpp | 329 ++++++++++++++++-- .../AP_Networking/AP_Networking_tests.cpp | 45 +++ 6 files changed, 378 insertions(+), 41 deletions(-) diff --git a/libraries/AP_Networking/AP_Networking.cpp b/libraries/AP_Networking/AP_Networking.cpp index 7783adf77d..0349145f64 100644 --- a/libraries/AP_Networking/AP_Networking.cpp +++ b/libraries/AP_Networking/AP_Networking.cpp @@ -63,7 +63,7 @@ const AP_Param::GroupInfo AP_Networking::var_info[] = { // @Param: TESTS // @DisplayName: Test enable flags // @Description: Enable/Disable networking tests - // @Bitmask: 0:UDP echo test,1:TCP echo test + // @Bitmask: 0:UDP echo test,1:TCP echo test, 2:TCP discard test // @RebootRequired: True // @User: Advanced AP_GROUPINFO("TESTS", 7, AP_Networking, param.tests, 0), diff --git a/libraries/AP_Networking/AP_Networking.h b/libraries/AP_Networking/AP_Networking.h index c931fc2698..257a4950c7 100644 --- a/libraries/AP_Networking/AP_Networking.h +++ b/libraries/AP_Networking/AP_Networking.h @@ -184,6 +184,9 @@ private: enum class NetworkPortType { NONE = 0, UDP_CLIENT = 1, + UDP_SERVER = 2, + TCP_CLIENT = 3, + TCP_SERVER = 4, }; // class for NET_Pn_* parameters @@ -199,6 +202,7 @@ private: AP_Networking_IPV4 ip {"0.0.0.0"}; AP_Int32 port; SocketAPM *sock; + SocketAPM *listen_sock; bool is_initialized() override { return true; @@ -207,11 +211,22 @@ private: return false; } - void udp_client_init(const uint32_t size_rx, const uint32_t size_tx); + void wait_startup(); + void udp_client_init(void); + void udp_server_init(void); + void tcp_server_init(void); + void tcp_client_init(void); + void udp_client_loop(void); + void udp_server_loop(void); + void tcp_client_loop(void); + void tcp_server_loop(void); + + bool send_receive(void); private: bool init_buffers(const uint32_t size_rx, const uint32_t size_tx); + void thread_create(AP_HAL::MemberProc); uint32_t txspace() override; void _begin(uint32_t b, uint16_t rxS, uint16_t txS) override; @@ -222,10 +237,22 @@ private: void _flush() override {} bool _discard_input() override; + enum flow_control get_flow_control(void) override; + + uint32_t bw_in_bytes_per_second() const override { + return 1000000UL; + } + ByteBuffer *readbuffer; ByteBuffer *writebuffer; + char thread_name[10]; uint32_t last_size_tx; uint32_t last_size_rx; + bool packetise; + bool connected; + bool have_received; + bool close_on_recv_error; + HAL_Semaphore sem; }; @@ -236,10 +263,12 @@ private: enum { TEST_UDP_CLIENT = (1U<<0), TEST_TCP_CLIENT = (1U<<1), + TEST_TCP_DISCARD = (1U<<2), }; void start_tests(void); void test_UDP_client(void); void test_TCP_client(void); + void test_TCP_discard(void); #endif // AP_NETWORKING_TESTS_ENABLED // ports for registration with serial manager diff --git a/libraries/AP_Networking/AP_Networking_address.cpp b/libraries/AP_Networking/AP_Networking_address.cpp index 5cadbbe6f7..28900e387b 100644 --- a/libraries/AP_Networking/AP_Networking_address.cpp +++ b/libraries/AP_Networking/AP_Networking_address.cpp @@ -6,6 +6,7 @@ #if AP_NETWORKING_ENABLED +#include #include "AP_Networking.h" const AP_Param::GroupInfo AP_Networking_IPV4::var_info[] = { @@ -68,9 +69,11 @@ void AP_Networking_IPV4::set_default_uint32(uint32_t v) } } -const char* AP_Networking_IPV4::get_str() const +const char* AP_Networking_IPV4::get_str() { - return AP_Networking::convert_ip_to_str(get_uint32()); + const auto ip = ntohl(get_uint32()); + inet_ntop(AF_INET, &ip, strbuf, sizeof(strbuf)); + return strbuf; } #endif // AP_NETWORKING_ENABLED diff --git a/libraries/AP_Networking/AP_Networking_address.h b/libraries/AP_Networking/AP_Networking_address.h index a4a3b6f8b2..f985294867 100644 --- a/libraries/AP_Networking/AP_Networking_address.h +++ b/libraries/AP_Networking/AP_Networking_address.h @@ -18,12 +18,15 @@ public: void set_uint32(uint32_t addr); // return address as a null-terminated string - const char* get_str() const; + const char* get_str(); // set default address from a uint32 void set_default_uint32(uint32_t addr); static const struct AP_Param::GroupInfo var_info[]; + +private: + char strbuf[16]; }; /* diff --git a/libraries/AP_Networking/AP_Networking_port.cpp b/libraries/AP_Networking/AP_Networking_port.cpp index e70224cf4b..23c43df133 100644 --- a/libraries/AP_Networking/AP_Networking_port.cpp +++ b/libraries/AP_Networking/AP_Networking_port.cpp @@ -11,6 +11,8 @@ #include #include #include +#include +#include extern const AP_HAL::HAL& hal; @@ -22,11 +24,15 @@ extern const AP_HAL::HAL& hal; #define AP_NETWORKING_PORT_MIN_RXSIZE 2048 #endif +#ifndef AP_NETWORKING_PORT_STACK_SIZE +#define AP_NETWORKING_PORT_STACK_SIZE 1024 +#endif + const AP_Param::GroupInfo AP_Networking::Port::var_info[] = { // @Param: TYPE // @DisplayName: Port type - // @Description: Port type - // @Values: 0:Disabled, 1:UDP client + // @Description: Port type for network serial port. For the two client types a valid destination IP address must be set. For the two server types either 0.0.0.0 or a local address can be used. + // @Values: 0:Disabled, 1:UDP client, 2:TCP client, 3:TCP server // @RebootRequired: True // @User: Advanced AP_GROUPINFO_FLAGS("TYPE", 1, AP_Networking::Port, type, 0, AP_PARAM_FLAG_ENABLE), @@ -66,34 +72,113 @@ void AP_Networking::ports_init(void) case NetworkPortType::NONE: break; case NetworkPortType::UDP_CLIENT: - p.udp_client_init(AP_NETWORKING_PORT_MIN_RXSIZE, AP_NETWORKING_PORT_MIN_TXSIZE); + p.udp_client_init(); + break; + case NetworkPortType::UDP_SERVER: + p.udp_server_init(); + break; + case NetworkPortType::TCP_SERVER: + p.tcp_server_init(); + break; + case NetworkPortType::TCP_CLIENT: + p.tcp_client_init(); break; } - if (p.sock != nullptr) { + if (p.sock != nullptr || p.listen_sock != nullptr) { AP::serialmanager().register_port(&p); } } } +/* + wrapper for thread_create for port functions + */ +void AP_Networking::Port::thread_create(AP_HAL::MemberProc proc) +{ + const uint8_t idx = state.idx - AP_SERIALMANAGER_NET_PORT_1; + hal.util->snprintf(thread_name, sizeof(thread_name), "NET_P%u", unsigned(idx)); + + if (!init_buffers(AP_NETWORKING_PORT_MIN_RXSIZE, AP_NETWORKING_PORT_MIN_TXSIZE)) { + AP_BoardConfig::allocation_error("Failed to allocate %s buffers", thread_name); + return; + } + + if (!hal.scheduler->thread_create(proc, thread_name, AP_NETWORKING_PORT_STACK_SIZE, AP_HAL::Scheduler::PRIORITY_UART, 0)) { + AP_BoardConfig::allocation_error("Failed to allocate %s client thread", thread_name); + } +} + /* initialise a UDP client */ -void AP_Networking::Port::udp_client_init(const uint32_t size_rx, const uint32_t size_tx) +void AP_Networking::Port::udp_client_init(void) { - WITH_SEMAPHORE(sem); - if (!init_buffers(size_rx, size_tx)) { - return; - } - if (sock != nullptr) { - return; - } sock = new SocketAPM(true); if (sock == nullptr) { return; } - if (!hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::udp_client_loop, void), "NET", 2048, AP_HAL::Scheduler::PRIORITY_UART, 0)) { - AP_BoardConfig::allocation_error("Failed to allocate UDP client thread"); + sock->set_blocking(false); + + // setup for packet boundaries if this is mavlink + packetise = (state.protocol == AP_SerialManager::SerialProtocol_MAVLink || + state.protocol == AP_SerialManager::SerialProtocol_MAVLink2); + + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::udp_client_loop, void)); +} + +/* + initialise a UDP server + */ +void AP_Networking::Port::udp_server_init(void) +{ + sock = new SocketAPM(true); + if (sock == nullptr) { + return; } + sock->set_blocking(false); + + // setup for packet boundaries if this is mavlink + packetise = (state.protocol == AP_SerialManager::SerialProtocol_MAVLink || + state.protocol == AP_SerialManager::SerialProtocol_MAVLink2); + + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::udp_server_loop, void)); +} + +/* + initialise a TCP server + */ +void AP_Networking::Port::tcp_server_init(void) +{ + listen_sock = new SocketAPM(false); + if (listen_sock == nullptr) { + return; + } + listen_sock->reuseaddress(); + + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::tcp_server_loop, void)); +} + +/* + initialise a TCP client + */ +void AP_Networking::Port::tcp_client_init(void) +{ + sock = new SocketAPM(false); + if (sock != nullptr) { + sock->set_blocking(true); + thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::Port::tcp_client_loop, void)); + } +} + +/* + wait for networking stack to be up + */ +void AP_Networking::Port::wait_startup(void) +{ + while (!hal.scheduler->is_system_initialized()) { + hal.scheduler->delay(100); + } + hal.scheduler->delay(1000); } /* @@ -101,10 +186,7 @@ void AP_Networking::Port::udp_client_init(const uint32_t size_rx, const uint32_t */ void AP_Networking::Port::udp_client_loop(void) { - while (!hal.scheduler->is_system_initialized()) { - hal.scheduler->delay(100); - } - hal.scheduler->delay(1000); + wait_startup(); const char *dest = ip.get_str(); if (!sock->connect(dest, port.get())) { @@ -116,31 +198,188 @@ void AP_Networking::Port::udp_client_loop(void) GCS_SEND_TEXT(MAV_SEVERITY_INFO, "UDP[%u]: connected to %s:%u", (unsigned)state.idx, dest, unsigned(port.get())); - while (true) { - hal.scheduler->delay_microseconds(500); - WITH_SEMAPHORE(sem); + connected = true; - // handle outgoing packets - uint32_t available; - const auto *ptr = writebuffer->readptr(available); - if (ptr != nullptr) { - const auto ret = sock->send(ptr, available); - if (ret > 0) { - writebuffer->advance(ret); + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + active = send_receive(); + } +} + +/* + update a UDP server + */ +void AP_Networking::Port::udp_server_loop(void) +{ + wait_startup(); + + const char *addr = ip.get_str(); + if (!sock->bind(addr, port.get())) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "UDP[%u]: Failed to bind to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + delete sock; + sock = nullptr; + return; + } + sock->reuseaddress(); + + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "UDP[%u]: bound to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + active = send_receive(); + } +} + +/* + update a TCP server + */ +void AP_Networking::Port::tcp_server_loop(void) +{ + wait_startup(); + + const char *addr = ip.get_str(); + if (!listen_sock->bind(addr, port.get()) || !listen_sock->listen(1)) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "TCP[%u]: Failed to bind to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + delete listen_sock; + listen_sock = nullptr; + return; + } + + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: bound to %s:%u", (unsigned)state.idx, addr, unsigned(port.get())); + + close_on_recv_error = true; + + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + if (sock == nullptr) { + sock = listen_sock->accept(100); + if (sock != nullptr) { + sock->set_blocking(false); + char buf[16]; + uint16_t last_port; + const char *last_addr = listen_sock->last_recv_address(buf, sizeof(buf), last_port); + if (last_addr != nullptr) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: connection from %s:%u", (unsigned)state.idx, last_addr, unsigned(last_port)); + } + connected = true; + sock->reuseaddress(); } } + if (sock != nullptr) { + active = send_receive(); + } + } +} - // handle incoming packets - const auto space = readbuffer->space(); - if (space > 0) { - const uint32_t n = MIN(350U, space); - uint8_t buf[n]; - const auto ret = sock->recv(buf, n, 0); - if (ret > 0) { - readbuffer->write(buf, ret); +/* + update a TCP client + */ +void AP_Networking::Port::tcp_client_loop(void) +{ + wait_startup(); + + close_on_recv_error = true; + + bool active = false; + while (true) { + if (!active) { + hal.scheduler->delay_microseconds(100); + } + if (sock == nullptr) { + sock = new SocketAPM(false); + if (sock == nullptr) { + continue; + } + sock->set_blocking(true); + connected = false; + } + if (!connected) { + const char *dest = ip.get_str(); + connected = sock->connect(dest, port.get()); + if (connected) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: connected to %s:%u", unsigned(state.idx), dest, unsigned(port.get())); + sock->set_blocking(false); + } else { + delete sock; + sock = nullptr; + // don't try and connect too fast + hal.scheduler->delay(100); + } + } + if (sock != nullptr && connected) { + active = send_receive(); + } + } +} + +/* + run one send/receive loop + */ +bool AP_Networking::Port::send_receive(void) +{ + + bool active = false; + WITH_SEMAPHORE(sem); + + // handle incoming packets + const auto space = readbuffer->space(); + if (space > 0) { + const uint32_t n = MIN(300U, space); + uint8_t buf[n]; + const auto ret = sock->recv(buf, n, 0); + if (close_on_recv_error && ret == 0) { + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP[%u]: closed connection", unsigned(state.idx)); + delete sock; + sock = nullptr; + return false; + } + if (ret > 0) { + readbuffer->write(buf, ret); + active = true; + have_received = true; + } + } + + if (connected) { + // handle outgoing packets + uint32_t available = writebuffer->available(); + available = MIN(300U, available); + if (packetise) { + available = mavlink_packetise(*writebuffer, available); + } + if (available > 0) { + uint8_t buf[available]; + auto n = writebuffer->peekbytes(buf, available); + if (n > 0) { + const auto ret = sock->send(buf, n); + if (ret > 0) { + writebuffer->advance(ret); + active = true; + } + } + } + } else { + if (type == NetworkPortType::UDP_SERVER && have_received) { + // connect the socket to the last receive address if we have one + char buf[16]; + uint16_t last_port; + const char *last_addr = sock->last_recv_address(buf, sizeof(buf), last_port); + if (last_addr != nullptr && port != 0) { + connected = sock->connect(last_addr, last_port); } } } + + return active; } /* @@ -210,4 +449,22 @@ bool AP_Networking::Port::init_buffers(const uint32_t size_rx, const uint32_t si return readbuffer != nullptr && writebuffer != nullptr; } +/* + return flow control state + */ +enum AP_HAL::UARTDriver::flow_control AP_Networking::Port::get_flow_control(void) +{ + const NetworkPortType ptype = (NetworkPortType)type; + switch (ptype) { + case NetworkPortType::TCP_CLIENT: + case NetworkPortType::TCP_SERVER: + return AP_HAL::UARTDriver::FLOW_CONTROL_ENABLE; + case NetworkPortType::UDP_CLIENT: + case NetworkPortType::UDP_SERVER: + case NetworkPortType::NONE: + break; + } + return AP_HAL::UARTDriver::FLOW_CONTROL_DISABLE; +} + #endif // AP_NETWORKING_ENABLED diff --git a/libraries/AP_Networking/AP_Networking_tests.cpp b/libraries/AP_Networking/AP_Networking_tests.cpp index 9370d93a23..2fc050d9dd 100644 --- a/libraries/AP_Networking/AP_Networking_tests.cpp +++ b/libraries/AP_Networking/AP_Networking_tests.cpp @@ -28,6 +28,11 @@ void AP_Networking::start_tests(void) "TCP_client", 8192, AP_HAL::Scheduler::PRIORITY_IO, -1); } + if (param.tests & TEST_TCP_DISCARD) { + hal.scheduler->thread_create(FUNCTOR_BIND_MEMBER(&AP_Networking::test_TCP_discard, void), + "TCP_discard", + 8192, AP_HAL::Scheduler::PRIORITY_UART, -1); + } } /* @@ -100,4 +105,44 @@ void AP_Networking::test_TCP_client(void) } } +/* + start TCP discard (throughput) test + */ +void AP_Networking::test_TCP_discard(void) +{ + while (!hal.scheduler->is_system_initialized()) { + hal.scheduler->delay(100); + } + hal.scheduler->delay(1000); + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "TCP_discard: starting"); + const char *dest = param.test_ipaddr.get_str(); + auto *sock = new SocketAPM(false); + if (sock == nullptr) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "TCP_discard: failed to create socket"); + return; + } + // connect to the discard service, which is port 9 + if (!sock->connect(dest, 9)) { + GCS_SEND_TEXT(MAV_SEVERITY_ERROR, "TCP_discard: connect failed"); + return; + } + const uint32_t bufsize = 1024; + uint8_t *buf = (uint8_t*)malloc(bufsize); + for (uint32_t i=0; isend(buf, bufsize); + const uint32_t now = AP_HAL::millis(); + if (now - last_report_ms >= 1000) { + float dt = (now - last_report_ms)*0.001; + GCS_SEND_TEXT(MAV_SEVERITY_INFO, "Discard throughput %.3f kbyte/sec", (total_sent/dt)*1.0e-3); + total_sent = 0; + last_report_ms = now; + } + } +} + #endif // AP_NETWORKING_ENABLED && AP_NETWORKING_TESTS_ENABLED