ardupilot/libraries/AP_Scripting/examples/net_test.lua

120 lines
3.6 KiB
Lua

--[[
example script to test lua socket API
--]]
local MAV_SEVERITY = {EMERGENCY=0, ALERT=1, CRITICAL=2, ERROR=3, WARNING=4, NOTICE=5, INFO=6, DEBUG=7}
PARAM_TABLE_KEY = 46
PARAM_TABLE_PREFIX = "NT_"
-- add a parameter and bind it to a variable
function bind_add_param(name, idx, default_value)
assert(param:add_param(PARAM_TABLE_KEY, idx, name, default_value), string.format('could not add param %s', name))
return Parameter(PARAM_TABLE_PREFIX .. name)
end
-- Setup Parameters
assert(param:add_table(PARAM_TABLE_KEY, PARAM_TABLE_PREFIX, 6), 'net_test: could not add param table')
--[[
// @Param: NT_ENABLE
// @DisplayName: enable network tests
// @Description: Enable network tests
// @Values: 0:Disabled,1:Enabled
// @User: Standard
--]]
local NT_ENABLE = bind_add_param('ENABLE', 1, 0)
if NT_ENABLE:get() == 0 then
return
end
local NT_TEST_IP = { bind_add_param('TEST_IP0', 2, 192),
bind_add_param('TEST_IP1', 3, 168),
bind_add_param('TEST_IP2', 4, 13),
bind_add_param('TEST_IP3', 5, 15) }
local NT_BIND_PORT = bind_add_param('BIND_PORT', 6, 15001)
local PORT_ECHO = 7
gcs:send_text(MAV_SEVERITY.INFO, "net_test: starting")
local function test_ip()
return string.format("%u.%u.%u.%u", NT_TEST_IP[1]:get(), NT_TEST_IP[2]:get(), NT_TEST_IP[3]:get(), NT_TEST_IP[4]:get())
end
local counter = 0
local sock_tcp_echo = Socket(0)
local sock_udp_echo = Socket(1)
local sock_tcp_in = Socket(0)
local sock_tcp_in2 = nil
local sock_udp_in = Socket(1)
assert(sock_tcp_echo, "net_test: failed to create tcp echo socket")
assert(sock_udp_echo, "net_test: failed to create udp echo socket")
assert(sock_tcp_in:bind("0.0.0.0",
NT_BIND_PORT:get()), string.format("net_test: failed to bind to TCP %u", NT_BIND_PORT:get()))
assert(sock_tcp_in:listen(1), "net_test: failed to listen")
assert(sock_udp_in:bind("0.0.0.0",
NT_BIND_PORT:get()), string.format("net_test: failed to bind to UDP %u", NT_BIND_PORT:get()))
--[[
test TCP or UDP echo
--]]
local function test_echo(name, sock)
if not sock:is_connected() then
if not sock:connect(test_ip(), PORT_ECHO) then
gcs:send_text(MAV_SEVERITY.ERROR, string.format("test_echo(%s): failed to connect", name))
return
end
if not sock:set_blocking(true) then
gcs:send_text(MAV_SEVERITY.ERROR, string.format("test_echo(%s): failed to set blocking", name))
return
end
end
local s = string.format("testing %u", counter)
local nsent = sock:send(s, #s)
if nsent ~= #s then
gcs:send_text(MAV_SEVERITY.ERROR, string.format("test_echo(%s): failed to send", name))
return
end
local r = sock:recv(#s)
if r then
gcs:send_text(MAV_SEVERITY.ERROR, string.format("test_echo(%s): got reply '%s'", name, r))
end
end
--[[
test a simple server
--]]
local function test_server(name, sock)
if name == "TCP" then
if not sock_tcp_in2 then
sock_tcp_in2 = sock:accept()
if not sock_tcp_in2 then
return
end
gcs:send_text(MAV_SEVERITY.ERROR, string.format("test_server(%s): new connection", name))
end
sock = sock_tcp_in2
end
local r = sock:recv(1024)
if r and #r > 0 then
gcs:send_text(MAV_SEVERITY.ERROR, string.format("test_server(%s): got input '%s'", name, r))
end
end
local function update()
test_echo("TCP", sock_tcp_echo)
test_echo("UDP", sock_udp_echo)
test_server("TCP", sock_tcp_in)
test_server("UDP", sock_udp_in)
counter = counter + 1
return update,1000
end
return update,100