dataman: remove locking mechanism

This commit is contained in:
Igor Mišić 2023-05-19 15:36:02 +02:00 committed by Beat Küng
parent 3143f6bd0a
commit c40a38bd88
8 changed files with 183 additions and 862 deletions

View File

@ -1,7 +1,7 @@
uint64 timestamp # time since system start (microseconds)
uint8 client_id
uint8 request_type # id/read/write/clear/lock/unlock
uint8 request_type # id/read/write/clear
uint8 item # dm_item_t
uint32 index
uint8[56] data

View File

@ -1,7 +1,7 @@
uint64 timestamp # time since system start (microseconds)
uint8 client_id
uint8 request_type # id/read/write/clear/lock/unlock
uint8 request_type # id/read/write/clear
uint8 item # dm_item_t
uint32 index
uint8[56] data
@ -12,7 +12,4 @@ uint8 STATUS_FAILURE_NO_DATA = 2
uint8 STATUS_FAILURE_READ_FAILED = 3
uint8 STATUS_FAILURE_WRITE_FAILED = 4
uint8 STATUS_FAILURE_CLEAR_FAILED = 5
uint8 STATUS_ALREADY_LOCKED = 6
uint8 STATUS_ALREADY_UNLOCKED = 7
uint8 STATUS_ITEM_LOCKED = 8
uint8 status

View File

