Ardupilot2/mk/PX4/Tools/genmsg/test/test_genmsg_msg_loader.py

685 lines
28 KiB
Python
Raw Normal View History

# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import random
def get_test_dir():
return os.path.abspath(os.path.join(os.path.dirname(__file__), 'files'))
def test_exceptions():
from genmsg import MsgNotFound
try:
raise MsgNotFound('hello')
except MsgNotFound:
pass
def test__convert_constant_value():
from genmsg.msg_loader import convert_constant_value
from genmsg import InvalidMsgSpec
assert 0. == convert_constant_value('float32', '0.0')
assert 0. == convert_constant_value('float64', '0.0')
assert 'fo o' == convert_constant_value('string', ' fo o ')
assert 1 == convert_constant_value('byte', '1')
assert 1 == convert_constant_value('char', '1')
assert 1 == convert_constant_value('int8', '1')
assert 12 == convert_constant_value('int16', '12')
assert -13 == convert_constant_value('int32', '-13')
assert 14 == convert_constant_value('int64', '14')
assert 0 == convert_constant_value('uint8', '0')
assert 18 == convert_constant_value('uint16', '18')
assert 19 == convert_constant_value('uint32', '19')
assert 20 == convert_constant_value('uint64', '20')
assert True == convert_constant_value('bool', '1')
assert False == convert_constant_value('bool', '0')
width_fail = [('int8', '129'), ('uint8', '256'),
('int16', '35536'), ('uint16', '-1'),('uint16', '65536'),
('int32', '3000000000'),('int32', '-2700000000'),
('uint32', '-1'),('uint32', '41000000000'),
('uint64', '-1')]
for t, v in width_fail:
try:
convert_constant_value(t, v)
assert False, "should have failed width check: %s, %s"%(t, v)
except InvalidMsgSpec:
pass
type_fail = [('int32', 'f'), ('float32', 'baz')]
for t, v in type_fail:
try:
convert_constant_value(t, v)
assert False, "should have failed type check: %s, %s"%(t, v)
except ValueError:
pass
try:
convert_constant_value('foo', '1')
assert False, "should have failed invalid type"
except InvalidMsgSpec:
pass
def test__load_constant_line():
from genmsg.msgs import Constant, InvalidMsgSpec
from genmsg.msg_loader import _load_constant_line
try:
_load_constant_line("int8 field=alpha")
assert False, "should have raised"
except InvalidMsgSpec:
pass
try:
_load_constant_line("int8 field=")
assert False, "should have raised"
except InvalidMsgSpec:
pass
try:
_load_constant_line("faketype field=1")
assert False, "should have raised"
except InvalidMsgSpec:
pass
c = _load_constant_line("int8 field=1")
assert c == Constant('int8', 'field', 1, '1')
c = _load_constant_line("string val=hello #world")
assert c == Constant('string', 'val', 'hello #world', 'hello #world')
def test__load_field_line():
from genmsg.msgs import InvalidMsgSpec, Field
from genmsg.msg_loader import _load_field_line, InvalidMsgSpec, Field, is_valid_msg_field_name
try:
_load_field_line("string", 'foo')
assert False, "should have raised"
except InvalidMsgSpec:
pass
assert not is_valid_msg_field_name('string[')
try:
_load_field_line("string data!", 'foo')
assert False, "should have raised"
except InvalidMsgSpec:
pass
try:
_load_field_line("string[ data", 'foo')
assert False, "should have raised"
except InvalidMsgSpec:
pass
f =_load_field_line("string str", 'foo')
assert f == ('string', 'str')
f =_load_field_line("string str #nonsense", 'foo')
assert f == ('string', 'str')
f =_load_field_line("String str #nonsense", '')
assert f == ('String', 'str')
f =_load_field_line("String str #nonsense", 'foo')
assert f == ('foo/String', 'str')
# make sure Header is mapped
f =_load_field_line("Header header #nonsense", 'somewhere')
assert f == ('std_msgs/Header', 'header'), f
f =_load_field_line("Header header #nonsense", '')
assert f == ('std_msgs/Header', 'header'), f
def test_load_msg_from_string():
# make sure Header -> std_msgs/Header conversion works
from genmsg.msgs import Constant
from genmsg.msg_loader import load_msg_from_string, MsgContext
context = MsgContext.create_default()
msgspec = load_msg_from_string(context, "Header header", 'test_pkg/HeaderTest')
print(msgspec)
assert msgspec.has_header()
assert msgspec.types == ['std_msgs/Header']
assert msgspec.names == ['header']
assert msgspec.constants == []
assert msgspec.short_name == 'HeaderTest'
assert msgspec.package == 'test_pkg'
assert msgspec.full_name == 'test_pkg/HeaderTest'
msgspec = load_msg_from_string(context, "int8 c=1\nHeader header\nint64 data", 'test_pkg/HeaderValsTest')
assert msgspec.has_header()
assert msgspec.types == ['std_msgs/Header', 'int64']
assert msgspec.names == ['header', 'data']
assert msgspec.constants == [Constant('int8', 'c', 1, '1')]
assert msgspec.short_name == 'HeaderValsTest'
assert msgspec.package == 'test_pkg'
assert msgspec.full_name == 'test_pkg/HeaderValsTest'
msgspec = load_msg_from_string(context, "string data\nint64 data2", 'test_pkg/ValsTest')
assert not msgspec.has_header()
assert msgspec.types == ['string', 'int64']
assert msgspec.names == ['data', 'data2']
assert msgspec.constants == []
assert msgspec.short_name == 'ValsTest'
assert msgspec.full_name == 'test_pkg/ValsTest'
def _validate_TestString(msgspec):
assert ['caller_id', 'orig_caller_id', 'data'] == msgspec.names, msgspec.names
assert ['string', 'string', 'string'] == msgspec.types, msgspec.types
def test_load_msg_from_file():
from genmsg.msgs import InvalidMsgSpec
from genmsg.msg_loader import load_msg_from_file, MsgContext
test_d = get_test_dir()
test_ros_dir = os.path.join(test_d, 'test_ros', 'msg')
test_string_path = os.path.join(test_ros_dir, 'TestString.msg')
msg_context = MsgContext.create_default()
spec = load_msg_from_file(msg_context, test_string_path, 'test_ros/TestString')
assert spec.full_name == 'test_ros/TestString'
assert spec.package == 'test_ros'
assert spec.short_name == 'TestString'
_validate_TestString(spec)
# test repeat
spec_2 = load_msg_from_file(msg_context, test_string_path, 'test_ros/TestString')
assert spec == spec_2
assert spec.package == spec_2.package
assert spec.short_name == spec_2.short_name
# test w/ bad file
test_bad_path = os.path.join(test_ros_dir, 'Bad.msg')
try:
load_msg_from_file(msg_context, test_bad_path, 'test_ros/Bad')
assert False, "should have raised"
except InvalidMsgSpec:
pass
# supposed to register
assert msg_context.is_registered('test_ros/TestString'), msg_context
def test_load_msg_from_string_TestString():
from genmsg.msg_loader import load_msg_from_string, MsgContext
test_d = get_test_dir()
test_ros_dir = os.path.join(test_d, 'test_ros', 'msg')
test_string_path = os.path.join(test_ros_dir, 'TestString.msg')
with open(test_string_path) as f:
text = f.read()
msg_context = MsgContext.create_default()
_validate_TestString(load_msg_from_string(msg_context, text, 'test_ros/TestString'))
# supposed to register
assert msg_context.is_registered('test_ros/TestString'), msg_context
def test_load_msg_by_type():
from genmsg.msg_loader import load_msg_by_type, MsgContext, MsgNotFound
test_d = get_test_dir()
geometry_d = os.path.join(test_d, 'geometry_msgs', 'msg')
test_ros_dir = os.path.join(test_d, 'test_ros', 'msg')
test_string_path = os.path.join(test_ros_dir, 'TestString.msg')
search_path = {
'test_ros': [ test_ros_dir ],
'geometry_msgs': [ geometry_d ],
}
msg_context = MsgContext.create_default()
msgspec = load_msg_by_type(msg_context, 'test_ros/TestString', search_path)
_validate_TestString(msgspec)
# supposed to register
assert msg_context.is_registered('test_ros/TestString'), msg_context
# test invalid search path
try:
load_msg_by_type(msg_context, 'test_ros/TestString', [test_string_path])
assert False, "should have raised"
except ValueError:
pass
# test not found
try:
load_msg_by_type(msg_context, 'test_ros/Fake', search_path)
assert False, "should have raised"
except MsgNotFound:
pass
# test all the known geometry msgs
test_d = get_test_dir()
for f in os.listdir(geometry_d):
if f.endswith('.msg'):
short = f[:-4]
msg_type = 'geometry_msgs/%s'%short
spec = load_msg_by_type(msg_context, msg_type, search_path)
assert spec is not None
assert spec.package == 'geometry_msgs'
assert spec.full_name == msg_type
assert spec.short_name == short
with open(os.path.join(geometry_d, f)) as file_h:
assert spec.text == file_h.read()
# all types with 'Stamped' in name have headers
if 'Stamped' in f:
assert spec.has_header(), msg_type
def test_get_msg_file():
from genmsg import MsgNotFound
from genmsg.msg_loader import get_msg_file
test_d = get_test_dir()
test_ros_dir = os.path.join(test_d, 'test_ros', 'msg')
test_string_path = os.path.join(test_ros_dir, 'TestString.msg')
search_path = {
'test_ros': [ test_ros_dir ],
}
assert test_string_path == get_msg_file('test_ros', 'TestString', search_path)
try:
get_msg_file('test_ros', 'DNE', search_path)
assert False, "should have raised"
except MsgNotFound:
pass
try:
get_msg_file('bad_pkg', 'TestString', search_path)
assert False, "should have raised"
except MsgNotFound:
pass
# test with invalid search path
try:
get_msg_file('test_ros', 'TestString', [test_string_path])
assert False, "should have raised"
except ValueError:
pass
def test_get_srv_file():
from genmsg import MsgNotFound
from genmsg.msg_loader import get_srv_file
test_d = get_test_dir()
test_ros_dir = os.path.join(test_d, 'test_ros', 'srv')
std_srvs_dir = os.path.join(test_d, 'std_srvs', 'srv')
empty_path = os.path.join(std_srvs_dir, 'Empty.srv')
search_path = {
'test_ros': [ test_ros_dir ],
'std_srvs': [ std_srvs_dir ],
}
assert empty_path == get_srv_file('std_srvs', 'Empty', search_path)
try:
get_srv_file('test_ros', 'DNE', search_path)
assert False, "should have raised"
except MsgNotFound:
pass
try:
get_srv_file('bad_pkg', 'TestString', search_path)
assert False, "should have raised"
except MsgNotFound:
pass
# test with invalid search path
try:
get_srv_file('std_srvs', 'Empty', [std_srvs_dir])
assert False, "should have raised"
except ValueError:
pass
def test_MsgContext():
from genmsg.msg_loader import MsgContext, load_msg_from_file
msg_context = MsgContext()
assert not msg_context.is_registered('time')
assert not msg_context.is_registered('duration')
msg_context = MsgContext.create_default()
# tripwires
repr(msg_context)
str(msg_context)
assert msg_context.is_registered('time'), msg_context._registered_packages
assert msg_context.is_registered('duration')
assert not msg_context.is_registered('test_ros/TestString')
assert not msg_context.is_registered('Header')
# start loading stuff into context
test_d = get_test_dir()
test_ros_dir = os.path.join(test_d, 'test_ros', 'msg')
test_string_path = os.path.join(test_ros_dir, 'TestString.msg')
spec = load_msg_from_file(msg_context, test_string_path, 'test_ros/TestString')
msg_context.register('test_ros/TestString', spec)
assert msg_context.get_registered('test_ros/TestString') == spec
try:
msg_context.get_registered('bad/TestString')
assert False, 'should have raised'
except KeyError:
pass
assert msg_context.is_registered('test_ros/TestString')
# test Header
assert not msg_context.is_registered('Header')
assert not msg_context.is_registered('std_msgs/Header')
msg_context.register('std_msgs/Header', spec)
assert msg_context.is_registered('std_msgs/Header')
def test_load_srv_from_file():
from genmsg.msg_loader import MsgContext, load_srv_from_file
msg_context = MsgContext.create_default()
d = get_test_dir()
filename = os.path.join(d, 'test_ros', 'srv', 'AddTwoInts.srv')
with open(filename, 'r') as f:
text = f.read()
full_name = 'test_ros/AddTwoInts'
spec = load_srv_from_file(msg_context, filename, full_name)
assert spec == load_srv_from_file(msg_context, filename, full_name)
assert ['int64', 'int64'] == spec.request.types, spec.request.types
assert ['a', 'b'] == spec.request.names
assert text == spec.text
assert full_name == spec.full_name
def test_load_msg_depends():
#TODO: should there just be a 'load_msg, implicit=True?'
from genmsg.msg_loader import MsgContext, load_msg_by_type, load_msg_depends, MsgNotFound
test_d = get_test_dir()
search_path = {
'test_ros': [ os.path.join(test_d, 'test_ros', 'msg') ],
'std_msgs': [ os.path.join(test_d, 'std_msgs', 'msg') ],
'geometry_msgs': [ os.path.join(test_d, 'geometry_msgs', 'msg') ],
'sensor_msgs': [ os.path.join(test_d, 'sensor_msgs', 'msg') ],
'invalid': [ os.path.join(test_d, 'invalid', 'msg') ],
}
# Test not found
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'invalid/BadDepend', search_path)
try:
load_msg_depends(msg_context, root_spec, search_path)
assert False, "should have raised MsgNotFound"
except MsgNotFound:
pass
root_spec = load_msg_by_type(msg_context, 'invalid/BadLocalDepend', search_path)
try:
load_msg_depends(msg_context, root_spec, search_path)
assert False, "should have raised MsgNotFound"
except MsgNotFound:
pass
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'std_msgs/Int32', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'std_msgs', 'msg', 'Int32.msg')
assert file_p == msg_context.get_file('std_msgs/Int32')
assert [] == msg_context.get_depends('std_msgs/Int32')
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'std_msgs/Header', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'std_msgs', 'msg', 'Header.msg')
assert file_p == msg_context.get_file('std_msgs/Header')
assert [] == msg_context.get_depends('std_msgs/Header')
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'Header', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'std_msgs', 'msg', 'Header.msg')
assert file_p == msg_context.get_file('std_msgs/Header')
assert [] == msg_context.get_depends('std_msgs/Header')
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'std_msgs/Int32MultiArray', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'std_msgs', 'msg', 'Int32MultiArray.msg')
assert file_p == msg_context.get_file('std_msgs/Int32MultiArray')
val = msg_context.get_all_depends('std_msgs/Int32MultiArray')
assert set(['std_msgs/MultiArrayLayout', 'std_msgs/MultiArrayDimension']) == set(val), val
assert 2 == len(val), val
val = msg_context.get_depends('std_msgs/Int32MultiArray')
assert set(['std_msgs/MultiArrayLayout']) == set(val), val
for s in ['MultiArrayLayout', 'MultiArrayDimension']:
file_p = os.path.join(test_d, 'std_msgs', 'msg', '%s.msg'%s)
assert file_p == msg_context.get_file('std_msgs/%s'%s)
def test_load_msg_depends_stamped():
#TODO: should there just be a 'load_msg, implicit=True?'
from genmsg.msg_loader import MsgContext, load_msg_by_type, load_msg_depends
test_d = get_test_dir()
geometry_d = os.path.join(test_d, 'geometry_msgs', 'msg')
search_path = {
'test_ros': [ os.path.join(test_d, 'test_ros', 'msg') ],
'std_msgs': [ os.path.join(test_d, 'std_msgs', 'msg') ],
'geometry_msgs': [ geometry_d ],
'sensor_msgs': [ os.path.join(test_d, 'sensor_msgs', 'msg') ],
}
# Test with Stamped and deeper hierarchies, Header
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'geometry_msgs/PoseStamped', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'geometry_msgs', 'msg', 'PoseStamped.msg')
assert file_p == msg_context.get_file('geometry_msgs/PoseStamped')
val = msg_context.get_all_depends('geometry_msgs/PoseStamped')
assert set(['std_msgs/Header', 'geometry_msgs/Pose', 'geometry_msgs/Point', 'geometry_msgs/Quaternion']) == set(val), val
val = msg_context.get_depends('geometry_msgs/PoseStamped')
assert set(['std_msgs/Header', 'geometry_msgs/Pose']) == set(val), val
for s in ['Header']:
file_p = os.path.join(test_d, 'std_msgs', 'msg', '%s.msg'%s)
assert file_p == msg_context.get_file('std_msgs/%s'%s)
for s in ['Pose', 'Point', 'Quaternion']:
file_p = os.path.join(geometry_d, '%s.msg'%s)
assert file_p == msg_context.get_file('geometry_msgs/%s'%s)
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'geometry_msgs/TwistWithCovarianceStamped', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'geometry_msgs', 'msg', 'TwistWithCovarianceStamped.msg')
assert file_p == msg_context.get_file('geometry_msgs/TwistWithCovarianceStamped')
val = msg_context.get_all_depends('geometry_msgs/TwistWithCovarianceStamped')
assert set(['std_msgs/Header', 'geometry_msgs/TwistWithCovariance', 'geometry_msgs/Twist', 'geometry_msgs/Vector3']) == set(val), val
val = msg_context.get_depends('geometry_msgs/TwistWithCovarianceStamped')
assert set(['std_msgs/Header', 'geometry_msgs/TwistWithCovariance']) == set(val), val
for s in ['Header']:
file_p = os.path.join(test_d, 'std_msgs', 'msg', '%s.msg'%s)
assert file_p == msg_context.get_file('std_msgs/%s'%s)
for s in ['TwistWithCovariance', 'Twist', 'Vector3']:
file_p = os.path.join(geometry_d, '%s.msg'%s)
assert file_p == msg_context.get_file('geometry_msgs/%s'%s)
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'sensor_msgs/Imu', search_path)
load_msg_depends(msg_context, root_spec, search_path)
file_p = os.path.join(test_d, 'sensor_msgs', 'msg', 'Imu.msg')
assert file_p == msg_context.get_file('sensor_msgs/Imu')
val = msg_context.get_all_depends('sensor_msgs/Imu')
assert set(['std_msgs/Header', 'geometry_msgs/Quaternion', 'geometry_msgs/Vector3']) == set(val), val
val = msg_context.get_depends('sensor_msgs/Imu')
assert set(['std_msgs/Header', 'geometry_msgs/Quaternion', 'geometry_msgs/Vector3']) == set(val), val
for s in ['Header']:
file_p = os.path.join(test_d, 'std_msgs', 'msg', '%s.msg'%s)
assert file_p == msg_context.get_file('std_msgs/%s'%s)
for s in ['Quaternion', 'Vector3']:
file_p = os.path.join(geometry_d, '%s.msg'%s)
assert file_p == msg_context.get_file('geometry_msgs/%s'%s)
def test_load_depends_msg():
from genmsg.msg_loader import MsgContext, load_msg_by_type, load_depends, MsgNotFound, load_srv_by_type
test_d = get_test_dir()
geometry_d = os.path.join(test_d, 'geometry_msgs', 'msg')
msg_search_path = {
'test_ros': [ os.path.join(test_d, 'test_ros', 'msg') ],
'std_msgs': [ os.path.join(test_d, 'std_msgs', 'msg') ],
'geometry_msgs': [ geometry_d ],
'sensor_msgs': [ os.path.join(test_d, 'sensor_msgs', 'msg') ],
'invalid': [ os.path.join(test_d, 'invalid', 'msg') ],
}
# Test not found
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'invalid/BadDepend', msg_search_path)
try:
load_depends(msg_context, root_spec, msg_search_path)
assert False, "should have raised MsgNotFound"
except MsgNotFound:
pass
root_spec = load_msg_by_type(msg_context, 'invalid/BadLocalDepend', msg_search_path)
try:
load_depends(msg_context, root_spec, msg_search_path)
assert False, "should have raised MsgNotFound"
except MsgNotFound:
pass
# Test with msgs
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'geometry_msgs/PoseStamped', msg_search_path)
load_depends(msg_context, root_spec, msg_search_path)
file_p = os.path.join(test_d, 'geometry_msgs', 'msg', 'PoseStamped.msg')
assert file_p == msg_context.get_file('geometry_msgs/PoseStamped')
val = msg_context.get_all_depends('geometry_msgs/PoseStamped')
assert set(['std_msgs/Header', 'geometry_msgs/Pose', 'geometry_msgs/Point', 'geometry_msgs/Quaternion']) == set(val), val
val = msg_context.get_depends('geometry_msgs/PoseStamped')
assert set(['std_msgs/Header', 'geometry_msgs/Pose']) == set(val), val
for s in ['Header']:
file_p = os.path.join(test_d, 'std_msgs', 'msg', '%s.msg'%s)
assert file_p == msg_context.get_file('std_msgs/%s'%s)
for s in ['Pose', 'Point', 'Quaternion']:
file_p = os.path.join(geometry_d, '%s.msg'%s)
assert file_p == msg_context.get_file('geometry_msgs/%s'%s)
msg_context = MsgContext.create_default()
root_spec = load_msg_by_type(msg_context, 'sensor_msgs/Imu', msg_search_path)
load_depends(msg_context, root_spec, msg_search_path)
file_p = os.path.join(test_d, 'sensor_msgs', 'msg', 'Imu.msg')
assert file_p == msg_context.get_file('sensor_msgs/Imu')
val = msg_context.get_depends('sensor_msgs/Imu')
assert set(['std_msgs/Header', 'geometry_msgs/Quaternion', 'geometry_msgs/Vector3']) == set(val), val
for s in ['Header']:
file_p = os.path.join(test_d, 'std_msgs', 'msg', '%s.msg'%s)
assert file_p == msg_context.get_file('std_msgs/%s'%s)
for s in ['Quaternion', 'Vector3']:
file_p = os.path.join(geometry_d, '%s.msg'%s)
assert file_p == msg_context.get_file('geometry_msgs/%s'%s)
def test_load_depends_srv():
from genmsg.msg_loader import MsgContext, load_msg_by_type, load_depends, MsgNotFound, load_srv_by_type
test_d = get_test_dir()
geometry_d = os.path.join(test_d, 'geometry_msgs', 'msg')
msg_search_path = {
'test_ros': [ os.path.join(test_d, 'test_ros', 'msg') ],
'std_msgs': [ os.path.join(test_d, 'std_msgs', 'msg') ],
'geometry_msgs': [ geometry_d ],
'sensor_msgs': [ os.path.join(test_d, 'sensor_msgs', 'msg') ],
'invalid': [ os.path.join(test_d, 'invalid', 'msg') ],
}
# Test with srvs
srv_search_path = {
'test_ros': [ os.path.join(test_d, 'test_ros', 'srv') ],
'std_srvs': [ os.path.join(test_d, 'std_srvs', 'srv') ],
}
msg_context = MsgContext.create_default()
root_spec = load_srv_by_type(msg_context, 'test_ros/AddTwoInts', srv_search_path)
load_depends(msg_context, root_spec, msg_search_path)
val = msg_context.get_depends('test_ros/AddTwoIntsRequest')
assert val == [], val
val = msg_context.get_depends('test_ros/AddTwoIntsResponse')
assert val == [], val
# test with srv that has depends
msg_context = MsgContext.create_default()
response_deps = ['std_msgs/Header', 'geometry_msgs/Pose', 'geometry_msgs/PoseStamped', 'geometry_msgs/Point', 'geometry_msgs/Quaternion']
root_spec = load_srv_by_type(msg_context, 'test_ros/GetPoseStamped', srv_search_path)
load_depends(msg_context, root_spec, msg_search_path)
for d in response_deps:
assert msg_context.is_registered(d)
val = msg_context.get_depends('test_ros/GetPoseStampedRequest')
assert val == [], val
val = msg_context.get_depends('test_ros/GetPoseStampedResponse')
assert val == ['geometry_msgs/PoseStamped']
# Test with nonsense
class Foo(object): pass
try:
load_depends(msg_context, Foo(), msg_search_path)
assert False, "should have raised"
except ValueError:
pass
def test_load_srv_by_type():
from genmsg.msg_loader import load_srv_by_type, MsgContext, MsgNotFound
test_d = get_test_dir()
test_ros_dir = os.path.join(test_d, 'test_ros', 'srv')
std_srvs_dir = os.path.join(test_d, 'std_srvs', 'srv')
empty_path = os.path.join(std_srvs_dir, 'Empty.srv')
a2i_path = os.path.join(std_srvs_dir, 'AddTwoInts.srv')
search_path = {
'test_ros': [ test_ros_dir ],
'std_srvs': [ std_srvs_dir ],
}
msg_context = MsgContext.create_default()
spec = load_srv_by_type(msg_context, 'std_srvs/Empty', search_path)
assert msg_context.is_registered('std_srvs/EmptyRequest')
assert msg_context.is_registered('std_srvs/EmptyResponse')
assert msg_context.get_registered('std_srvs/EmptyRequest') == spec.request
assert msg_context.get_registered('std_srvs/EmptyResponse') == spec.response
assert msg_context.get_file('std_srvs/EmptyRequest') == empty_path, msg_context.get_file('std_srvs/EmptyRequest')
assert msg_context.get_file('std_srvs/EmptyResponse') == empty_path,msg_context.get_file('std_srvs/EmptyResponse')
assert spec.request.full_name == 'std_srvs/EmptyRequest'
assert spec.response.full_name == 'std_srvs/EmptyResponse'
assert spec.request.short_name == 'EmptyRequest'
assert spec.response.short_name == 'EmptyResponse'
assert spec.request.package == 'std_srvs'
assert spec.response.package == 'std_srvs'
for f in [spec.request.names, spec.request.types, spec.response.names, spec.response.types]:
assert [] == f
spec = load_srv_by_type(msg_context, 'test_ros/AddTwoInts', search_path)
assert msg_context.is_registered('test_ros/AddTwoIntsRequest')
assert msg_context.is_registered('test_ros/AddTwoIntsResponse')
assert msg_context.get_registered('test_ros/AddTwoIntsRequest') == spec.request
assert msg_context.get_registered('test_ros/AddTwoIntsResponse') == spec.response
assert spec.request.types == ['int64', 'int64'], spec.request.types
assert spec.request.names == ['a', 'b'], spec.request.names
assert spec.response.types == ['int64'], spec.response.types
assert spec.response.names == ['sum'], spec.response.names
# test invalid search path
try:
load_srv_by_type(msg_context, 'test_ros/AddTwoInts', [std_srvs_dir])
assert False, "should have raised"
except ValueError:
pass
# test not found
try:
load_srv_by_type(msg_context, 'test_ros/Fake', search_path)
assert False, "should have raised"
except MsgNotFound:
pass