mirror of
https://github.com/ArduPilot/ardupilot
synced 2025-01-14 12:48:31 -04:00
299 lines
12 KiB
Python
299 lines
12 KiB
Python
# Software License Agreement (BSD License)
|
|
#
|
|
# Copyright (c) 2009, Willow Garage, Inc.
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions
|
|
# are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above
|
|
# copyright notice, this list of conditions and the following
|
|
# disclaimer in the documentation and/or other materials provided
|
|
# with the distribution.
|
|
# * Neither the name of Willow Garage, Inc. nor the names of its
|
|
# contributors may be used to endorse or promote products derived
|
|
# from this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
|
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import os
|
|
import sys
|
|
|
|
import random
|
|
|
|
def test_bare_msg_type():
    """Check bare_msg_type() on plain, package-qualified and array type names."""
    from genmsg.msgs import bare_msg_type

    # (input, expected) pairs: array suffixes are dropped, None maps to None.
    expectations = [
        (None, None),
        ('String', 'String'),
        ('std_msgs/String', 'std_msgs/String'),
        ('String[10]', 'String'),
        ('string[10]', 'string'),
        ('std_msgs/String[10]', 'std_msgs/String'),
    ]
    for given, expected in expectations:
        assert expected == bare_msg_type(given)
|
|
|
|
# Package context passed to resolve_type() by test_resolve_type.
PKG = 'genmsg'
|
|
|
|
def test_resolve_type():
    """Check resolve_type() for builtin, Header and package-relative type names.

    Uses the module-level ``PKG`` as the package context for the cases that
    are expected to be independent of the caller's package.
    """
    from genmsg.msgs import resolve_type

    # Builtin types, with or without an array suffix, must come back
    # unchanged.  The original wrote this comparison without ``assert``,
    # so the loop never verified anything.
    for t in ['string', 'string[]', 'string[14]', 'int32', 'int32[]']:
        assert t == resolve_type(t, PKG)

    # An already-qualified name is returned as-is.
    assert 'foo/string' == resolve_type('foo/string', PKG)

    # 'Header' resolves to std_msgs/Header whatever the package context.
    assert 'std_msgs/Header' == resolve_type('Header', 'roslib')
    assert 'std_msgs/Header' == resolve_type('std_msgs/Header', 'roslib')
    assert 'std_msgs/Header' == resolve_type('Header', 'stereo_msgs')

    # Unqualified names pick up the package context; qualified ones do not.
    assert 'std_msgs/String' == resolve_type('String', 'std_msgs')
    assert 'std_msgs/String' == resolve_type('std_msgs/String', 'std_msgs')
    assert 'std_msgs/String' == resolve_type('std_msgs/String', PKG)
    assert 'std_msgs/String[]' == resolve_type('std_msgs/String[]', PKG)
|
|
|
|
def test_parse_type():
    """Check parse_type() on scalar and array type strings, and on malformed input."""
    from genmsg.msgs import parse_type

    base_types = ['a', 'int8', 'std_msgs/String']
    # (suffix, expected is_array, expected array_len) for each array form.
    array_forms = [
        ('', False, None),
        ('[]', True, None),
        ('[1]', True, 1),
        ('[11]', True, 11),
    ]
    for suffix, is_array, array_len in array_forms:
        for base in base_types:
            assert (base, is_array, array_len) == parse_type(base + suffix)

    # Each of these must be rejected with ValueError.
    malformed = ['a[1][2]', 'a[][]', '', None, 'a[', 'a[[1]', 'a[1]]']
    for bad in malformed:
        try:
            parse_type(bad)
        except ValueError:
            continue
        assert False, "should have failed on %s"%bad
