uORB Subscription callbacks with WorkItem scheduling on publication (#12207)

This commit is contained in:
Daniel Agar 2019-06-17 16:26:06 +02:00 committed by GitHub
parent beaba44e5b
commit 136962d125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 375 additions and 6 deletions

View File

@ -48,6 +48,8 @@
namespace uORB namespace uORB
{ {
class SubscriptionCallback;
// Base subscription wrapper class // Base subscription wrapper class
class Subscription class Subscription
{ {
@ -107,6 +109,10 @@ public:
protected: protected:
friend class SubscriptionCallback;
DeviceNode *get_node() { return _node; }
bool subscribe(); bool subscribe();
void unsubscribe(); void unsubscribe();

View File

@ -0,0 +1,136 @@
/****************************************************************************
*
* Copyright (c) 2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/**
* @file SubscriptionCallback.hpp
*
*/
#pragma once
#include <uORB/SubscriptionInterval.hpp>
#include <containers/List.hpp>
#include <px4_work_queue/WorkItem.hpp>
namespace uORB
{
// Subscription wrapper class with callbacks on new publications
class SubscriptionCallback : public SubscriptionInterval, public ListNode<SubscriptionCallback *>
{
public:
/**
* Constructor
*
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param instance The instance for multi sub.
*/
SubscriptionCallback(const orb_metadata *meta, uint8_t interval_ms = 0, uint8_t instance = 0) :
SubscriptionInterval(meta, interval_ms, instance)
{
}
virtual ~SubscriptionCallback()
{
unregister_callback();
};
bool register_callback()
{
bool ret = false;
if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) {
// registered
ret = true;
} else {
// force topic creation by subscribing with old API
int fd = orb_subscribe_multi(_subscription.get_topic(), _subscription.get_instance());
// try to register callback again
if (_subscription.forceInit()) {
if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) {
ret = true;
}
}
orb_unsubscribe(fd);
}
return ret;
}
void unregister_callback()
{
if (_subscription.get_node()) {
_subscription.get_node()->unregister_callback(this);
}
}
virtual void call() = 0;
};
// Subscription with callback that schedules a WorkItem
class SubscriptionCallbackWorkItem : public SubscriptionCallback
{
public:
/**
* Constructor
*
* @param work_item The WorkItem that will be scheduled immediately on new publications.
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param instance The instance for multi sub.
*/
SubscriptionCallbackWorkItem(px4::WorkItem *work_item, const orb_metadata *meta, uint8_t instance = 0) :
SubscriptionCallback(meta, 0, instance), // interval 0
_work_item(work_item)
{
}
virtual ~SubscriptionCallbackWorkItem() = default;
void call() override
{
// schedule immediately if no interval, otherwise check time elapsed
if ((_interval == 0) || (hrt_elapsed_time(&_last_update) >= _interval)) {
_work_item->ScheduleNow();
}
}
private:
px4::WorkItem *_work_item;
};
} // namespace uORB

View File

@ -0,0 +1,133 @@
/****************************************************************************
*
* Copyright (c) 2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/
/**
* @file SubscriptionInterval.hpp
*
*/
#pragma once
#include <uORB/uORB.h>
#include <px4_defines.h>
#include "uORBDeviceNode.hpp"
#include "uORBManager.hpp"
#include "uORBUtils.hpp"
#include "Subscription.hpp"
namespace uORB
{
// Base subscription wrapper class
class SubscriptionInterval
{
public:
/**
* Constructor
*
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param interval The requested maximum update interval in microseconds.
* @param instance The instance for multi sub.
*/
SubscriptionInterval(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) :
_subscription{meta, instance},
_interval(interval_us)
{}
SubscriptionInterval() : _subscription{nullptr} {}
~SubscriptionInterval() = default;
bool published() { return _subscription.published(); }
/**
* Check if there is a new update.
* */
bool updated()
{
if (published() && (hrt_elapsed_time(&_last_update) >= _interval)) {
return _subscription.updated();
}
return false;
}
/**
* Copy the struct if updated.
* @param dst The destination pointer where the struct will be copied.
* @return true only if topic was updated and copied successfully.
*/
bool update(void *dst)
{
if (updated()) {
return copy(dst);
}
return false;
}
/**
* Copy the struct
* @param dst The destination pointer where the struct will be copied.
* @return true only if topic was copied successfully.
*/
bool copy(void *dst)
{
if (_subscription.copy(dst)) {
_last_update = hrt_absolute_time();
return true;
}
return false;
}
bool valid() const { return _subscription.valid(); }
uint8_t get_instance() const { return _subscription.get_instance(); }
orb_id_t get_topic() const { return _subscription.get_topic(); }
uint32_t get_interval() const { return _interval; }
void set_interval(uint32_t interval) { _interval = interval; }
protected:
Subscription _subscription;
uint64_t _last_update{0}; // last update in microseconds
uint32_t _interval{0}; // maximum update interval in microseconds
};
} // namespace uORB

View File

@ -37,6 +37,8 @@
#include "uORBUtils.hpp" #include "uORBUtils.hpp"
#include "uORBManager.hpp" #include "uORBManager.hpp"
#include "SubscriptionCallback.hpp"
#ifdef ORB_COMMUNICATOR #ifdef ORB_COMMUNICATOR
#include "uORBCommunicator.hpp" #include "uORBCommunicator.hpp"
#endif /* ORB_COMMUNICATOR */ #endif /* ORB_COMMUNICATOR */
@ -317,6 +319,11 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)
_published = true; _published = true;
// callbacks
for (auto item : _callbacks) {
item->call();
}
ATOMIC_LEAVE; ATOMIC_LEAVE;
/* notify any poll waiters */ /* notify any poll waiters */
@ -673,3 +680,33 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
_queue_size = queue_size; _queue_size = queue_size;
return PX4_OK; return PX4_OK;
} }
bool
uORB::DeviceNode::register_callback(uORB::SubscriptionCallback *callback_sub)
{
if (callback_sub != nullptr) {
ATOMIC_ENTER;
// prevent duplicate registrations
for (auto existing_callbacks : _callbacks) {
if (callback_sub == existing_callbacks) {
ATOMIC_LEAVE;
return true;
}
}
_callbacks.add(callback_sub);
ATOMIC_LEAVE;
return true;
}
return false;
}
void
uORB::DeviceNode::unregister_callback(uORB::SubscriptionCallback *callback_sub)
{
ATOMIC_ENTER;
_callbacks.remove(callback_sub);
ATOMIC_LEAVE;
}