@ -55,10 +55,10 @@ DatamanClient::DatamanClient()
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.request_type = DM_GET_ID
};
dataman_request_s request;
request.timestamp = timestamp;
request.request_type = DM_GET_ID;
request.client_id = CLIENT_ID_NOT_SET;
bool success = syncHandler(request, response, timestamp, 1000_ms);
@ -149,14 +149,13 @@ bool DatamanClient::readSync(dm_item_t item, uint32_t index, uint8_t *buffer, ui
bool success = false;
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.index = index,
.data_length = length,
.client_id = _client_id,
.request_type = DM_READ,
.item = item
};
dataman_request_s request;
request.timestamp = timestamp;
request.index = index;
request.data_length = length;
request.client_id = _client_id;
request.request_type = DM_READ;
request.item = static_cast<uint8_t>(item);
dataman_response_s response{};
success = syncHandler(request, response, timestamp, timeout);
@ -187,14 +186,13 @@ bool DatamanClient::writeSync(dm_item_t item, uint32_t index, uint8_t *buffer, u
bool success = false;
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.index = index,
.data_length = length,
.client_id = _client_id,
.request_type = DM_WRITE,
.item = item
};
dataman_request_s request;
request.timestamp = timestamp;
request.index = index;
request.data_length = length;
request.client_id = _client_id;
request.request_type = DM_WRITE;
request.item = static_cast<uint8_t>(item);
memcpy(request.data, buffer, length);
@ -219,12 +217,11 @@ bool DatamanClient::clearSync(dm_item_t item, hrt_abstime timeout)
bool success = false;
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.client_id = _client_id,
.request_type = DM_CLEAR,
.item = item
};
dataman_request_s request;
request.timestamp = timestamp;
request.client_id = _client_id;
request.request_type = DM_CLEAR;
request.item = static_cast<uint8_t>(item);
dataman_response_s response{};
success = syncHandler(request, response, timestamp, timeout);
@ -242,74 +239,6 @@ bool DatamanClient::clearSync(dm_item_t item, hrt_abstime timeout)
return success;
}
bool DatamanClient::lockSync(dm_item_t item, hrt_abstime timeout)
{
bool success = true;
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.client_id = _client_id,
.request_type = DM_LOCK,
.item = item
};
dataman_response_s response{};
response.status = dataman_response_s::STATUS_ALREADY_LOCKED;
while (success && (response.status == dataman_response_s::STATUS_ALREADY_LOCKED)) {
success = syncHandler(request, response, timestamp, timeout);
}
if (success) {
if (response.status != dataman_response_s::STATUS_SUCCESS) {
success = false;
PX4_ERR("lockSync failed! status=%" PRIu8 ", item=%" PRIu8,
response.status, static_cast<uint8_t>(item));
}
}
return success;
}
bool DatamanClient::unlockSync(dm_item_t item, hrt_abstime timeout)
{
bool success = false;
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.client_id = _client_id,
.request_type = DM_UNLOCK,
.item = item
};
dataman_response_s response{};
success = syncHandler(request, response, timestamp, timeout);
if (success) {
if (response.status != dataman_response_s::STATUS_SUCCESS) {
success = false;
if (response.status == dataman_response_s::STATUS_ALREADY_UNLOCKED) {
PX4_WARN("Dataman already unlocked for item=%" PRIu8,
static_cast<uint8_t>(item));
} else {
PX4_ERR("unlockSync failed! status=%" PRIu8 ", item=%" PRIu8,
response.status, static_cast<uint8_t>(item));
}
}
}
return success;
}
bool DatamanClient::readAsync(dm_item_t item, uint32_t index, uint8_t *buffer, uint32_t length)
{
if (length > g_per_item_size[item]) {
@ -323,14 +252,13 @@ bool DatamanClient::readAsync(dm_item_t item, uint32_t index, uint8_t *buffer, u
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.index = index,
.data_length = length,
.client_id = _client_id,
.request_type = DM_READ,
.item = item
};
dataman_request_s request;
request.timestamp = timestamp;
request.index = index;
request.data_length = length;
request.client_id = _client_id;
request.request_type = DM_READ;
request.item = static_cast<uint8_t>(item);
_active_request.timestamp = timestamp;
_active_request.request_type = DM_READ;
@ -362,14 +290,13 @@ bool DatamanClient::writeAsync(dm_item_t item, uint32_t index, uint8_t *buffer,
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.index = index,
.data_length = length,
.client_id = _client_id,
.request_type = DM_WRITE,
.item = item
};
dataman_request_s request;
request.timestamp = timestamp;
request.index = index;
request.data_length = length;
request.client_id = _client_id;
request.request_type = DM_WRITE;
request.item = static_cast<uint8_t>(item);
memcpy(request.data, buffer, length);
@ -398,72 +325,17 @@ bool DatamanClient::clearAsync(dm_item_t item)
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.client_id = _client_id,
.request_type = DM_CLEAR,
.item = item
};
dataman_request_s request;
request.timestamp = timestamp;
request.client_id = _client_id;
request.request_type = DM_CLEAR;
request.item = static_cast<uint8_t>(item);
request.index = 0;
_active_request.timestamp = timestamp;
_active_request.request_type = DM_CLEAR;
_active_request.item = item;
_state = State::RequestSent;
_dataman_request_pub.publish(request);
success = true;
}
return success;
}
bool DatamanClient::lockAsync(dm_item_t item)
{
bool success = false;
if (_state == State::Idle) {
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.client_id = _client_id,
.request_type = DM_LOCK,
.item = item
};
_active_request.timestamp = timestamp;
_active_request.request_type = DM_LOCK;
_active_request.item = item;
_state = State::RequestSent;
_dataman_request_pub.publish(request);
success = true;
}
return success;
}
bool DatamanClient::unlockAsync(dm_item_t item)
{
bool success = false;
if (_state == State::Idle) {
hrt_abstime timestamp = hrt_absolute_time();
dataman_request_s request = {
.timestamp = timestamp,
.client_id = _client_id,
.request_type = DM_UNLOCK,
.item = item
};
_active_request.timestamp = timestamp;
_active_request.request_type = DM_UNLOCK;
_active_request.item = item;
_active_request.index = request.index;
_state = State::RequestSent;
_dataman_request_pub.publish(request);
@ -497,23 +369,19 @@ void DatamanClient::update()
_response_status = response.status;
if ((_response_status != dataman_response_s::STATUS_SUCCESS) &&
(_response_status != dataman_response_s::STATUS_ALREADY_LOCKED) &&
(_response_status != dataman_response_s::STATUS_ALREADY_UNLOCKED)) {
if (_response_status != dataman_response_s::STATUS_SUCCESS) {
PX4_ERR("Async request type %" PRIu8 " failed! status=%" PRIu8 " item=%" PRIu8 " index=%" PRIu32,
response.request_type, response.status, static_cast<uint8_t>(_active_request.item), _active_request.index);
}
if (_response_status != dataman_response_s::STATUS_ALREADY_LOCKED) {
_state = State::ResponseReceived;
}
}
}
if (_state == State::RequestSent) {
/* Retry the request if there is no answer or if already locked. */
/* Retry the request if there is no answer */
if (((_active_request.request_type != DM_CLEAR) && (hrt_elapsed_time(&_active_request.timestamp) > 100_ms)) ||
(hrt_elapsed_time(&_active_request.timestamp) > 1000_ms)
) {
@ -522,14 +390,13 @@ void DatamanClient::update()
_active_request.timestamp = timestamp;
dataman_request_s request = {
.timestamp = timestamp,
.index = _active_request.index,
.data_length = _active_request.length,
.client_id = _client_id,
.request_type = _active_request.request_type,
.item = _active_request.item
};
dataman_request_s request;
request.timestamp = timestamp;
request.index = _active_request.index;
request.data_length = _active_request.length;
request.client_id = _client_id;
request.request_type = static_cast<uint8_t>(_active_request.request_type);
request.item = static_cast<uint8_t>(_active_request.item);
if (_active_request.request_type == DM_WRITE) {
memcpy(request.data, _active_request.buffer, _active_request.length);
@ -546,15 +413,12 @@ void DatamanClient::update()
bool DatamanClient::lastOperationCompleted(bool &success)
{
bool completed = false;
success = false;
if (_state == State::ResponseReceived) {
if ((_response_status == dataman_response_s::STATUS_SUCCESS) ||
(_response_status == dataman_response_s::STATUS_ALREADY_UNLOCKED)) {
if (_response_status == dataman_response_s::STATUS_SUCCESS) {
success = true;
} else {
success = false;
}
_state = State::Idle;
@ -714,7 +578,8 @@ void DatamanCache::update()
success = _client.readAsync(static_cast<dm_item_t>(_items[_update_index].response.item),
_items[_update_index].response.index,
_items[_update_index].response.data, g_per_item_size[_items[_update_index].response.item]);
_items[_update_index].response.data,
g_per_item_size[_items[_update_index].response.item]);
if (success) {
_items[_update_index].cache_state = State::RequestSent;

View File

@ -87,28 +87,6 @@ public:
*/
bool clearSync(dm_item_t item, hrt_abstime timeout = 1000_ms);
/**
* @brief Locks an item in the dataman for exclusive access.
*
* This function sends a DM_LOCK request to the dataman to lock an item for exclusive access.
* If the item is already locked, it will wait and retry until it can obtain the lock or the timeout
* is reached. Once the lock is obtained, the item can be safely modified.
*
* @param[in] item The item to be locked.
* @param[in] timeout The maximum time to wait for the lock in microseconds.
* @return true if the item is locked successfully, false otherwise.
*/
bool lockSync(dm_item_t item, hrt_abstime timeout = 1000_ms);
/**
* Unlock an item in dataman.
*
* @param[in] item: The item to unlock.
* @param[in] timeout: The timeout for the operation.
* @return True if the unlock operation was successful, false otherwise.
*/
bool unlockSync(dm_item_t item, hrt_abstime timeout = 1000_ms);
/**
* @brief Initiates an asynchronous request to read the data from dataman for a specific item and index.
*
@ -151,25 +129,6 @@ public:
*/
bool clearAsync(dm_item_t item);
/**
* @brief Locks a dataman item asynchronously.
*
* This function sends a lock request to the dataman service, asking it to lock the specified item.
* The function returns immediately, without waiting for the lock operation to complete.
*
* @param[in] item The dataman item to be locked.
* @return True if the lock request was successfully sent, false otherwise.
*/
bool lockAsync(dm_item_t item);
/**
* @brief Unlocks the specified dataman item asynchronously.
*
* @param[in] item The item to unlock.
* @return true if the request was successfully queued, false otherwise.
*/
bool unlockAsync(dm_item_t item);
/**
* @brief Updates the state of the dataman client for asynchronous functions.
*
@ -177,7 +136,7 @@ public:
* and updates the state accordingly. If there is no response for a request, it retries the
* request after a timeout.
*
* @see readAsync(), writeAsync(), clearAsync(), lockAsync(), unlockAsync(), lastOperationCompleted()
* @see readAsync(), writeAsync(), clearAsync(), lastOperationCompleted()
*/
void update();

View File

@ -140,10 +140,6 @@ static constexpr size_t g_per_item_size_with_hdr[DM_KEY_NUM_KEYS] = {
/* Table of offset for index 0 of each item type */
static unsigned int g_key_offsets[DM_KEY_NUM_KEYS];
/* Item type lock for dataman client id*/
static uint8_t g_item_locks[DM_KEY_NUM_KEYS];
constexpr uint8_t NOT_LOCKED = 0; // dataman client id reserved for unlock state
static uint8_t dataman_clients_count = 1;
static perf_counter_t _dm_read_perf{nullptr};
@ -633,10 +629,6 @@ task_main(int argc, char *argv[])
g_func_counts[i] = 0;
}
for (uint32_t i = 0; i < DM_KEY_NUM_KEYS; i++) {
g_item_locks[i] = false;
}
g_task_should_exit = false;
uORB::Publication<dataman_response_s> dataman_response_pub{ORB_ID(dataman_response)};
@ -704,7 +696,7 @@ task_main(int argc, char *argv[])
switch (request.request_type) {
case DM_GET_ID:
if ((dataman_clients_count < UINT8_MAX) && (dataman_clients_count > NOT_LOCKED)) {
if (dataman_clients_count < UINT8_MAX) {
response.client_id = dataman_clients_count++;
/* Send the timestamp of the request over the data buffer so that the "dataman client"
* can distinguish whether the request was made by it. */
@ -718,9 +710,6 @@ task_main(int argc, char *argv[])
case DM_WRITE:
if ((g_item_locks[request.item] == NOT_LOCKED) ||
(g_item_locks[request.item] == request.client_id)) {
g_func_counts[DM_WRITE]++;
result = g_dm_ops->write(static_cast<dm_item_t>(request.item), request.index,
&(request.data), request.data_length);
@ -728,21 +717,14 @@ task_main(int argc, char *argv[])
if (result > 0) {
response.status = dataman_response_s::STATUS_SUCCESS;
} else if (result < 0) {
response.status = dataman_response_s::STATUS_FAILURE_WRITE_FAILED;
}
} else {
response.status = dataman_response_s::STATUS_ITEM_LOCKED;
response.status = dataman_response_s::STATUS_FAILURE_WRITE_FAILED;
}
break;
case DM_READ:
if ((g_item_locks[request.item] == NOT_LOCKED) ||
(g_item_locks[request.item] == request.client_id)) {
g_func_counts[DM_READ]++;
result = g_dm_ops->read(static_cast<dm_item_t>(request.item), request.index,
&(response.data), request.data_length);
@ -750,21 +732,14 @@ task_main(int argc, char *argv[])
if (result > 0) {
response.status = dataman_response_s::STATUS_SUCCESS;
} else if (result < 0) {
response.status = dataman_response_s::STATUS_FAILURE_READ_FAILED;
}
} else {
response.status = dataman_response_s::STATUS_ITEM_LOCKED;
response.status = dataman_response_s::STATUS_FAILURE_READ_FAILED;
}
break;
case DM_CLEAR:
if ((g_item_locks[request.item] == NOT_LOCKED) ||
(g_item_locks[request.item] == request.client_id)) {
g_func_counts[DM_CLEAR]++;
result = g_dm_ops->clear(static_cast<dm_item_t>(request.item));
@ -775,43 +750,11 @@ task_main(int argc, char *argv[])
response.status = dataman_response_s::STATUS_FAILURE_CLEAR_FAILED;
}
} else {
response.status = dataman_response_s::STATUS_ITEM_LOCKED;
}
break;
case DM_LOCK:
if (g_item_locks[request.item] == NOT_LOCKED ||
(g_item_locks[request.item] == request.client_id)) {
g_func_counts[DM_LOCK]++;
g_item_locks[request.item] = request.client_id;
response.status = dataman_response_s::STATUS_SUCCESS;
} else {
response.status = dataman_response_s::STATUS_ALREADY_LOCKED;
}
break;
case DM_UNLOCK:
if (g_item_locks[request.item] == request.client_id) {
g_func_counts[DM_UNLOCK]++;
g_item_locks[request.item] = NOT_LOCKED;
response.status = dataman_response_s::STATUS_SUCCESS;
} else if (g_item_locks[request.item] != request.client_id) {
response.status = dataman_response_s::STATUS_ALREADY_UNLOCKED;
}
break;
default:
break;
}
response.timestamp = hrt_absolute_time();
@ -875,8 +818,6 @@ status()
PX4_INFO("Writes %u", g_func_counts[DM_WRITE]);
PX4_INFO("Reads %u", g_func_counts[DM_READ]);
PX4_INFO("Clears %u", g_func_counts[DM_CLEAR]);
PX4_INFO("Locks %u", g_func_counts[DM_LOCK]);
PX4_INFO("Unlocks %u", g_func_counts[DM_LOCK]);
perf_print_counter(_dm_read_perf);
perf_print_counter(_dm_write_perf);
@ -904,13 +845,11 @@ It is used to store structured data of different types: mission waypoints, missi
Each type has a specific type and a fixed maximum amount of storage items, so that fast random access is possible.
### Implementation
Reading and writing a single item is always atomic. If multiple items need to be read/modified atomically, there is
an additional lock per item type via `dm_lock`.
Reading and writing a single item is always atomic.
**DM_KEY_FENCE_POINTS** and **DM_KEY_SAFE_POINTS** items: the first data element is a `mission_stats_entry_s` struct,
which stores the number of items for these types. These items are always updated atomically in one transaction (from
the mavlink mission manager). During that time, navigator will try to acquire the geofence item lock, fail, and will not
check for geofence violations.
the mavlink mission manager).
)DESCR_STR");

View File

@ -59,8 +59,6 @@ typedef enum {
DM_WRITE, ///< Write index for given item
DM_READ, ///< Read index for given item
DM_CLEAR, ///< Clear all index for given item
DM_LOCK, ///< Lock all items for given item types
DM_UNLOCK, ///< Unlock all items for given item types
DM_NUMBER_OF_FUNCS
} dm_function_t;

View File

@ -38,14 +38,10 @@
#include <unit_test.h>
#include <errno.h>
#include <fcntl.h>
#include <px4_platform_common/atomic.h>
#include <px4_platform_common/px4_config.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "dataman_client/DatamanClient.hpp"
@ -53,6 +49,7 @@
class DatamanTest : public UnitTest
{
public:
DatamanTest();
virtual bool run_tests();
private:
@ -64,10 +61,6 @@ private:
ReadWait,
Clear,
ClearWait,
Lock,
LockWait,
Unlock,
UnlockWait,
OperationCompleted,
CompareBuffers,
Exit
@ -91,19 +84,16 @@ private:
bool testAsyncWriteInvalidIndex();
bool testAsyncReadBufferOverflow();
bool testAsyncWriteBufferOverflow();
bool testAsyncMutipleClientsNoLocks();
bool testAsyncMutipleClientsWithLocks();
bool testAsyncMutipleClients();
bool testAsyncWriteReadAllItemsMaxSize();
bool testAsyncClearAll();
//Cache
bool testCache();
//The last test will reset the items so that FMU can boot without Dataman errors.
//This will reset the items but it will not restore the compact key.
bool testResetItems();
static void *testLockThread(void *arg);
DatamanClient _dataman_client1{};
DatamanClient _dataman_client2{};
DatamanClient _dataman_client3{};
@ -111,11 +101,9 @@ private:
DatamanClient _dataman_client_thread2{};
DatamanClient _dataman_client_thread3{};
uint32_t _cache_size = 10;
DatamanCache _dataman_cache{_cache_size};
DatamanCache _dataman_cache{"test_dm_cache_miss", 10};
static void *testAsyncNoLocksThread(void *arg);
static void *testAsyncWithLocksThread(void *arg);
static void *testAsyncThread(void *arg);
static constexpr uint32_t DM_MAX_DATA_SIZE{MISSION_ITEM_SIZE};
static_assert(sizeof(dataman_response_s::data) == DM_MAX_DATA_SIZE, "data size != DM_MAX_DATA_SIZE");
@ -125,24 +113,39 @@ private:
bool _response_success{false};
uint32_t _thread_index;
bool _thread_tests_success;
px4::atomic_int _thread_index{0};
px4::atomic_bool _thread_tests_success{false};
uint16_t _max_index[DM_KEY_NUM_KEYS] {};
static constexpr uint32_t OVERFLOW_LENGTH = sizeof(_buffer_write) + 1;
};
DatamanTest::DatamanTest()
{
for (uint32_t i = 0; i < DM_KEY_NUM_KEYS; ++i) {
_max_index[i] = g_per_item_max_index[i];
}
#ifndef __PX4_NUTTX
_max_index[DM_KEY_WAYPOINTS_OFFBOARD_0] = 200;
_max_index[DM_KEY_WAYPOINTS_OFFBOARD_1] = 200;
#endif
}
bool
DatamanTest::testSyncReadInvalidItem()
{
bool success = _dataman_client1.readSync(DM_KEY_NUM_KEYS, 0, _buffer_read, sizeof(_buffer_read));
bool success = _dataman_client1.readSync(DM_KEY_NUM_KEYS, 0, _buffer_read, 2);
return !success;
}
bool
DatamanTest::testSyncWriteInvalidItem()
{
bool success = _dataman_client1.writeSync(DM_KEY_NUM_KEYS, 0, _buffer_write, sizeof(_buffer_write));
bool success = _dataman_client1.writeSync(DM_KEY_NUM_KEYS, 0, _buffer_write, 2);
return !success;
}
@ -150,14 +153,14 @@ DatamanTest::testSyncWriteInvalidItem()
bool
DatamanTest::testSyncReadInvalidIndex()
{
bool success = _dataman_client1.readSync(DM_KEY_SAFE_POINTS, DM_KEY_SAFE_POINTS_MAX, _buffer_read, 0);
bool success = _dataman_client1.readSync(DM_KEY_SAFE_POINTS, DM_KEY_SAFE_POINTS_MAX, _buffer_read, 2);
return !success;
}
bool
DatamanTest::testSyncWriteInvalidIndex()
{
bool success = _dataman_client1.writeSync(DM_KEY_SAFE_POINTS, DM_KEY_SAFE_POINTS_MAX, _buffer_write, 0);
bool success = _dataman_client1.writeSync(DM_KEY_SAFE_POINTS, DM_KEY_SAFE_POINTS_MAX, _buffer_write, 2);
return !success;
}
@ -178,6 +181,11 @@ DatamanTest::testSyncWriteBufferOverflow()
bool
DatamanTest::testSyncMutipleClients()
{
// Prepare write buffer
for (uint32_t i = 0; i < DM_MAX_DATA_SIZE; ++i) {
_buffer_write[i] = (uint8_t)i;
}
bool success = _dataman_client1.writeSync(DM_KEY_WAYPOINTS_OFFBOARD_0, 0x11, _buffer_write, sizeof(_buffer_write));
if (!success) {
@ -214,41 +222,6 @@ DatamanTest::testSyncMutipleClients()
return false;
}
//Test locking
success = _dataman_client1.lockSync(DM_KEY_MISSION_STATE);
if (!success) {
return false;
}
//Check if already locked
success = _dataman_client1.lockSync(DM_KEY_MISSION_STATE, 10_ms);
if (success) {
return false;
}
//Check if already locked from another client
success = _dataman_client2.lockSync(DM_KEY_MISSION_STATE, 10_ms);
if (success) {
return false;
}
//Check if can write while locked
success = _dataman_client1.writeSync(DM_KEY_MISSION_STATE, 0, _buffer_write, g_per_item_size[DM_KEY_MISSION_STATE]);
if (!success) {
return false;
}
//Check if can read while locked
success = _dataman_client1.readSync(DM_KEY_MISSION_STATE, 0, _buffer_read, g_per_item_size[DM_KEY_MISSION_STATE]);
if (!success) {
return false;
}
//Compare content from buffers
for (uint32_t i = 0; i < g_per_item_size[DM_KEY_MISSION_STATE]; ++i) {
if (_buffer_read[i] != _buffer_write[i]) {
@ -256,84 +229,9 @@ DatamanTest::testSyncMutipleClients()
}
}
//Check if can write while locked from another client
success = _dataman_client2.writeSync(DM_KEY_MISSION_STATE, 0, _buffer_write, g_per_item_size[DM_KEY_MISSION_STATE]);
if (success) {
return false;
}
//Check if can read while locked from another client
success = _dataman_client2.readSync(DM_KEY_MISSION_STATE, 0, _buffer_read, g_per_item_size[DM_KEY_MISSION_STATE]);
if (success) {
return false;
}
//Check if can unlock while locked from another client
success = _dataman_client2.unlockSync(DM_KEY_MISSION_STATE);
if (success) {
return false;
}
//Check if can unlock
success = _dataman_client1.unlockSync(DM_KEY_MISSION_STATE);
if (!success) {
return false;
}
//Lock from another thread and to test retry lock mechanism
pthread_t thread{};
uint32_t ret = pthread_create(&thread, NULL, &testLockThread, this);
if (ret != 0) {
printf("pthread_create failed: %" PRIu32 "\n", ret);
return false;
}
px4_usleep(50_ms);
//Should fail since timeout is to short
success = _dataman_client1.lockSync(DM_KEY_MISSION_STATE, 10_ms);
if (success) {
pthread_join(thread, nullptr);
return false;
}
//Should be able to lock since the task in the thread should unlock the item in the meantime
success = _dataman_client1.lockSync(DM_KEY_MISSION_STATE);
if (!success) {
pthread_join(thread, nullptr);
return false;
}
success = _dataman_client1.unlockSync(DM_KEY_MISSION_STATE);
if (!success) {
pthread_join(thread, nullptr);
return false;
}
pthread_join(thread, nullptr);
return success;
}
void *DatamanTest::testLockThread(void *arg)
{
DatamanTest *dataman_test = (DatamanTest *)arg;
dataman_test->_dataman_client_thread1.lockSync(DM_KEY_MISSION_STATE);
px4_usleep(200_ms);
dataman_test->_dataman_client_thread1.unlockSync(DM_KEY_MISSION_STATE);
px4_usleep(200_ms);
return nullptr;
}
bool
DatamanTest::testSyncWriteReadAllItemsMaxSize()
{
@ -343,7 +241,7 @@ DatamanTest::testSyncWriteReadAllItemsMaxSize()
for (uint32_t item = DM_KEY_SAFE_POINTS; item < DM_KEY_NUM_KEYS; ++item) {
// writeSync
for (uint32_t index = 0U; index < g_per_item_max_index[item]; ++index) {
for (uint32_t index = 0U; index < _max_index[item]; ++index) {
// Prepare write buffer
for (uint32_t i = 0; i < g_per_item_size[item]; ++i) {
@ -359,7 +257,7 @@ DatamanTest::testSyncWriteReadAllItemsMaxSize()
}
// readSync
for (uint32_t index = 0U; index < g_per_item_max_index[item]; ++index) {
for (uint32_t index = 0U; index < _max_index[item]; ++index) {
success = _dataman_client1.readSync((dm_item_t)item, index, _buffer_read, g_per_item_size[item]);
@ -423,7 +321,7 @@ DatamanTest::testAsyncReadInvalidItem()
case State::Read:
state = State::ReadWait;
success = _dataman_client1.readAsync(DM_KEY_NUM_KEYS, 0, _buffer_read, sizeof(_buffer_read));
success = _dataman_client1.readAsync(DM_KEY_NUM_KEYS, 0, _buffer_read, 2);
if (!success) {
return false;
@ -479,7 +377,7 @@ DatamanTest::testAsyncWriteInvalidItem()
case State::Write:
state = State::WriteWait;
success = _dataman_client1.writeAsync(DM_KEY_NUM_KEYS, 0, _buffer_write, sizeof(_buffer_write));
success = _dataman_client1.writeAsync(DM_KEY_NUM_KEYS, 0, _buffer_write, 2);
if (!success) {
return false;
@ -633,10 +531,6 @@ DatamanTest::testAsyncReadBufferOverflow()
{
bool success = _dataman_client1.readAsync(DM_KEY_SAFE_POINTS, DM_KEY_SAFE_POINTS_MAX, _buffer_read, OVERFLOW_LENGTH);
if (success) {
return false;
}
return !success;
}
@ -645,56 +539,52 @@ DatamanTest::testAsyncWriteBufferOverflow()
{
bool success = _dataman_client1.writeAsync(DM_KEY_SAFE_POINTS, DM_KEY_SAFE_POINTS_MAX, _buffer_write, OVERFLOW_LENGTH);
if (success) {
return false;
}
return !success;
}
bool
DatamanTest::testAsyncMutipleClientsNoLocks()
DatamanTest::testAsyncMutipleClients()
{
pthread_t thread1{};
pthread_t thread2{};
pthread_t thread3{};
_thread_tests_success = true;
_thread_index = 0x0;
_thread_tests_success.store(true);
_thread_index.store(0);
// Test multiple dataman clients without locks
uint32_t ret = pthread_create(&thread1, NULL, &testAsyncNoLocksThread, this);
// Test multiple dataman clients
uint32_t ret = pthread_create(&thread1, nullptr, &testAsyncThread, this);
if (ret != 0) {
printf("pthread_create failed: %" PRIu32 "\n", ret);
_thread_tests_success = false;
_thread_tests_success.store(false);
}
ret = pthread_create(&thread2, NULL, &testAsyncNoLocksThread, this);
ret = pthread_create(&thread2, nullptr, &testAsyncThread, this);
if (ret != 0) {
printf("pthread_create failed: %" PRIu32 "\n", ret);
_thread_tests_success = false;
_thread_tests_success.store(false);
}
ret = pthread_create(&thread3, NULL, &testAsyncNoLocksThread, this);
ret = pthread_create(&thread3, nullptr, &testAsyncThread, this);
if (ret != 0) {
printf("pthread_create failed: %" PRIu32 "\n", ret);
_thread_tests_success = false;
_thread_tests_success.store(false);
}
pthread_join(thread1, nullptr);
pthread_join(thread2, nullptr);
pthread_join(thread3, nullptr);
return _thread_tests_success;
return _thread_tests_success.load();
}
void *DatamanTest::testAsyncNoLocksThread(void *arg)
void *DatamanTest::testAsyncThread(void *arg)
{
DatamanTest *dataman_test = (DatamanTest *)arg;
uint32_t index = (dataman_test->_thread_index)++;
const uint32_t index = dataman_test->_thread_index.fetch_add(1);
State state = State::Write;
hrt_abstime start_time = hrt_absolute_time();
@ -712,17 +602,17 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
DatamanClient *dataman_client{nullptr};
if (dataman_test->_thread_index == 1) {
if (index == 0) {
dataman_client = &(dataman_test->_dataman_client_thread1);
} else if (dataman_test->_thread_index == 2) {
} else if (index == 1) {
dataman_client = &(dataman_test->_dataman_client_thread2);
} else if (dataman_test->_thread_index == 3) {
} else if (index == 2) {
dataman_client = &(dataman_test->_dataman_client_thread3);
} else {
PX4_ERR("Unknown thread %" PRIu32 "!", dataman_test->_thread_index);
PX4_ERR("Unknown thread %" PRIu32 "!", index);
return nullptr;
}
@ -740,7 +630,7 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
if (!success) {
PX4_ERR("writeAsync failed for index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
dataman_test->_thread_tests_success.store(false);
}
break;
@ -753,7 +643,7 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
if (!response_success) {
PX4_ERR("writeAsync failed to get success operation complete for the index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
dataman_test->_thread_tests_success.store(false);
}
}
@ -767,7 +657,7 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
if (!success) {
PX4_ERR("readAsync failed for index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
dataman_test->_thread_tests_success.store(false);
}
break;
@ -779,7 +669,7 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
if (!response_success) {
PX4_ERR("readAsync failed to get success operation complete for the index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
dataman_test->_thread_tests_success.store(false);
}
}
@ -790,7 +680,7 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
for (uint32_t i = 0; i < g_per_item_size[DM_KEY_WAYPOINTS_OFFBOARD_0]; ++i) {
if (buffer_write[i] != buffer_read[i]) {
PX4_ERR("buffer are not the same for index %" PRIu32 "!", index);
dataman_test->_thread_tests_success = false;
dataman_test->_thread_tests_success.store(false);
break;
}
}
@ -806,360 +696,14 @@ void *DatamanTest::testAsyncNoLocksThread(void *arg)
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout! index=%" PRIu32, index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
dataman_test->_thread_tests_success.store(false);
}
//Simulate rescheduling the task after a 1 ms delay to allow time for the dataman task to operate.
px4_usleep(1_ms);
}
PX4_INFO("Thread %" PRIu32 " finished!", dataman_test->_thread_index);
px4_usleep(200_ms);
return nullptr;
}
bool
DatamanTest::testAsyncMutipleClientsWithLocks()
{
pthread_t thread1{};
_thread_tests_success = true;
_thread_index = 0x0;
// Lock the item in the thread and then do other tests from this scope
uint32_t ret = pthread_create(&thread1, NULL, &testAsyncWithLocksThread, this);
if (ret != 0) {
printf("pthread_create failed: %" PRIu32 "\n", ret);
_thread_tests_success = false;
}
px4_usleep(10_ms); //give thread1 time to lock the item
hrt_abstime start_time = hrt_absolute_time();
bool completed;
bool response_success;
bool test_success = false;
// Try lockAsync for already locked item
_dataman_client1.lockAsync(DM_KEY_WAYPOINTS_OFFBOARD_0);
while (true) {
_dataman_client1.update();
completed = _dataman_client1.lastOperationCompleted(response_success);
if (completed) {
if (response_success) {
PX4_ERR("lockAsync successful, expected to fail!");
} else {
test_success = true;
}
break;
}
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout!");
break;
}
//Simulate rescheduling the task after a 1 ms delay to allow time for the dataman task to operate.
px4_usleep(1_ms);
}
if (!test_success) {
pthread_join(thread1, nullptr);
return false;
}
// Try writeAsync for already locked item
_dataman_client1.writeAsync(DM_KEY_WAYPOINTS_OFFBOARD_0, 0, _buffer_write, sizeof(_buffer_write));
while (true) {
_dataman_client1.update();
completed = _dataman_client1.lastOperationCompleted(response_success);
if (completed) {
if (response_success) {
PX4_ERR("writeAsync successful, expected to fail!");
} else {
test_success = true;
}
break;
}
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout!");
break;
}
//Simulate rescheduling the task after a 1 ms delay to allow time for the dataman task to operate.
px4_usleep(1_ms);
}
if (!test_success) {
pthread_join(thread1, nullptr);
return false;
}
// Try readAsync for already locked item
_dataman_client1.readAsync(DM_KEY_WAYPOINTS_OFFBOARD_0, 0, _buffer_read, sizeof(_buffer_read));
while (true) {
_dataman_client1.update();
completed = _dataman_client1.lastOperationCompleted(response_success);
if (completed) {
if (response_success) {
PX4_ERR("readAsync successful, expected to fail!");
} else {
test_success = true;
}
break;
}
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout!");
break;
}
//Simulate rescheduling the task after a 1 ms delay to allow time for the dataman task to operate.
px4_usleep(1_ms);
}
if (!test_success) {
pthread_join(thread1, nullptr);
return false;
}
// Try unlockAsync for already locked item
_dataman_client1.unlockAsync(DM_KEY_WAYPOINTS_OFFBOARD_0);
while (true) {
_dataman_client1.update();
completed = _dataman_client1.lastOperationCompleted(response_success);
if (completed) {
if (response_success) {
PX4_ERR("unlockAsync successful, expected to fail!");
} else {
test_success = true;
}
break;
}
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout!");
break;
}
//Simulate rescheduling the task after a 1 ms delay to allow time for the dataman task to operate.
px4_usleep(1_ms);
}
if (!test_success) {
pthread_join(thread1, nullptr);
return false;
}
pthread_join(thread1, nullptr);
return _thread_tests_success;
}
void *DatamanTest::testAsyncWithLocksThread(void *arg)
{
DatamanTest *dataman_test = (DatamanTest *)arg;
uint32_t index = (dataman_test->_thread_index)++;
State state = State::Lock;
hrt_abstime start_time = hrt_absolute_time();
uint8_t buffer_read[DM_MAX_DATA_SIZE] = {};
uint8_t buffer_write[DM_MAX_DATA_SIZE] = {};
bool success;
bool response_success;
DatamanClient *dataman_client{nullptr};
if (dataman_test->_thread_index == 1) {
dataman_client = &(dataman_test->_dataman_client_thread1);
} else if (dataman_test->_thread_index == 2) {
dataman_client = &(dataman_test->_dataman_client_thread2);
} else if (dataman_test->_thread_index == 3) {
dataman_client = &(dataman_test->_dataman_client_thread3);
} else {
PX4_ERR("Unknown thread %" PRIu32 "!", dataman_test->_thread_index);
return nullptr;
}
// Prepare write buffer
for (uint8_t i = 0; i < g_per_item_size[DM_KEY_WAYPOINTS_OFFBOARD_0]; ++i) {
buffer_write[i] = i + 1;
}
while (state != State::Exit) {
dataman_client->update();
switch (state) {
case State::Lock:
state = State::LockWait;
success = dataman_client->lockAsync(DM_KEY_WAYPOINTS_OFFBOARD_0);
if (!success) {
PX4_ERR("lockAsync failed!");
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
break;
case State::LockWait:
if (dataman_client->lastOperationCompleted(response_success)) {
state = State::Write;
if (!response_success) {
PX4_ERR("lockAsync failed to get success operation complete!");
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
}
break;
case State::Write:
state = State::WriteWait;
success = dataman_client->writeAsync(DM_KEY_WAYPOINTS_OFFBOARD_0, index, buffer_write, sizeof(buffer_write));
if (!success) {
PX4_ERR("writeAsync failed for index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
break;
case State::WriteWait:
if (dataman_client->lastOperationCompleted(response_success)) {
state = State::Read;
if (!response_success) {
PX4_ERR("writeAsync failed to get success operation complete for the index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
}
break;
case State::Read:
state = State::ReadWait;
success = dataman_client->readAsync(DM_KEY_WAYPOINTS_OFFBOARD_0, index, buffer_read, sizeof(buffer_read));
if (!success) {
PX4_ERR("readAsync failed for index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
break;
case State::ReadWait:
if (dataman_client->lastOperationCompleted(response_success)) {
state = State::CompareBuffers;
if (!response_success) {
PX4_ERR("readAsync failed to get success operation complete for the index %" PRIu32 "!", index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
}
break;
case State::CompareBuffers:
for (uint32_t i = 0; i < g_per_item_size[DM_KEY_WAYPOINTS_OFFBOARD_0]; ++i) {
if (buffer_write[i] != buffer_read[i]) {
PX4_ERR("buffer are not the same for index %" PRIu32 "!", index);
dataman_test->_thread_tests_success = false;
break;
}
}
state = State::Unlock;
break;
case State::Unlock:
// Wait before unlocking so that we can perform the tests from another scope
px4_usleep(500_ms);
state = State::UnlockWait;
success = dataman_client->unlockAsync(DM_KEY_WAYPOINTS_OFFBOARD_0);
if (!success) {
PX4_ERR("unlockAsync failed!");
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
break;
case State::UnlockWait:
// Please wait some time before unlocking so that we can perform the test from another scope
if (dataman_client->lastOperationCompleted(response_success)) {
state = State::Exit;
if (!response_success) {
PX4_ERR("unlockAsync failed to get success operation complete!");
dataman_test->_thread_tests_success = false;
}
}
break;
default:
break;
}
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout! index=%" PRIu32, index);
state = State::Exit;
dataman_test->_thread_tests_success = false;
}
//Simulate rescheduling the task after a 1 ms delay to allow time for the dataman task to operate.
px4_usleep(1_ms);
}
PX4_INFO("Thread %" PRIu32 " finished!", dataman_test->_thread_index);
PX4_INFO("Thread %" PRIu32 " finished!", index);
px4_usleep(200_ms);
return nullptr;
@ -1171,8 +715,8 @@ DatamanTest::testAsyncWriteReadAllItemsMaxSize()
bool success = false;
State state = State::Write;
volatile uint32_t item = DM_KEY_SAFE_POINTS;
volatile uint32_t index = 0U;
uint32_t item = DM_KEY_SAFE_POINTS;
uint32_t index = 0U;
hrt_abstime start_time = hrt_absolute_time();
@ -1244,7 +788,7 @@ DatamanTest::testAsyncWriteReadAllItemsMaxSize()
}
}
if (index < g_per_item_max_index[item] - 1) {
if (index < _max_index[item] - 1) {
++index;
} else {
@ -1284,7 +828,7 @@ DatamanTest::testAsyncClearAll()
State state = State::Clear;
hrt_abstime start_time = hrt_absolute_time();
volatile uint32_t item = DM_KEY_SAFE_POINTS;
uint32_t item = DM_KEY_SAFE_POINTS;
//While loop represents a task
while (state != State::Exit) {
@ -1373,21 +917,29 @@ DatamanTest::testCache()
}
// Load cache
for (uint32_t index = 0; index < _cache_size; ++index) {
_dataman_cache.load(item, index);
for (uint32_t index = 0; index < _dataman_cache.size(); ++index) {
if (!_dataman_cache.load(item, index)) {
return false;
}
}
hrt_abstime start_time = hrt_absolute_time();
// loop represents the task, we collect the data
while (hrt_elapsed_time(&start_time) < 500_ms) {
while (_dataman_cache.isLoading()) {
px4_usleep(1_ms);
_dataman_cache.update();
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout!");
return false;
}
}
// check cached data
for (uint32_t index = 0; index < _cache_size; ++index) {
for (uint32_t index = 0; index < _dataman_cache.size(); ++index) {
uint8_t value = index + uniq_number;
success = _dataman_cache.loadWait(item, index, _buffer_read, sizeof(_buffer_read));
@ -1420,12 +972,19 @@ DatamanTest::testCache()
return false;
}
uint32_t old_cache_size = _cache_size;
_cache_size = 5;
_dataman_cache.resize(_cache_size);
// expected to success without timeout set (item is now cached)
success = _dataman_cache.loadWait(item, extra_index, _buffer_read, sizeof(_buffer_read));
// check cached data after resize (reduced)
for (uint32_t index = 0; index < _cache_size; ++index) {
if (!success) {
PX4_ERR("loadWait failed");
return false;
}
uint32_t old_cache_size = _dataman_cache.size();
_dataman_cache.resize(5);
// check cached data after resize (reduced, the first item got overwritten by extra_index)
for (uint32_t index = 1; index < _dataman_cache.size(); ++index) {
uint8_t value = index + uniq_number;
success = _dataman_cache.loadWait(item, index, _buffer_read, sizeof(_buffer_read));
@ -1442,7 +1001,7 @@ DatamanTest::testCache()
}
}
for (uint32_t index = _cache_size; index < old_cache_size; ++index) {
for (uint32_t index = _dataman_cache.size(); index < old_cache_size; ++index) {
uint8_t value = index + uniq_number;
success = _dataman_cache.loadWait(item, index, _buffer_read, sizeof(_buffer_read));
@ -1452,25 +1011,30 @@ DatamanTest::testCache()
}
}
_cache_size = 15;
_dataman_cache.resize(_cache_size);
_dataman_cache.invalidate();
_dataman_cache.resize(15);
// Load cache
for (uint32_t index = 0; index < _cache_size; ++index) {
for (uint32_t index = 0; index < _dataman_cache.size(); ++index) {
_dataman_cache.load(item, index);
}
start_time = hrt_absolute_time();
// loop represents the task, we collect the data
while (hrt_elapsed_time(&start_time) < 500_ms) {
while (_dataman_cache.isLoading()) {
px4_usleep(1_ms);
_dataman_cache.update();
if (hrt_elapsed_time(&start_time) > 2_s) {
PX4_ERR("Test timeout!");
return false;
}
}
// check cached data
for (uint32_t index = 0; index < _cache_size; ++index) {
for (uint32_t index = 0; index < _dataman_cache.size(); ++index) {
uint8_t value = index + uniq_number;
success = _dataman_cache.loadWait(item, index, _buffer_read, sizeof(_buffer_read));
@ -1490,7 +1054,7 @@ DatamanTest::testCache()
// invalidate and check cached data
_dataman_cache.invalidate();
for (uint32_t index = 0; index < _cache_size; ++index) {
for (uint32_t index = 0; index < _dataman_cache.size(); ++index) {
uint8_t value = index + uniq_number;
success = _dataman_cache.loadWait(item, index, _buffer_read, sizeof(_buffer_read));
@ -1571,8 +1135,7 @@ bool DatamanTest::run_tests()
ut_run_test(testAsyncWriteInvalidIndex);
ut_run_test(testAsyncReadBufferOverflow);
ut_run_test(testAsyncWriteBufferOverflow);
ut_run_test(testAsyncMutipleClientsNoLocks);
ut_run_test(testAsyncMutipleClientsWithLocks);
ut_run_test(testAsyncMutipleClients);
ut_run_test(testAsyncWriteReadAllItemsMaxSize);
ut_run_test(testAsyncClearAll);