|
|
|
|
def test_Constant():
    """Exercise genmsg.msgs.Constant.

    Covers attribute storage, equality/inequality (including against a
    non-Constant), repr/str, rejection of None arguments and rejection of
    arbitrary attribute assignment.
    """
    from genmsg.msgs import Constant

    # Three random numeric strings stand in for the type, name and value.
    type_, name, val = [str(random.randint(0, 1000)) for _ in range(3)]
    x = Constant(type_, name, val, str(val))
    assert type_ == x.type
    assert name == x.name
    assert val == x.val
    assert x == Constant(type_, name, val, str(val))

    assert x != 1
    assert not x == 1
    assert x != Constant('baz', name, val, str(val))
    assert x != Constant(type_, 'foo', val, str(val))
    assert x != Constant(type_, name, 'foo', 'foo')

    # tripwire
    assert repr(x)
    assert str(x)

    # NOTE(review): in the original, every "should have raised" assertion
    # sat inside a try block with a bare ``except: pass``, which swallowed
    # the AssertionError itself, so none of these checks could fail.
    # The type_=None case is deliberately left non-fatal here: it has never
    # actually been enforced by this test -- TODO confirm that Constant
    # rejects a None type, then tighten it like the checks below.
    try:
        Constant(None, name, val, str(val))
    except Exception:
        pass

    # The remaining None arguments must be rejected.  The failure is
    # raised outside the try block so that it cannot be swallowed.
    none_cases = [
        ('name', (type_, None, val, str(val))),
        ('val', (type_, name, None, 'None')),
        ('val_text', (type_, name, val, None)),
    ]
    for label, args in none_cases:
        try:
            Constant(*args)
        except Exception:
            continue
        assert False, "should have raised for %s=None" % label

    # Assigning an undeclared attribute must fail.
    try:
        x.foo = 'bar'
    except Exception:
        pass
    else:
        assert False, 'Constant should not allow arbitrary attr assignment'
|
|
|
|
def test_MsgSpec():
    """Exercise MsgSpec construction, accessors, equality, str/repr and validation."""
    from genmsg import MsgSpec, InvalidMsgSpec
    from genmsg.msgs import Constant, Field

    def sub_test_MsgSpec(types, names, constants, text, full_name, has_header):
        """Build a MsgSpec, check it mirrors its constructor arguments, and return it."""
        m = MsgSpec(types, names, constants, text, full_name)
        assert m.types == types
        assert m.names == names
        assert m.text == text
        assert has_header == m.has_header()
        assert m.constants == constants
        assert list(zip(types, names)) == m.fields()
        assert m == MsgSpec(types, names, constants, text, full_name)
        return m

    # don't allow duplicate fields
    try:
        MsgSpec(['int32', 'int64'], ['x', 'x'], [], 'int32 x\nint64 x', 'x/DupFields')
    except InvalidMsgSpec:
        pass
    else:
        assert False, "should have raised"

    # don't allow invalid fields
    try:
        MsgSpec(['string['], ['x'], [], 'int32 x\nint64 x', 'x/InvalidFields')
    except InvalidMsgSpec:
        pass
    else:
        assert False, "should have raised"

    # allow empty msg
    empty = sub_test_MsgSpec([], [], [], '', 'x/Nothing', False)
    assert [] == empty.fields()
    assert [] == empty.parsed_fields()
    assert 'x/Nothing' == empty.full_name
    assert 'x' == empty.package
    assert 'Nothing' == empty.short_name

    # one-field
    one_field = sub_test_MsgSpec(['int32'], ['x'], [], 'int32 x', 'x/OneInt', False)
    # make sure that equals tests every declared field
    assert one_field == MsgSpec(['int32'], ['x'], [], 'int32 x', 'x/OneInt')
    assert one_field != MsgSpec(['uint32'], ['x'], [], 'int32 x', 'x/OneInt')
    assert one_field != MsgSpec(['int32'], ['y'], [], 'int32 x', 'x/OneInt')
    assert one_field != MsgSpec(['int32'], ['x'], [], 'uint32 x', 'x/OneInt')
    assert one_field != MsgSpec(['int32'], ['x'], [], 'int32 x', 'x/OneIntBad')
    # test against __ne__ as well
    assert one_field != MsgSpec(['int32'], ['x'], [], 'uint32 x', 'x/OneInt')
    assert [Field('x', 'int32')] == one_field.parsed_fields(), "%s vs %s"%([Field('x', 'int32')], one_field.parsed_fields())
    # test str
    assert "int32 x" == str(one_field).strip()

    # test variations of multiple fields and headers
    two_fields = sub_test_MsgSpec(['int32', 'string'], ['x', 'str'], [], 'int32 x\nstring str', 'x/TwoFields', False)
    assert [Field('x', 'int32'), Field('str', 'string')] == two_fields.parsed_fields()

    one_header = sub_test_MsgSpec(['std_msgs/Header'], ['header'], [], 'Header header', 'x/OneHeader', True)
    # Only the checks inside sub_test_MsgSpec matter for this spec.
    sub_test_MsgSpec(['std_msgs/Header', 'int32', 'string'], ['header', 'x', 'str'], [], 'Header header\nint32 x\nstring str', 'x/HeaderAndFields', True)
    embed_types = sub_test_MsgSpec(['std_msgs/Header', 'std_msgs/Int32', 'string'], ['header', 'x', 'str'], [], 'Header header\nstd_msgs/Int32 x\nstring str', 'x/EmbedTypes', True)
    # test strify
    assert "int32 x\nstring str" == str(two_fields).strip()

    # types and names mismatch
    # The original call omitted the full_name argument that every other
    # MsgSpec(...) call in this test passes, spelled the line break as the
    # invalid escape '\i', and used a bare ``except: pass`` that swallowed
    # its own ``assert False``.  Presumably the missing argument alone made
    # the call raise, so the mismatch was never exercised -- verify against
    # the MsgSpec signature.
    try:
        MsgSpec(['int32', 'int32'], ['intval'], [], 'int32 intval\nint32 y', 'x/Mismatch')
    except Exception:
        pass
    else:
        assert False, "types and names must align"

    # test (not) equals against non msgspec
    assert not (one_field == 1)
    assert one_field != 1

    # test constants
    msgspec = MsgSpec(['int32'], ['x'], [Constant('int8', 'c', 1, '1')], 'int8 c=1\nuint32 x', 'x/Constants')
    assert msgspec.constants == [Constant('int8', 'c', 1, '1')]
    # tripwire
    str(msgspec)
    repr(msgspec)

    # test that repr doesn't throw an error
    for spec in [empty, one_field, one_header, two_fields, embed_types]:
        repr(spec)
