microRTPS: make sure that Sub/Pubs do not exchange data in loop (i.e. for the same entity)

This commit is contained in:
TSC21 2020-03-05 12:14:37 +00:00 committed by Nuno Marques
parent 441e6290eb
commit 80c658c1ae
2 changed files with 47 additions and 26 deletions

View File

@ -127,17 +127,24 @@ bool @(topic)_Publisher::init()
void @(topic)_Publisher::PubListener::onPublicationMatched(Publisher* pub, MatchingInfo& info)
{
(void)pub;
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << " - @(topic) publisher matched" << std::endl;
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
std::stringstream own_endpoint, remote_endpoint;
for (size_t i = 0; i < 6; i++) {
own_endpoint << pub->getGuid().guidPrefix.value[i];
remote_endpoint << info.remoteEndpointGuid.guidPrefix.value[i];
}
else
{
n_matched--;
std::cout << " - @(topic) publisher unmatched" << std::endl;
// If the matching happens for the same entity, do not make a match
if (own_endpoint.str() != remote_endpoint.str()) {
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << " - @(topic) publisher matched" << std::endl;
} else {
n_matched--;
std::cout << " - @(topic) publisher unmatched" << std::endl;
}
}
}

View File

@ -127,22 +127,30 @@ bool @(topic)_Subscriber::init(uint8_t topic_ID, std::condition_variable* t_send
void @(topic)_Subscriber::SubListener::onSubscriptionMatched(Subscriber* sub, MatchingInfo& info)
{
(void)sub;
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << " - @(topic) subscriber matched" << std::endl;
// The first 6 values of the ID guidPrefix of an entity in a DDS-RTPS Domain
// are the same for all its subcomponents (publishers, subscribers)
std::stringstream own_endpoint, remote_endpoint;
for (size_t i = 0; i < 6; i++) {
own_endpoint << sub->getGuid().guidPrefix.value[i];
remote_endpoint << info.remoteEndpointGuid.guidPrefix.value[i];
}
else
{
n_matched--;
std::cout << " - @(topic) subscriber unmatched" << std::endl;
// If the matching happens for the same entity, do not make a match
if (own_endpoint.str() != remote_endpoint.str()) {
if (info.status == MATCHED_MATCHING)
{
n_matched++;
std::cout << " - @(topic) subscriber matched" << std::endl;
} else {
n_matched--;
std::cout << " - @(topic) subscriber unmatched" << std::endl;
}
}
}
void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
{
if (n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(has_msg_mutex);
if(has_msg.load() == true) // Check if msg has been fetched
{
@ -150,7 +158,6 @@ void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
}
has_msg_lock.unlock();
// Take data
if(sub->takeNextData(&msg, &m_info))
{
@ -167,6 +174,7 @@ void @(topic)_Subscriber::SubListener::onNewDataMessage(Subscriber* sub)
}
}
}
}
void @(topic)_Subscriber::run()
@ -178,7 +186,11 @@ void @(topic)_Subscriber::run()
bool @(topic)_Subscriber::hasMsg()
{
return m_listener.has_msg.load();
if (m_listener.n_matched > 0) {
return m_listener.has_msg.load();
}
return false;
}
@[if 1.5 <= fastrtpsgen_version <= 1.7]@
@ -200,8 +212,10 @@ bool @(topic)_Subscriber::hasMsg()
void @(topic)_Subscriber::unlockMsg()
{
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
if (m_listener.n_matched > 0) {
std::unique_lock<std::mutex> has_msg_lock(m_listener.has_msg_mutex);
m_listener.has_msg = false;
has_msg_lock.unlock();
m_listener.has_msg_cv.notify_one();
}
}