View File

@ -45,6 +45,7 @@ namespace uORB
class DeviceNode; class DeviceNode;
class DeviceMaster; class DeviceMaster;
class Manager; class Manager;
class SubscriptionCallback;
} }
/** /**
@ -231,6 +232,12 @@ public:
*/ */
uint64_t copy_and_get_timestamp(void *dst, unsigned &generation); uint64_t copy_and_get_timestamp(void *dst, unsigned &generation);
// add item to list of work items to schedule on node update
bool register_callback(SubscriptionCallback *callback_sub);
// remove item from list of work items
void unregister_callback(SubscriptionCallback *callback_sub);
protected: protected:
pollevent_t poll_state(cdev::file_t *filp) override; pollevent_t poll_state(cdev::file_t *filp) override;
@ -269,6 +276,7 @@ private:
uint8_t *_data{nullptr}; /**< allocated object buffer */ uint8_t *_data{nullptr}; /**< allocated object buffer */
hrt_abstime _last_update{0}; /**< time the object was last updated */ hrt_abstime _last_update{0}; /**< time the object was last updated */
volatile unsigned _generation{0}; /**< object generation count */ volatile unsigned _generation{0}; /**< object generation count */
List<uORB::SubscriptionCallback *> _callbacks;
uint8_t _priority; /**< priority of the topic */ uint8_t _priority; /**< priority of the topic */
bool _published{false}; /**< has ever data been published */ bool _published{false}; /**< has ever data been published */
uint8_t _queue_size; /**< maximum number of elements in the queue */ uint8_t _queue_size; /**< maximum number of elements in the queue */

View File