|
|
|
|
def test_Field():
    """Check Field equality and the attributes derived from its type string."""
    from genmsg.msgs import Field

    def checked_field(type_, **expected):
        # Build Field('foo', type_) and compare each named attribute
        # against the expected value.
        candidate = Field('foo', type_)
        assert candidate.type == type_
        for attr in expected:
            assert getattr(candidate, attr) == expected[attr]
        return candidate

    # Builtin scalar, plus equality against equal, different and non-Field values.
    plain = checked_field('string', base_type='string', is_array=False,
                          array_len=None, is_header=False, is_builtin=True)
    assert plain.name == 'foo'
    assert plain == Field('foo', 'string')
    assert plain != Field('bar', 'string')
    assert plain != Field('foo', 'int32')
    assert plain != 1
    assert not plain == 1

    # Package-qualified message type: scalar, fixed-length array, unbounded array.
    checked_field('std_msgs/String', base_type='std_msgs/String', is_array=False,
                  array_len=None, is_header=False, is_builtin=False)
    checked_field('std_msgs/String[5]', base_type='std_msgs/String', is_array=True,
                  array_len=5, is_header=False, is_builtin=False)
    checked_field('std_msgs/String[]', base_type='std_msgs/String', is_array=True,
                  array_len=None, is_header=False, is_builtin=False)

    # A Header counts as a header; an array of Headers does not.
    checked_field('std_msgs/Header', is_header=True, is_builtin=False)
    header_array = checked_field('std_msgs/Header[]', is_header=False)

    # tripwire
    repr(header_array)
|
|
|
|
def test_is_valid_msg_type():
    """Check is_valid_msg_type() against well-formed and malformed type names."""
    from genmsg.msgs import is_valid_msg_type

    basic = ['F', 'f', 'Foo', 'Foo1', 'std_msgs/String']
    arrays = ['Foo[]', 'Foo[1]', 'Foo[10]']
    for good in basic + arrays:
        assert is_valid_msg_type(good), "genmsg.msgs.is_valid_msg_type should have returned True for '%s'"%good

    # bad cases: empty/odd characters, malformed package separators, malformed arrays
    junk = [None, '', '#', '%', 'Foo%', 'Woo Woo']
    bad_separators = ['/', '/String']
    bad_arrays = ['Foo[f]', 'Foo[1d]', 'Foo[-1]', 'Foo[1:10]', 'Foo[', 'Foo]', 'Foo[]Bar']
    for bad in junk + bad_separators + bad_arrays:
        assert not is_valid_msg_type(bad), "genmsg.msgs.is_valid_msg_type should have returned False for '%s'"%bad
|
|
|
|
def test_is_valid_constant_type():
    """Check which type names is_valid_constant_type() accepts and rejects."""
    from genmsg.msgs import is_valid_constant_type

    accepted = [
        'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64',
        'uint64', 'float32', 'float64', 'char', 'byte', 'string',
    ]
    rejected = [
        'std_msgs/String', '/', 'String',
        'time', 'duration', 'header',
    ]
    for good in accepted:
        assert is_valid_constant_type(good), "genmsg.msgs.is_valid_constant_type should have returned True for '%s'"%good
    for bad in rejected:
        assert not is_valid_constant_type(bad), "genmsg.msgs.is_valid_constant_type should have returned False for '%s'"%bad
|
|
|