uorb: do not open a node exclusively for an advertiser

Exclusive open is not required, but we now need to ensure the queue size
is set atomically.

It avoids a race condition between 2 single-instance advertisers,
where one of them would fail to open the node with -EBUSY.
This commit is contained in:
Beat Küng 2019-11-21 08:04:32 +01:00 committed by Daniel Agar
parent 6de6235fa9
commit b076cfd4ed
2 changed files with 9 additions and 35 deletions

View File

@ -73,36 +73,15 @@ uORB::DeviceNode::~DeviceNode()
int int
uORB::DeviceNode::open(cdev::file_t *filp) uORB::DeviceNode::open(cdev::file_t *filp)
{ {
int ret;
/* is this a publisher? */ /* is this a publisher? */
if (filp->f_oflags == PX4_F_WRONLY) { if (filp->f_oflags == PX4_F_WRONLY) {
/* become the publisher if we can */
lock(); lock();
if (_publisher == 0) {
_publisher = px4_getpid();
ret = PX4_OK;
} else {
ret = -EBUSY;
}
mark_as_advertised(); mark_as_advertised();
unlock(); unlock();
/* now complete the open */ /* now complete the open */
if (ret == PX4_OK) { return CDev::open(filp);
ret = CDev::open(filp);
/* open failed - not the publisher anymore */
if (ret != PX4_OK) {
_publisher = 0;
}
}
return ret;
} }
/* is this a new subscriber? */ /* is this a new subscriber? */
@ -120,7 +99,7 @@ uORB::DeviceNode::open(cdev::file_t *filp)
filp->f_priv = (void *)sd; filp->f_priv = (void *)sd;
ret = CDev::open(filp); int ret = CDev::open(filp);
add_internal_subscriber(); add_internal_subscriber();
@ -143,11 +122,7 @@ uORB::DeviceNode::open(cdev::file_t *filp)
int int
uORB::DeviceNode::close(cdev::file_t *filp) uORB::DeviceNode::close(cdev::file_t *filp)
{ {
/* is this the publisher closing? */ if (filp->f_oflags == PX4_F_RDONLY) { /* subscriber */
if (px4_getpid() == _publisher) {
_publisher = 0;
} else {
SubscriberData *sd = filp_to_sd(filp); SubscriberData *sd = filp_to_sd(filp);
if (sd != nullptr) { if (sd != nullptr) {
@ -353,10 +328,12 @@ uORB::DeviceNode::ioctl(cdev::file_t *filp, int cmd, unsigned long arg)
*(int *)arg = get_priority(); *(int *)arg = get_priority();
return PX4_OK; return PX4_OK;
case ORBIOCSETQUEUESIZE: case ORBIOCSETQUEUESIZE: {
//no need for locking here, since this is used only during the advertisement call, lock();
//and only one advertiser is allowed to open the DeviceNode at the same time. int ret = update_queue_size(arg);
return update_queue_size(arg); unlock();
return ret;
}
case ORBIOCGETINTERVAL: case ORBIOCGETINTERVAL:
if (sd->update_interval) { if (sd->update_interval) {

View File

@ -234,9 +234,6 @@ private:
uint8_t _queue_size; /**< maximum number of elements in the queue */ uint8_t _queue_size; /**< maximum number of elements in the queue */
int8_t _subscriber_count{0}; int8_t _subscriber_count{0};
px4_task_t _publisher{0}; /**< if nonzero, current publisher. Only used inside the advertise call.
We allow one publisher to have an open file descriptor at the same time. */
// statistics // statistics
uint32_t _lost_messages = 0; /**< nr of lost messages for all subscribers. If two subscribers lose the same uint32_t _lost_messages = 0; /**< nr of lost messages for all subscribers. If two subscribers lose the same
message, it is counted as two. */ message, it is counted as two. */