diff --git a/src/modules/dataman/dataman.c b/src/modules/dataman/dataman.c index dc2d6c312a..fa88dfaffb 100644 --- a/src/modules/dataman/dataman.c +++ b/src/modules/dataman/dataman.c @@ -126,45 +126,46 @@ static const char *k_data_manager_device_path = "/fs/microsd/dataman"; /* The data manager work queues */ typedef struct { - sq_queue_t q; - sem_t mutex; /* Mutual exclusion on work queue adds and deletes */ - unsigned size; - unsigned max_size; + sq_queue_t q; /* Nuttx queue */ + sem_t mutex; /* Mutual exclusion on work queue adds and deletes */ + unsigned size; /* Current size of queue */ + unsigned max_size; /* Maximum queue size reached */ } work_q_t; -static work_q_t g_free_q; -static work_q_t g_work_q; +static work_q_t g_free_q; /* queue of free work items. So that we don't always need to call malloc and free*/ +static work_q_t g_work_q; /* pending work items. To be consumed by worker thread */ -sem_t g_work_queued_sema; +sem_t g_work_queued_sema; /* To notify worker thread a work item has been queued */ sem_t g_init_sema; static bool g_task_should_exit; /**< if true, dataman task should exit */ -#define DM_SECTOR_HDR_SIZE 4 -static const unsigned k_sector_size = DM_MAX_DATA_SIZE + DM_SECTOR_HDR_SIZE; +#define DM_SECTOR_HDR_SIZE 4 /* data manager per item header overhead */ +static const unsigned k_sector_size = DM_MAX_DATA_SIZE + DM_SECTOR_HDR_SIZE; /* total item sorage space */ static void init_q(work_q_t *q) { - sq_init(&(q->q)); - sem_init(&(q->mutex), 1, 1); - q->size = q->max_size = 0; + sq_init(&(q->q)); /* Initialize the NuttX queue structure */ + sem_init(&(q->mutex), 1, 1); /* Queue is initially unlocked */ + q->size = q->max_size = 0; /* Queue is initially empty */ } -static void destroy_q(work_q_t *q) +static inline void +destroy_q(work_q_t *q) { - sem_destroy(&(q->mutex)); + sem_destroy(&(q->mutex)); /* Destroy the queue lock */ } static inline void lock_queue(work_q_t *q) { - sem_wait(&(q->mutex)); + sem_wait(&(q->mutex)); /* Acquire the queue lock */ } static inline void unlock_queue(work_q_t *q) { - sem_post(&(q->mutex)); + sem_post(&(q->mutex)); /* Release the queue lock */ } static work_q_item_t * @@ -172,54 +173,47 @@ create_work_item(void) { work_q_item_t *item; + /* Try to reuse item from free item queue */ lock_queue(&g_free_q); if ((item = (work_q_item_t *)sq_remfirst(&(g_free_q.q)))) g_free_q.size--; unlock_queue(&g_free_q); + /* If we there weren't any free items then obtain memory for a new one */ if (item == NULL) item = (work_q_item_t *)malloc(sizeof(work_q_item_t)); + /* If we got one then lock the item*/ if (item) sem_init(&item->wait_sem, 1, 0); /* Caller will wait on this... initially locked */ + /* return the item pointer, or NULL if all failed */ return item; } /* Work queue management functions */ -static void -enqueue_work_item(work_q_item_t *item) -{ - /* put the work item on the work queue */ - lock_queue(&g_work_q); - sq_addlast(&item->link, &(g_work_q.q)); - if (++g_work_q.size > g_work_q.max_size) - g_work_q.max_size = g_work_q.size; - - unlock_queue(&g_work_q); - - /* tell the work thread that work is available */ - sem_post(&g_work_queued_sema); -} - -static void +static inline void destroy_work_item(work_q_item_t *item) { - sem_destroy(&item->wait_sem); + sem_destroy(&item->wait_sem); /* Destroy the item lock */ + /* Return the item to the free item queue for later reuse */ lock_queue(&g_free_q); sq_addfirst(&item->link, &(g_free_q.q)); + /* Update the queue size and potentially the maximum queue size */ if (++g_free_q.size > g_free_q.max_size) g_free_q.max_size = g_free_q.size; unlock_queue(&g_free_q); } -static work_q_item_t * +static inline work_q_item_t * dequeue_work_item(void) { work_q_item_t *work; + + /* retrieve the 1st item on the work queue */ lock_queue(&g_work_q); if ((work = (work_q_item_t *)sq_remfirst(&g_work_q.q))) @@ -229,6 +223,32 @@ dequeue_work_item(void) return work; } +static int +enqueue_work_item_and_wait_for_result(work_q_item_t *item) +{ + /* put the work item at the end of the work queue */ + lock_queue(&g_work_q); + sq_addlast(&item->link, &(g_work_q.q)); + + /* Adjust the queue size and potentially the maximum queue size */ + if (++g_work_q.size > g_work_q.max_size) + g_work_q.max_size = g_work_q.size; + + unlock_queue(&g_work_q); + + /* tell the work thread that work is available */ + sem_post(&g_work_queued_sema); + + /* wait for the result */ + sem_wait(&item->wait_sem); + + int result = item->result; + + destroy_work_item(item); + + return result; +} + /* Calculate the offset in file of specific item */ static int calculate_offset(dm_item_t item, unsigned char index) @@ -250,6 +270,8 @@ calculate_offset(dm_item_t item, unsigned char index) * * byte 0: Length of user data item * byte 1: Persistence of this data item + * byte 2: Unused (for future use) + * byte 3: Unused (for future use) * byte DM_SECTOR_HDR_SIZE... : data item value * * The total size must not exceed k_sector_size @@ -266,6 +288,7 @@ _write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const v /* Get the offset for this item */ offset = calculate_offset(item, index); + /* If item type or index out of range, return error */ if (offset < 0) return -1; @@ -283,10 +306,12 @@ _write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const v len = -1; + /* Seek to the right spot in the data manager file and write the data item */ if (lseek(g_task_fd, offset, SEEK_SET) == offset) if ((len = write(g_task_fd, buffer, count)) == count) - fsync(g_task_fd); + fsync(g_task_fd); /* Make sure data is written to physical media */ + /* Make sure the write succeeded */ if (len != count) return -1; @@ -304,6 +329,7 @@ _read(dm_item_t item, unsigned char index, void *buf, size_t count) /* Get the offset for this item */ offset = calculate_offset(item, index); + /* If item type or index out of range, return error */ if (offset < 0) return -1; @@ -316,14 +342,17 @@ _read(dm_item_t item, unsigned char index, void *buf, size_t count) if (lseek(g_task_fd, offset, SEEK_SET) == offset) len = read(g_task_fd, buffer, count + DM_SECTOR_HDR_SIZE); - /* Check for length issues */ + /* Check for read error */ if (len < 0) return -1; + /* A zero length entry is a empty entry */ if (len == 0) buffer[0] = 0; + /* See if we got data */ if (buffer[0] > 0) { + /* We got more than requested!!! */ if (buffer[0] > count) return -1; @@ -340,11 +369,14 @@ _clear(dm_item_t item) { int i, result = 0; + /* Get the offset of 1st item of this type */ int offset = calculate_offset(item, 0); + /* Check for item type out of range */ if (offset < 0) return -1; + /* Clear all items of this type */ for (i = 0; (unsigned)i < g_per_item_max_index[item]; i++) { char buf[1]; @@ -353,9 +385,11 @@ _clear(dm_item_t item) break; } + /* Avoid SD flash wear by only doing writes where necessary */ if (read(g_task_fd, buf, 1) < 1) break; + /* If item has length greater than 0 it needs to be overwritten */ if (buf[0]) { if (lseek(g_task_fd, offset, SEEK_SET) != offset) { result = -1; @@ -373,6 +407,7 @@ _clear(dm_item_t item) offset += k_sector_size; } + /* Make sure data is actually written to physical media */ fsync(g_task_fd); return result; } @@ -452,12 +487,13 @@ dm_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const { work_q_item_t *work; + /* Make sure data manager has been started and is not shutting down */ if ((g_fd < 0) || g_task_should_exit) return -1; - /* Will return with queues locked */ + /* get a work item and queue up a write request */ if ((work = create_work_item()) == NULL) - return -1; /* queues unlocked on failure */ + return -1; work->func = dm_write_func; work->write_params.item = item; @@ -465,12 +501,9 @@ dm_write(dm_item_t item, unsigned char index, dm_persitence_t persistence, const work->write_params.persistence = persistence; work->write_params.buf = buf; work->write_params.count = count; - enqueue_work_item(work); - sem_wait(&work->wait_sem); - ssize_t result = work->result; - destroy_work_item(work); - return result; + /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */ + return (ssize_t)enqueue_work_item_and_wait_for_result(work); } /* Retrieve from the data manager file */ @@ -479,24 +512,22 @@ dm_read(dm_item_t item, unsigned char index, void *buf, size_t count) { work_q_item_t *work; + /* Make sure data manager has been started and is not shutting down */ if ((g_fd < 0) || g_task_should_exit) return -1; - /* Will return with queues locked */ + /* get a work item and queue up a read request */ if ((work = create_work_item()) == NULL) - return -1; /* queues unlocked on failure */ + return -1; work->func = dm_read_func; work->read_params.item = item; work->read_params.index = index; work->read_params.buf = buf; work->read_params.count = count; - enqueue_work_item(work); - sem_wait(&work->wait_sem); - ssize_t result = work->result; - destroy_work_item(work); - return result; + /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */ + return (ssize_t)enqueue_work_item_and_wait_for_result(work); } __EXPORT int @@ -504,21 +535,19 @@ dm_clear(dm_item_t item) { work_q_item_t *work; + /* Make sure data manager has been started and is not shutting down */ if ((g_fd < 0) || g_task_should_exit) return -1; - /* Will return with queues locked */ + /* get a work item and queue up a clear request */ if ((work = create_work_item()) == NULL) - return -1; /* queues unlocked on failure */ + return -1; work->func = dm_clear_func; work->clear_params.item = item; - enqueue_work_item(work); - sem_wait(&work->wait_sem); - int result = work->result; - destroy_work_item(work); - return result; + /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */ + return enqueue_work_item_and_wait_for_result(work); } /* Tell the data manager about the type of the last reset */ @@ -527,21 +556,19 @@ dm_restart(dm_reset_reason reason) { work_q_item_t *work; + /* Make sure data manager has been started and is not shutting down */ if ((g_fd < 0) || g_task_should_exit) return -1; - /* Will return with queues locked */ + /* get a work item and queue up a restart request */ if ((work = create_work_item()) == NULL) - return -1; /* queues unlocked on failure */ + return -1; work->func = dm_restart_func; work->restart_params.reason = reason; - enqueue_work_item(work); - sem_wait(&work->wait_sem); - int result = work->result; - destroy_work_item(work); - return result; + /* Enqueue the item on the work queue and wait for the worker thread to complete processing it */ + return enqueue_work_item_and_wait_for_result(work); } static int @@ -570,24 +597,29 @@ task_main(int argc, char *argv[]) sem_init(&g_work_queued_sema, 1, 0); + /* Open or create the data manager file */ g_task_fd = open(k_data_manager_device_path, O_RDWR | O_CREAT | O_BINARY); if (g_task_fd < 0) { warnx("Could not open data manager file %s", k_data_manager_device_path); - sem_post(&g_init_sema); + sem_post(&g_init_sema); /* Don't want to hang startup */ return -1; } if (lseek(g_task_fd, max_offset, SEEK_SET) != max_offset) { close(g_task_fd); warnx("Could not seek data manager file %s", k_data_manager_device_path); - sem_post(&g_init_sema); + sem_post(&g_init_sema); /* Don't want to hang startup */ return -1; } fsync(g_task_fd); + /* We use two file descriptors, one for the caller context and one for the worker thread */ + /* They are actually the same but we need to some way to reject caller request while the */ + /* worker thread is shutting down but still processing requests */ g_fd = g_task_fd; warnx("Initialized, data manager file '%s' size is %d bytes", k_data_manager_device_path, max_offset); + /* Tell startup that the worker thread has completed its initialization */ sem_post(&g_init_sema); /* Start the endless loop, waiting for then processing work requests */ @@ -595,7 +627,7 @@ task_main(int argc, char *argv[]) /* do we need to exit ??? */ if ((g_task_should_exit) && (g_fd >= 0)) { - /* Close the file handle to stop further queueing */ + /* Close the file handle to stop further queuing */ g_fd = -1; } @@ -607,6 +639,7 @@ task_main(int argc, char *argv[]) /* Empty the work queue */ while ((work = dequeue_work_item())) { + /* handle each work item with the appropriate handler */ switch (work->func) { case dm_write_func: g_func_counts[dm_write_func]++; @@ -647,7 +680,7 @@ task_main(int argc, char *argv[]) close(g_task_fd); g_task_fd = -1; - /* Empty the work queue */ + /* The work queue is now empty, empty the free queue */ for (;;) { if ((work = (work_q_item_t *)sq_remfirst(&(g_free_q.q))) == NULL) break; @@ -669,7 +702,7 @@ start(void) sem_init(&g_init_sema, 1, 0); - /* start the task */ + /* start the worker thread */ if ((task = task_spawn_cmd("dataman", SCHED_DEFAULT, SCHED_PRIORITY_MAX - 5, 2048, task_main, NULL)) <= 0) { warn("task start failed"); return -1; @@ -704,7 +737,7 @@ stop(void) static void usage(void) { - errx(1, "usage: dataman {start|stop|status}"); + errx(1, "usage: dataman {start|stop|status|poweronrestart|inflightrestart}"); } int @@ -726,6 +759,7 @@ dataman_main(int argc, char *argv[]) exit(0); } + /* Worker thread should be running for all other commands */ if (g_fd < 0) errx(1, "not running"); @@ -733,6 +767,10 @@ dataman_main(int argc, char *argv[]) stop(); else if (!strcmp(argv[1], "status")) status(); + else if (!strcmp(argv[1], "poweronrestart")) + dm_restart(DM_INIT_REASON_POWER_ON); + else if (!strcmp(argv[1], "inflightrestart")) + dm_restart(DM_INIT_REASON_IN_FLIGHT); else usage(); diff --git a/src/systemcmds/tests/test_dataman.c b/src/systemcmds/tests/test_dataman.c index 5b121e34ed..1f844a97de 100644 --- a/src/systemcmds/tests/test_dataman.c +++ b/src/systemcmds/tests/test_dataman.c @@ -89,7 +89,7 @@ task_main(int argc, char *argv[]) unsigned hash = i ^ my_id; unsigned len = (hash & 63) + 2; - int ret = dm_write(DM_KEY_WAYPOINTS_OFFBOARD, hash, DM_PERSIST_IN_FLIGHT_RESET, buffer, len); + int ret = dm_write(DM_KEY_WAYPOINTS_OFFBOARD_1, hash, DM_PERSIST_IN_FLIGHT_RESET, buffer, len); warnx("ret: %d", ret); if (ret != len) { warnx("%d write failed, index %d, length %d", my_id, hash, len); @@ -103,7 +103,7 @@ task_main(int argc, char *argv[]) for (unsigned i = 0; i < NUM_MISSIONS_SUPPORTED; i++) { unsigned hash = i ^ my_id; unsigned len2, len = (hash & 63) + 2; - if ((len2 = dm_read(DM_KEY_WAYPOINTS_OFFBOARD, hash, buffer, sizeof(buffer))) < 2) { + if ((len2 = dm_read(DM_KEY_WAYPOINTS_OFFBOARD_1, hash, buffer, sizeof(buffer))) < 2) { warnx("%d read failed length test, index %d", my_id, hash); goto fail; } @@ -163,7 +163,7 @@ int test_dataman(int argc, char *argv[]) free(sems); dm_restart(DM_INIT_REASON_IN_FLIGHT); for (i = 0; i < NUM_MISSIONS_SUPPORTED; i++) { - if (dm_read(DM_KEY_WAYPOINTS_OFFBOARD, i, buffer, sizeof(buffer)) != 0) + if (dm_read(DM_KEY_WAYPOINTS_OFFBOARD_1, i, buffer, sizeof(buffer)) != 0) break; } if (i >= NUM_MISSIONS_SUPPORTED) { @@ -173,7 +173,7 @@ int test_dataman(int argc, char *argv[]) } dm_restart(DM_INIT_REASON_POWER_ON); for (i = 0; i < NUM_MISSIONS_SUPPORTED; i++) { - if (dm_read(DM_KEY_WAYPOINTS_OFFBOARD, i, buffer, sizeof(buffer)) != 0) { + if (dm_read(DM_KEY_WAYPOINTS_OFFBOARD_1, i, buffer, sizeof(buffer)) != 0) { warnx("Restart power-on failed"); return -1; }