Enabled topic_unadvertised

Signed-off-by: Mark Charlebois <charlebm@gmail.com>
This commit is contained in:
Mark Charlebois 2016-08-15 13:33:38 -07:00 committed by Lorenz Meier
parent 9834155d09
commit b08e70b65a
4 changed files with 24 additions and 25 deletions

View File

@ -71,21 +71,31 @@ uORB::FastRpcChannel::FastRpcChannel()
//==============================================================================
//==============================================================================
int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName)
{
return control_msg_queue_add(_CONTROL_MSG_TYPE_ADVERTISE, messageName);
}
//==============================================================================
//==============================================================================
int16_t uORB::FastRpcChannel::topic_unadvertised(const char *messageName)
{
return control_msg_queue_add(_CONTROL_MSG_TYPE_UNADVERTISE, messageName);
}
//==============================================================================
//==============================================================================
int16_t uORB::FastRpcChannel::control_msg_queue_add(int32_t msgtype, const char *messageName)
{
int16_t rc = 0;
hrt_abstime t1, t2;
static hrt_abstime check_time = 0;
PX4_DEBUG("=========publish topic[%s] to remote...", messageName);
t1 = hrt_absolute_time();
_QueueMutex.lock();
bool overwriteData = false;
if (IsControlQFull()) {
// queue is full. Overwrite the oldest data.
//PX4_WARN("[topic_advertised] Queue Full Overwrite the oldest data. in[%ld] out[%ld] max[%ld]",
// _ControlQInIndex, _ControlQOutIndex, _MAX_MSG_QUEUE_SIZE);
_ControlQOutIndex++;
if (_ControlQOutIndex == _MAX_MSG_QUEUE_SIZE) {
@ -96,7 +106,7 @@ int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName)
_dropped_pkts++;
}
_ControlMsgQueue[ _ControlQInIndex ]._Type = _CONTROL_MSG_TYPE_ADVERTISE;
_ControlMsgQueue[ _ControlQInIndex ]._Type = msgtype;
_ControlMsgQueue[ _ControlQInIndex ]._MsgName = messageName;
_ControlQInIndex++;
@ -106,7 +116,6 @@ int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName)
}
// the assumption here is that each caller reads only one data from either control or data queue.
//if (!overwriteData) {
if (ControlQSize() == 1 && DataQSize() == 0) { // post it only of the queue moves from empty to available.
_DataAvailableSemaphore.post();
}
@ -134,19 +143,6 @@ int16_t uORB::FastRpcChannel::topic_advertised(const char *messageName)
return rc;
}
//==============================================================================
//==============================================================================
/*
//TODO: verify if needed
int16_t uORB::FastRpcChannel::topic_unadvertised(const char *messageName)
{
int16_t rc = 0;
PX4_DEBUG("=========unpublish topic[%s] to remote...", messageName);
return rc;
}
*/
//==============================================================================
//==============================================================================
int16_t uORB::FastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz)

View File

@ -82,7 +82,7 @@ public:
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
//virtual int16_t topic_unadvertised(const char *messageName);
virtual int16_t topic_unadvertised(const char *messageName);
/**
* @brief Interface to notify the remote entity of interest of a
@ -183,6 +183,7 @@ private: // data members
static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2;
static const int32_t _DATA_MSG_TYPE = 3;
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE = 4;
static const int32_t _CONTROL_MSG_TYPE_UNADVERTISE = 5;
static const int32_t _PACKET_FIELD_TOPIC_NAME_LEN_SIZE_IN_BYTES = 2;
static const int32_t _PACKET_FIELD_DATA_LEN_IN_BYTES = 2;
@ -292,6 +293,7 @@ private://class members.
int32_t get_msg_size_at(bool isData, int32_t index);
int32_t copy_msg_to_buffer(bool isData, int32_t src_index, uint8_t *dst_buffer, int32_t offset, int32_t dst_buffer_len);
int16_t control_msg_queue_add(int32_t msgtype, const char *messageName);
std::set<std::string> _RemoteSubscribers;
};

View File

@ -76,8 +76,6 @@ int16_t uORB::KraitFastRpcChannel::topic_advertised(const char *messageName)
return rc;
}
/*
//TODO: verify if needed
int16_t uORB::KraitFastRpcChannel::topic_unadvertised(const char *messageName)
{
int16_t rc = 0;
@ -86,7 +84,6 @@ int16_t uORB::KraitFastRpcChannel::topic_unadvertised(const char *messageName)
PX4_DEBUG("Response for TopicUnadvertised for [%s], rc[%d]\n", messageName, rc);
return rc;
}
*/
int16_t uORB::KraitFastRpcChannel::add_subscription(const char *messageName, int32_t msgRateInHz)
{
@ -280,8 +277,11 @@ void uORB::KraitFastRpcChannel::fastrpc_recv_thread()
_RxHandler->process_received_message(messageName,
header->_DataLen, topic_data);
} else if (header->_MsgType == _CONTROL_MSG_TYPE_ADVERTISE) {
PX4_DEBUG( "Received topic for control message for: [%s] len[%d]\n", messageName, data_length );
PX4_DEBUG( "Received topic advertise message for: [%s] len[%d]\n", messageName, data_length );
_RxHandler->process_remote_topic(messageName, true);
} else if (header->_MsgType == _CONTROL_MSG_TYPE_UNADVERTISE) {
PX4_DEBUG( "Received topic unadvertise message for: [%s] len[%d]\n", messageName, data_length );
_RxHandler->process_remote_topic(messageName, false);
}
}

View File

@ -95,7 +95,7 @@ public:
* Note: This does not mean that the receiver as received it.
* otherwise = failure.
*/
//virtual int16_t topic_unadvertised(const char *messageName);
virtual int16_t topic_unadvertised(const char *messageName);
/**
* @brief Interface to notify the remote entity of interest of a
@ -168,6 +168,7 @@ private: // data members
static const int32_t _CONTROL_MSG_TYPE_REMOVE_SUBSCRIBER = 2;
static const int32_t _DATA_MSG_TYPE = 3;
static const int32_t _CONTROL_MSG_TYPE_ADVERTISE = 4;
static const int32_t _CONTROL_MSG_TYPE_UNADVERTISE = 5;
struct BulkTransferHeader {
uint16_t _MsgType;