@ -49,8 +49,16 @@ WorkItem::WorkItem(const wq_config_t &config)
} }
} }
WorkItem::~WorkItem()
{
Deinit();
}
bool WorkItem::Init(const wq_config_t &config) bool WorkItem::Init(const wq_config_t &config)
{ {
// clear any existing first
Deinit();
px4::WorkQueue *wq = WorkQueueFindOrCreate(config); px4::WorkQueue *wq = WorkQueueFindOrCreate(config);
if (wq == nullptr) { if (wq == nullptr) {
@ -64,4 +72,16 @@ bool WorkItem::Init(const wq_config_t &config)
return false; return false;
} }
void WorkItem::Deinit()
{
// remove any currently queued work
if (_wq != nullptr) {
// prevent additional insertions
px4::WorkQueue *wq_temp = _wq;
_wq = nullptr;
wq_temp->Remove(this);
}
}
} // namespace px4 } // namespace px4

View File

@ -51,7 +51,7 @@ public:
explicit WorkItem(const wq_config_t &config); explicit WorkItem(const wq_config_t &config);
WorkItem() = delete; WorkItem() = delete;
virtual ~WorkItem() = default; virtual ~WorkItem();
inline void ScheduleNow() { if (_wq != nullptr) _wq->Add(this); } inline void ScheduleNow() { if (_wq != nullptr) _wq->Add(this); }
@ -60,6 +60,7 @@ public:
protected: protected:
bool Init(const wq_config_t &config); bool Init(const wq_config_t &config);
void Deinit();
private: private:

View File

@ -74,6 +74,8 @@ WorkQueue::~WorkQueue()
void WorkQueue::Add(WorkItem *item) void WorkQueue::Add(WorkItem *item)
{ {
// TODO: prevent additions when shutting down
work_lock(); work_lock();
_q.push(item); _q.push(item);
work_unlock(); work_unlock();
@ -82,6 +84,24 @@ void WorkQueue::Add(WorkItem *item)
px4_sem_post(&_process_lock); px4_sem_post(&_process_lock);
} }
void WorkQueue::Remove(WorkItem *item)
{
work_lock();
_q.remove(item);
work_unlock();
}
void WorkQueue::Clear()
{
work_lock();
while (!_q.empty()) {
_q.pop();
}
work_unlock();
}
void WorkQueue::Run() void WorkQueue::Run()
{ {
while (!should_exit()) { while (!should_exit()) {

View File

@ -58,10 +58,9 @@ public:
const char *get_name() { return _config.name; } const char *get_name() { return _config.name; }
void Add(WorkItem *item); void Add(WorkItem *item);
void Remove(WorkItem *item);
// TODO: need helpers to handle clean shutdown - remove and clear? void Clear();
//void remove(WorkItem *item);
//void clear();
void Run(); void Run();

View File

@ -198,13 +198,18 @@ static void WorkQueueManagerRun()
} }
// stack size // stack size
#ifndef __PX4_QURT
const size_t stacksize = math::max(PTHREAD_STACK_MIN, PX4_STACK_ADJUSTED(wq->stacksize)); const size_t stacksize = math::max(PTHREAD_STACK_MIN, PX4_STACK_ADJUSTED(wq->stacksize));
#else
const size_t stacksize = math::max(8 * 1024, PX4_STACK_ADJUSTED(wq->stacksize));
#endif
int ret_setstacksize = pthread_attr_setstacksize(&attr, stacksize); int ret_setstacksize = pthread_attr_setstacksize(&attr, stacksize);
if (ret_setstacksize != 0) { if (ret_setstacksize != 0) {
PX4_ERR("setting stack size for %s failed (%i)", wq->name, ret_setstacksize); PX4_ERR("setting stack size for %s failed (%i)", wq->name, ret_setstacksize);
} }
#ifndef __PX4_QURT
// schedule policy FIFO // schedule policy FIFO
int ret_setschedpolicy = pthread_attr_setschedpolicy(&attr, SCHED_FIFO); int ret_setschedpolicy = pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
@ -212,6 +217,8 @@ static void WorkQueueManagerRun()
PX4_ERR("failed to set sched policy SCHED_FIFO (%i)", ret_setschedpolicy); PX4_ERR("failed to set sched policy SCHED_FIFO (%i)", ret_setschedpolicy);
} }
#endif // ! QuRT
// priority // priority
param.sched_priority = sched_get_priority_max(SCHED_FIFO) + wq->relative_priority; param.sched_priority = sched_get_priority_max(SCHED_FIFO) + wq->relative_priority;
int ret_setschedparam = pthread_attr_setschedparam(&attr, &param); int ret_setschedparam = pthread_attr_setschedparam(&attr, &param);

View File

@ -48,7 +48,7 @@ struct wq_config_t {
namespace wq_configurations namespace wq_configurations
{ {
static constexpr wq_config_t rate_ctrl{"wq:rate_ctrl", 1500, 0}; // PX4 inner loop highest priority static constexpr wq_config_t rate_ctrl{"wq:rate_ctrl", 1600, 0}; // PX4 inner loop highest priority
static constexpr wq_config_t SPI1{"wq:SPI1", 1250, -1}; static constexpr wq_config_t SPI1{"wq:SPI1", 1250, -1};
static constexpr wq_config_t SPI2{"wq:SPI2", 1250, -2}; static constexpr wq_config_t SPI2{"wq:SPI2", 1250, -2};
@ -62,7 +62,9 @@ static constexpr wq_config_t I2C2{"wq:I2C2", 1250, -8};
static constexpr wq_config_t I2C3{"wq:I2C3", 1250, -9}; static constexpr wq_config_t I2C3{"wq:I2C3", 1250, -9};
static constexpr wq_config_t I2C4{"wq:I2C4", 1250, -10}; static constexpr wq_config_t I2C4{"wq:I2C4", 1250, -10};
static constexpr wq_config_t hp_default{"wq:hp_default", 1500, -11}; static constexpr wq_config_t att_pos_ctrl{"wq:att_pos_ctrl", 7000, -11}; // PX4 att/pos controllers, highest priority after sensors
static constexpr wq_config_t hp_default{"wq:hp_default", 1500, -12};
static constexpr wq_config_t lp_default{"wq:lp_default", 1500, -50}; static constexpr wq_config_t lp_default{"wq:lp_default", 1500, -50};
static constexpr wq_config_t test1{"wq:test1", 800, 0}; static constexpr wq_config_t test1{"wq:test1", 800, 0};