orb_exists support for muorb

Also added builtin command wait_for_topic

Signed-off-by: Mark Charlebois <charlebm@gmail.com>
This commit is contained in:
Mark Charlebois 2016-06-21 10:39:30 -07:00 committed by Lorenz Meier
parent dc787830b5
commit 62a3e07423
15 changed files with 462 additions and 67 deletions

View File

@ -72,6 +72,18 @@ interface px4muorb{
*/
AEEResult param_update_value_from_shmem( in unsigned long param, rout sequence<octet> value);
/**
* Interface called from krait to inform of a published topic.
* @param topic_name: name of the topic being advertised
*/
AEEResult topic_advertised(in string topic_name);
/**
* Interface called from krait to inform of a published topic.
* @param topic_name: name of the topic being advertised
*/
AEEResult topic_unadvertised(in string topic_name);
/**
* Interface to add a subscriber to the identified topic
* @param topic_name
@ -125,6 +137,7 @@ interface px4muorb{
* 1 ==> add_subscriber
* 2 ==> remove_subscriber
* 3 ==> recieve_topic_data
* 4 ==> advertised_topic
* @note: for message types, 1 & 2, the data pointer returned is null with length of 0.
* @param topic_name
* The topic being returned.
@ -141,7 +154,7 @@ interface px4muorb{
/**
* Since the receive_msg is a blocking call, the client will not be able to peform a clean shutdown. Calling this
* function will unblock the receive msg an return an error code.
* Ideally this should be implemented as a time for the receive msg call.
* Ideally this should be implemented as a timeout for the #receive_msg call.
* @param none
* @return status
* 0 = success

View File

@ -158,6 +158,35 @@ int px4muorb_param_update_value_from_shmem(uint32_t param, const uint8_t *value,
release_shmem_lock(__FILE__, __LINE__);
return 0;
int px4muorb_topic_advertised(const char *topic_name)
{
int rc = 0;
PX4_INFO("TEST px4muorb_topic_advertised of [%s] on remote side...", topic_name);
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
if (rxHandler != nullptr) {
rc = rxHandler->process_remote_topic(topic_name, 1);
} else {
rc = -1;
}
return rc;
}
int px4muorb_topic_unadvertised(const char *topic_name)
{
int rc = 0;
PX4_INFO("TEST px4muorb_topic_unadvertised of [%s] on remote side...", topic_name);
uORB::FastRpcChannel *channel = uORB::FastRpcChannel::GetInstance();
uORBCommunicator::IChannelRxHandler *rxHandler = channel->GetRxHandler();
if (rxHandler != nullptr) {
rc = rxHandler->process_remote_topic(topic_name, 0);
} else {
rc = -1;
}
return rc;
}
int px4muorb_add_subscriber(const char *name)

View File

@ -49,6 +49,10 @@ extern "C" {
int px4muorb_param_update_value_from_shmem(uint32_t param, const uint8_t *value, int data_len_in_bytes) __EXPORT;
int px4muorb_topic_advertised(const char *name) __EXPORT;
int px4muorb_topic_unadvertised(const char *name) __EXPORT;
int px4muorb_add_subscriber(const char *name) __EXPORT;
int px4muorb_remove_subscriber(const char *name) __EXPORT;

View File

@ -68,6 +68,85 @@ uORB::FastRpcChannel::FastRpcChannel()
_RemoteSubscribers.clear();
}
//==============================================================================
//==============================================================================
int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName)
{
int16_t rc = 0;
hrt_abstime t1, t2;
static hrt_abstime check_time = 0;
PX4_DEBUG("=========publish topic[%s] to remote...", messageName);
t1 = hrt_absolute_time();
_QueueMutex.lock();
bool overwriteData = false;
if (IsControlQFull()) {
// queue is full. Overwrite the oldest data.
//PX4_WARN("[topic_advertised] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]",
// _ControlQInIndex, _ControlQOutIndex, _MAX_MSG_QUEUE_SIZE);
_ControlQOutIndex++;
if (_ControlQOutIndex == _MAX_MSG_QUEUE_SIZE) {
_ControlQOutIndex = 0;
}
overwriteData = true;
_dropped_pkts++;
}
_ControlMsgQueue[ _ControlQInIndex ]._Type = _CONTROL_MSG_TYPE_ADVERTISE;
_ControlMsgQueue[ _ControlQInIndex ]._MsgName = messageName;
_ControlQInIndex++;
if (_ControlQInIndex == _MAX_MSG_QUEUE_SIZE) {
_ControlQInIndex = 0;
}
// the assumption here is that each caller reads only one data from either control or data queue.
//if (!overwriteData) {
if (ControlQSize() == 1 && DataQSize() == 0) { // post it only of the queue moves from empty to available.
_DataAvailableSemaphore.post();
}
if ((unsigned long)ControlQSize() < _min_q) { _min_q = (unsigned long)ControlQSize(); }
if ((unsigned long)ControlQSize() > _max_q) { _max_q = (unsigned long)ControlQSize(); }
_count++;
_avg_q = ((double)((_avg_q * (_count - 1)) + (unsigned long)(ControlQSize()))) / (double)(_count);
_QueueMutex.unlock();
t2 = hrt_absolute_time();
if ((unsigned long)(t2 - check_time) > 10000000) {
//PX4_DEBUG("MsgName: %20s, t1: %lu, t2: %lu, dt: %lu",messageName, (unsigned long) t1, (unsigned long) t2, (unsigned long) (t2-t1));
//PX4_DEBUG("Q. Stats: min: %lu, max : %lu, avg: %lu count: %lu ", _min_q, _max_q, (unsigned long)(_avg_q * 1000.0), _count);
_min_q = _MAX_MSG_QUEUE_SIZE * 2;
_max_q = 0;
_avg_q = 0;
_count = 0;
check_time = t2;
}
return rc;
}
//==============================================================================
//==============================================================================
/*
//TODO: verify if needed
int16_t uORB::FastRpcChannel::topic_unadvertised(const char *messageName)
{
int16_t rc = 0;
PX4_DEBUG("=========unpublish topic[%s] to remote...", messageName);
return rc;
}
*/
//==============================================================================
//==============================================================================
int16_t uORB::FastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz)
@ -176,7 +255,7 @@ int16_t uORB::FastRpcChannel::send_message(const char *messageName, int32_t leng
// the assumption here is that each caller reads only one data from either control or data queue.
//if (!overwriteData) {
if (DataQSize() == 1) { // post it only of the queue moves from empty to available.
if (DataQSize() == 1 && ControlQSize() == 0) { // post it only of the queue moves from empty to available.
_DataAvailableSemaphore.post();
}
@ -403,54 +482,103 @@ int16_t uORB::FastRpcChannel::get_bulk_data
*topic_count = 0;
int32_t topic_count_to_return = 0;
if (DataQSize() != 0) {
//PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() );
topic_count_to_return = DataQSize();
if (DataQSize() != 0 || ControlQSize() != 0) {
if (DataQSize() != 0) {
//PX4_DEBUG( "get_bulk_data: QSize: %d", DataQSize() );
topic_count_to_return = DataQSize();
while (DataQSize() != 0) {
// this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait.
//_DataAvailableSemaphore.wait();
if (get_data_msg_size_at(_DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) {
// there is enough space in the buffer, copy the data.
//PX4_DEBUG( "Coping Data to buffer..." );
copy_result = copy_data_to_buffer(_DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes);
while (DataQSize() != 0) {
// this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait.
//_DataAvailableSemaphore.wait();
if (get_msg_size_at(true, _DataQOutIndex) < (max_buffer_in_bytes - bytes_copied)) {
// there is enough space in the buffer, copy the data.
//PX4_DEBUG( "Coping Data to buffer..." );
copy_result = copy_msg_to_buffer(true, _DataQOutIndex, buffer, bytes_copied, max_buffer_in_bytes);
if (copy_result == -1) {
if (copy_result == -1) {
if (bytes_copied == 0) {
rc = -1;
}
break;
} else {
//PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\
// buffer[bytes_copied], \
// buffer[bytes_copied+1], \
// buffer[bytes_copied+2], \
// buffer[bytes_copied+3] );
bytes_copied += copy_result;
(*topic_count)++;
*returned_bytes = bytes_copied;
_DataQOutIndex++;
if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) {
_DataQOutIndex = 0;
}
}
} else {
if (bytes_copied == 0) {
rc = -1;
PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned");
} else {
PX4_DEBUG("Exiting out of the while loop...");
}
break;
} else {
//PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\
// buffer[bytes_copied], \
// buffer[bytes_copied+1], \
// buffer[bytes_copied+2], \
// buffer[bytes_copied+3] );
bytes_copied += copy_result;
(*topic_count)++;
*returned_bytes = bytes_copied;
_DataQOutIndex++;
if (_DataQOutIndex == _MAX_MSG_QUEUE_SIZE) {
_DataQOutIndex = 0;
}
}
} else {
if (bytes_copied == 0) {
rc = -1;
PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned");
} else {
PX4_DEBUG("Exiting out of the while loop...");
}
break;
}
}
if (ControlQSize() != 0) {
//PX4_DEBUG( "get_bulk_data: QSize: %d", ControlQSize() );
topic_count_to_return += ControlQSize();
while (ControlQSize() != 0) {
// this is a hack as we are using a counting semaphore. Should be re-implemented with cond_variable and wait.
//_DataAvailableSemaphore.wait();
if (get_msg_size_at(false, _ControlQOutIndex) < (max_buffer_in_bytes - bytes_copied)) {
// there is enough space in the buffer, copy the data.
//PX4_DEBUG( "Coping Control msg to buffer..." );
copy_result = copy_msg_to_buffer(false, _ControlQOutIndex, buffer, bytes_copied, max_buffer_in_bytes);
if (copy_result == -1) {
if (bytes_copied == 0) {
rc = -1;
}
break;
} else {
//PX4_DEBUG( "[%d] %02x %02x %02x %02x", *topic_count,\
// buffer[bytes_copied], \
// buffer[bytes_copied+1], \
// buffer[bytes_copied+2], \
// buffer[bytes_copied+3] );
bytes_copied += copy_result;
(*topic_count)++;
*returned_bytes = bytes_copied;
_ControlQOutIndex++;
if (_ControlQOutIndex == _MAX_MSG_QUEUE_SIZE) {
_ControlQOutIndex = 0;
}
}
} else {
if (bytes_copied == 0) {
rc = -1;
PX4_WARN("ERROR: Insufficent space in data buffer, no topics returned");
} else {
PX4_DEBUG("Exiting out of the while loop...");
}
break;
}
}
}
} else {
PX4_ERR("[get_data_bulk] Error: Semaphore is up when there is no data on the control/data queues");
rc = -1;
@ -488,20 +616,22 @@ int16_t uORB::FastRpcChannel::get_bulk_data
return rc;
}
int32_t uORB::FastRpcChannel::get_data_msg_size_at(int32_t index)
int32_t uORB::FastRpcChannel::get_msg_size_at(bool isData, int32_t index)
{
// the assumption here is that this is called within the context of semaphore,
// hence lock/unlock is not needed.
int32_t rc = 0;
rc += _DataMsgQueue[ index ]._Length;
rc += _DataMsgQueue[ index ]._MsgName.size() + 1;
if (isData) {
rc += _DataMsgQueue[ index ]._Length;
rc += _DataMsgQueue[ index ]._MsgName.size() + 1;
} else {
rc += _ControlMsgQueue[ index ]._MsgName.size() + 1;
}
rc += _PACKET_HEADER_SIZE;
return rc;
}
int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset,
int32_t uORB::FastRpcChannel::copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset,
int32_t dst_buffer_len)
{
int32_t rc = -1;
@ -510,12 +640,19 @@ int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *ds
// * sem_lock is acquired for data protection
// * the dst_buffer is validated to
uint16_t msg_size = (isData ?
(uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size()) :
(uint16_t)(_ControlMsgQueue[ src_index ]._MsgName.size()));
// compute the different offsets to pack the packets.
int32_t field_header_offset = offset;
int32_t field_topic_name_offset = field_header_offset + sizeof(struct BulkTransferHeader);
int32_t field_data_offset = field_topic_name_offset + _DataMsgQueue[ src_index ]._MsgName.size() + 1;
int32_t field_data_offset = field_topic_name_offset + msg_size + 1;
struct BulkTransferHeader header = { (uint16_t)(_DataMsgQueue[ src_index ]._MsgName.size() + 1), (uint16_t)(_DataMsgQueue[ src_index ]._Length) };
int16_t msg_type = isData ? _DATA_MSG_TYPE : _ControlMsgQueue[ src_index ]._Type;
struct BulkTransferHeader header = { (uint16_t)msg_type,( uint16_t)(msg_size + 1),
(uint16_t)(isData ? (_DataMsgQueue[ src_index ]._Length) : 0) };
//PX4_DEBUG( "Offsets: header[%d] name[%d] data[%d]",
@ -523,7 +660,7 @@ int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *ds
// field_topic_name_offset,
// field_data_offset );
if ((field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) {
if (isData && (field_data_offset + _DataMsgQueue[ src_index ]._Length) < dst_buffer_len) {
memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header));
// pack the data here.
memmove
@ -541,10 +678,32 @@ int32_t uORB::FastRpcChannel::copy_data_to_buffer(int32_t src_index, uint8_t *ds
memmove(&(dst_buffer[field_data_offset]), _DataMsgQueue[ src_index ]._Buffer, _DataMsgQueue[ src_index ]._Length);
rc = field_data_offset + _DataMsgQueue[ src_index ]._Length - offset;
} else if (field_data_offset < dst_buffer_len) { //This is a control message
memmove(&(dst_buffer[field_header_offset]), (char *)(&header), sizeof(header));
// pack the data here.
memmove
(
&(dst_buffer[field_topic_name_offset]),
_ControlMsgQueue[ src_index ]._MsgName.c_str(),
_ControlMsgQueue[ src_index ]._MsgName.size()
);
if (_ControlMsgQueue[ src_index ]._MsgName.size() == 0) {
PX4_WARN("########## Error MsgName cannot be zero: ");
}
dst_buffer[ field_topic_name_offset + _ControlMsgQueue[ src_index ]._MsgName.size()] = '\0';
rc = field_data_offset - offset;
} else {
PX4_WARN("Error coping the DataMsg to dst buffer, insuffienct space. ");
PX4_WARN("... offset[%ld] len[%ld] data_msg_len[%ld]",
PX4_WARN("Error coping the Msg to dst buffer, insuffienct space. ");
if (isData) {
PX4_WARN("Data... offset[%ld] len[%ld] data_msg_len[%ld]",
offset, dst_buffer_len, (field_data_offset - offset) + _DataMsgQueue[ src_index ]._Length);
} else {
PX4_WARN("ControlMsg... offset[%ld] len[%ld]",
offset, dst_buffer_len, (field_data_offset - offset));
}
}
return rc;

View File

@ -57,6 +57,33 @@ public:
return &(_Instance);
}
/**
* @brief Interface to notify the remote entity of a topic being advertised.
*
* @param messageName
* This represents the uORB message name(aka topic); This message name should be
* globally unique.
* @return
* 0 = success; This means the messages is successfully sent to the receiver
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
virtual int16_t topic_advertised(const char *messageName);
/**
* @brief Interface to notify the remote entity of a topic being unadvertised
* and is no longer publishing messages.
*
* @param messageName
* This represents the uORB message name(aka topic); This message name should be
* globally unique.
* @return
* 0 = success; This means the messages is successfully sent to the receiver
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
//virtual int16_t topic_unadvertised(const char *messageName);
/**
* @brief Interface to notify the remote entity of interest of a
* subscription for a message.
@ -155,10 +182,11 @@ private: // data members
static const int32_t _CONTROL_MSG_TYPE_ADD_SUBSCRIBER = 1;
static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2;
static const int32_t _DATA_MSG_TYPE = 3;
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE = 4;
static const int32_t _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES = 2;
static const int32_t _PACKET_FIELD_DATA_LEN_IN_BYTES = 2;
static const int32_t _PACKET_HEADER_SIZE =
static const int32_t _PACKET_HEADER_SIZE = 1 + //first byte is the MSG Type
_PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES + _PACKET_FIELD_DATA_LEN_IN_BYTES;
struct FastRpcDataMsg {
@ -174,6 +202,7 @@ private: // data members
};
struct BulkTransferHeader {
uint16_t _MsgType;
uint16_t _MsgNameLen;
uint16_t _DataLen;
};
@ -261,8 +290,8 @@ private://class members.
int32_t DataQSize();
int32_t ControlQSize();
int32_t get_data_msg_size_at(int32_t index);
int32_t copy_data_to_buffer(int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len);
int32_t get_msg_size_at(bool isData, int32_t index);
int32_t copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len);
std::set<std::string> _RemoteSubscribers;
};

View File

@ -278,6 +278,16 @@ bool px4muorb::KraitRpcWrapper::Terminate()
return true;
}
int32_t px4muorb::KraitRpcWrapper::TopicAdvertised(const char *topic)
{
return ((_Initialized) ? px4muorb_topic_advertised(topic) : -1);
}
int32_t px4muorb::KraitRpcWrapper::TopicUnadvertised(const char *topic)
{
return ((_Initialized) ? px4muorb_topic_unadvertised(topic) : -1);
}
int32_t px4muorb::KraitRpcWrapper::AddSubscriber(const char *topic)
{
return ((_Initialized) ? px4muorb_add_subscriber(topic) : -1);

View File

@ -65,6 +65,8 @@ public:
/**
* Muorb related functions to pub/sub of orb topic from krait to adsp
*/
int32_t TopicAdvertised(const char *topic);
int32_t TopicUnadvertised(const char *topic);
int32_t AddSubscriber(const char *topic);
int32_t RemoveSubscriber(const char *topic);
int32_t SendData(const char *topic, int32_t length_in_bytes, const uint8_t *data);

View File

@ -67,6 +67,27 @@ uORB::KraitFastRpcChannel::KraitFastRpcChannel() :
_KraitWrapper.Initialize();
}
int16_t uORB::KraitFastRpcChannel::topic_advertised(const char *messageName)
{
int16_t rc = 0;
PX4_DEBUG("Before calling TopicAdvertised for [%s]\n", messageName);
rc = _KraitWrapper.TopicAdvertised(messageName);
PX4_DEBUG("Response for TopicAdvertised for [%s], rc[%d]\n", messageName, rc);
return rc;
}
/*
//TODO: verify if needed
int16_t uORB::KraitFastRpcChannel::topic_unadvertised(const char *messageName)
{
int16_t rc = 0;
PX4_DEBUG("Before calling TopicUnadvertised for [%s]\n", messageName);
rc = _KraitWrapper.TopicUnadvertised(messageName);
PX4_DEBUG("Response for TopicUnadvertised for [%s], rc[%d]\n", messageName, rc);
return rc;
}
*/
int16_t uORB::KraitFastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz)
{
int16_t rc = 0;
@ -254,9 +275,14 @@ void uORB::KraitFastRpcChannel::fastrpc_recv_thread()
uint8_t *topic_data = (uint8_t *)(messageName + strlen(messageName) + 1);
if (_RxHandler != nullptr) {
_RxHandler->process_received_message(messageName,
header->_DataLen, topic_data);
//PX4_DEBUG( "Received topic data for control message for: [%s] len[%d]\n", name, data_length );
if (header->_MsgType == _DATA_MSG_TYPE) {
//PX4_DEBUG( "Received topic data for: [%s] len[%d]\n", messageName, data_length );
_RxHandler->process_received_message(messageName,
header->_DataLen, topic_data);
} else if (header->_MsgType == _CONTROL_MSG_TYPE_ADVERTISE) {
PX4_DEBUG( "Received topic for control message for: [%s] len[%d]\n", messageName, data_length );
_RxHandler->process_remote_topic(messageName, true);
}
}
bytes_processed += header->_MsgNameLen + header->_DataLen + sizeof(struct BulkTransferHeader);

View File

@ -70,6 +70,32 @@ public:
return (_InstancePtr != nullptr);
}
/**
* @brief Interface to notify the remote entity of a topic being advertised.
*
* @param messageName
* This represents the uORB message name(aka topic); This message name should be
* globally unique.
* @return
* 0 = success; This means the messages is successfully sent to the receiver
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
virtual int16_t topic_advertised(const char *messageName);
/**
* @brief Interface to notify the remote entity of a topic being unadvertised
* and is no longer publishing messages.
*
* @param messageName
* This represents the uORB message name(aka topic); This message name should be
* globally unique.
* @return
* 0 = success; This means the messages is successfully sent to the receiver
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
//virtual int16_t topic_unadvertised(const char *messageName);
/**
* @brief Interface to notify the remote entity of interest of a
@ -141,8 +167,10 @@ private: // data members
static const int32_t _CONTROL_MSG_TYPE_ADD_SUBSCRIBER = 1;
static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2;
static const int32_t _DATA_MSG_TYPE = 3;
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE = 4;
struct BulkTransferHeader {
uint16_t _MsgType;
uint16_t _MsgNameLen;
uint16_t _DataLen;
};

View File

@ -928,16 +928,7 @@ bool copy_if_updated_multi(orb_id_t topic, int multi_instance, int *handle, void
bool updated = false;
if (*handle < 0) {
#if __PX4_POSIX_EAGLE
// The orb_exists call doesn't work correctly on Snapdragon yet.
// (No data gets sent from the QURT to the Linux side because there
// are no subscribers. However, there won't be any subscribers, if
// they check using orb_exists() before subscribing.)
if (true)
#else
if (OK == orb_exists(topic, multi_instance))
#endif
{
*handle = orb_subscribe_multi(topic, multi_instance);
/* copy first data */

View File

@ -56,6 +56,33 @@ public:
// INTERFACES FOR Control messages over a channel.
//=========================================================================
/**
* @brief Interface to notify the remote entity of a topic being advertised.
*
* @param messageName
* This represents the uORB message name(aka topic); This message name should be
* globally unique.
* @return
* 0 = success; This means the messages is successfully sent to the receiver
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
virtual int16_t topic_advertised(const char *messageName) = 0;
/**
* @brief Interface to notify the remote entity of a topic being unadvertised
* and is no longer publishing messages.
*
* @param messageName
* This represents the uORB message name(aka topic); This message name should be
* globally unique.
* @return
* 0 = success; This means the messages is successfully sent to the receiver
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
//virtual int16_t topic_unadvertised(const char *messageName) = 0;
/**
* @brief Interface to notify the remote entity of interest of a
* subscription for a message.
@ -127,6 +154,21 @@ class uORBCommunicator::IChannelRxHandler
{
public:
/**
* Interface to process a received topic from remote.
* @param topic_name
* This represents the uORB message Name (topic); This message Name should be
* globally unique.
* @param isAdvertisement
* Represents if the topic has been advertised or is no longer avialable.
* @return
* 0 = success; This means the messages is successfully handled in the
* handler.
* otherwise = failure.
*/
virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement) = 0;
/**
* Interface to process a received AddSubscription from remote.
* @param messageName

View File

@ -489,6 +489,26 @@ int uORB::DeviceNode::unadvertise(orb_advert_t handle)
return PX4_OK;
}
int16_t uORB::DeviceNode::topic_advertised(const orb_metadata *meta, int priority)
{
uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator();
if (ch != nullptr && meta != nullptr) {
return ch->topic_advertised(meta->o_name);
}
return -1;
}
/*
//TODO: Check if we need this since we only unadvertise when things all shutdown and it doesn't actually remove the device
int16_t uORB::DeviceNode::topic_unadvertised(const orb_metadata *meta, int priority)
{
uORBCommunicator::IChannel *ch = uORB::Manager::get_instance()->get_uorb_communicator();
if (ch != nullptr && meta != nullptr) {
return ch->topic_unadvertised(meta->o_name);
}
return -1;
}
*/
pollevent_t
uORB::DeviceNode::poll_state(device::file_t *filp)
{

View File

@ -125,6 +125,9 @@ public:
static int unadvertise(orb_advert_t handle);
static int16_t topic_advertised(const orb_metadata *meta, int priority);
//static int16_t topic_unadvertised(const orb_metadata *meta, int priority);
/**
* processes a request for add subscription from remote
* @param rateInHz

View File

@ -131,11 +131,15 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance)
return ERROR;
}
#ifdef __PX4_NUTTX
#if __PX4_NUTTX
struct stat buffer;
return stat(path, &buffer);
#else
return px4_access(path, F_OK);
ret = px4_access(path, F_OK);
if (ret == -1 && meta != nullptr && _remote_topics.size() > 0) {
ret = (_remote_topics.find(meta->o_name) != _remote_topics.end());
}
return ret;
#endif
}
@ -197,6 +201,9 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta,
return nullptr;
}
//For remote systems call over and inform them
uORB::DeviceNode::topic_advertised(meta, priority);
/* the advertiser must perform an initial publish to initialise the object */
result = orb_publish(meta, advertiser, data);
@ -433,6 +440,23 @@ uORBCommunicator::IChannel *uORB::Manager::get_uorb_communicator(void)
return _comm_channel;
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
int16_t uORB::Manager::process_remote_topic(const char *topic_name, bool isAdvertisement)
{
warnx("[posix-uORB::Manager::process_remote_topic(%d)] name: %s, isAdver: %d",
__LINE__, topic_name, isAdvertisement);
int16_t rc = 0;
if (isAdvertisement) {
_remote_topics.insert(topic_name);
} else {
_remote_topics.erase(topic_name);
}
return rc;
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------------------------------
int16_t uORB::Manager::process_add_subscription(const char *messageName,

View File

@ -410,6 +410,7 @@ private: // data members
// the communicator channel instance.
uORBCommunicator::IChannel *_comm_channel;
ORBSet _remote_subscriber_topics;
ORBSet _remote_topics;
DeviceMaster *_device_masters[Flavor_count]; ///< Allow at most one DeviceMaster per Flavor
@ -417,6 +418,20 @@ private: //class methods
Manager();
~Manager();
/**
* Interface to process a received topic from remote.
* @param topic_name
* This represents the uORB message Name (topic); This message Name should be
* globally unique.
* @param isAdvertisement
* Represents if the topic has been advertised or is no longer avialable.
* @return
* 0 = success; This means the messages is successfully handled in the
* handler.
* otherwise = failure.
*/
virtual int16_t process_remote_topic(const char *topic_name, bool isAdvertisement);
/**
* Interface to process a received AddSubscription from remote.
* @param messageName