From 828e31d3a914166c8bb3897c6191f4203f151b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beat=20K=C3=BCng?= Date: Wed, 9 Jan 2019 13:23:57 +0100 Subject: [PATCH] lockstep_scheduler: optimize performance - use a linked-list instead of std::vector. Insertion and removal are now O(1) - avoid malloc and use a thread_local instance of TimedWait. It gets destroyed when the thread exits, so we have to add protection in case a thread exits too quickly. This in turn requires a fix to the unit-tests. --- .../src/lockstep_scheduler/CMakeLists.txt | 2 + .../lockstep_scheduler/lockstep_scheduler.h | 21 +++++- .../src/lockstep_scheduler.cpp | 73 +++++++++++++------ .../test/src/lockstep_scheduler_test.cpp | 56 +++++++++++--- 4 files changed, 118 insertions(+), 34 deletions(-) diff --git a/platforms/posix/src/lockstep_scheduler/CMakeLists.txt b/platforms/posix/src/lockstep_scheduler/CMakeLists.txt index 33e8f70666..a51e57cb1c 100644 --- a/platforms/posix/src/lockstep_scheduler/CMakeLists.txt +++ b/platforms/posix/src/lockstep_scheduler/CMakeLists.txt @@ -6,6 +6,8 @@ if(NOT PROJECT_NAME STREQUAL "px4") set (CMAKE_CXX_STANDARD 11) + add_definitions(-DUNIT_TESTS) + add_library(lockstep_scheduler src/lockstep_scheduler.cpp ) diff --git a/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h b/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h index 8462368529..69a0ee34ed 100644 --- a/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h +++ b/platforms/posix/src/lockstep_scheduler/include/lockstep_scheduler/lockstep_scheduler.h @@ -10,6 +10,8 @@ class LockstepScheduler { public: + ~LockstepScheduler(); + void set_absolute_time(uint64_t time_us); inline uint64_t get_absolute_time() const { return time_us_; } int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us); @@ -19,13 +21,28 @@ private: std::atomic time_us_{0}; struct TimedWait { + ~TimedWait() + { + // If a thread quickly exits after a cond_timedwait(), the + // thread_local object can still be in the linked list. In that case + // we need to wait until it's removed. + while (!removed) { +#ifndef UNIT_TESTS // unit tests don't define system_usleep and execute faster w/o sleeping here + system_sleep(5000); +#endif + } + } + pthread_cond_t *passed_cond{nullptr}; pthread_mutex_t *passed_lock{nullptr}; uint64_t time_us{0}; bool timeout{false}; std::atomic done{false}; + std::atomic removed{true}; + + TimedWait *next{nullptr}; ///< linked list }; - std::vector> timed_waits_{}; - std::mutex timed_waits_mutex_{}; + TimedWait *timed_waits_{nullptr}; ///< head of linked list + std::mutex timed_waits_mutex_; std::atomic setting_time_{false}; ///< true if set_absolute_time() is currently being executed }; diff --git a/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp b/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp index c12f49799c..08b76f0272 100644 --- a/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp +++ b/platforms/posix/src/lockstep_scheduler/src/lockstep_scheduler.cpp @@ -1,5 +1,16 @@ #include "lockstep_scheduler/lockstep_scheduler.h" +LockstepScheduler::~LockstepScheduler() +{ + // cleanup the linked list + std::unique_lock lock_timed_waits(timed_waits_mutex_); + + while (timed_waits_) { + TimedWait *tmp = timed_waits_; + timed_waits_ = timed_waits_->next; + tmp->removed = true; + } +} void LockstepScheduler::set_absolute_time(uint64_t time_us) { @@ -9,29 +20,38 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us) std::unique_lock lock_timed_waits(timed_waits_mutex_); setting_time_ = true; - auto it = std::begin(timed_waits_); - - while (it != std::end(timed_waits_)) { - - std::shared_ptr temp_timed_wait = *it; + TimedWait *timed_wait = timed_waits_; + TimedWait *timed_wait_prev = nullptr; + while (timed_wait) { // Clean up the ones that are already done from last iteration. - if (temp_timed_wait->done) { - it = timed_waits_.erase(it); + if (timed_wait->done) { + // Erase from the linked list + if (timed_wait_prev) { + timed_wait_prev->next = timed_wait->next; + + } else { + timed_waits_ = timed_wait->next; + } + + TimedWait *tmp = timed_wait; + timed_wait = timed_wait->next; + tmp->removed = true; continue; } - if (temp_timed_wait->time_us <= time_us && - !temp_timed_wait->timeout) { + if (timed_wait->time_us <= time_us && + !timed_wait->timeout) { // We are abusing the condition here to signal that the time // has passed. - pthread_mutex_lock(temp_timed_wait->passed_lock); - temp_timed_wait->timeout = true; - pthread_cond_broadcast(temp_timed_wait->passed_cond); - pthread_mutex_unlock(temp_timed_wait->passed_lock); + pthread_mutex_lock(timed_wait->passed_lock); + timed_wait->timeout = true; + pthread_cond_broadcast(timed_wait->passed_cond); + pthread_mutex_unlock(timed_wait->passed_lock); } - ++it; + timed_wait_prev = timed_wait; + timed_wait = timed_wait->next; } setting_time_ = false; @@ -40,7 +60,9 @@ void LockstepScheduler::set_absolute_time(uint64_t time_us) int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *lock, uint64_t time_us) { - std::shared_ptr new_timed_wait; + // A TimedWait object might still be in timed_waits_ after we return, so its lifetime needs to be + // longer. And using thread_local is more efficient than malloc. + static thread_local TimedWait timed_wait; { std::lock_guard lock_timed_waits(timed_waits_mutex_); @@ -49,22 +71,29 @@ int LockstepScheduler::cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *loc return ETIMEDOUT; } - new_timed_wait = std::make_shared(); - new_timed_wait->time_us = time_us; - new_timed_wait->passed_cond = cond; - new_timed_wait->passed_lock = lock; - timed_waits_.push_back(new_timed_wait); + timed_wait.time_us = time_us; + timed_wait.passed_cond = cond; + timed_wait.passed_lock = lock; + timed_wait.timeout = false; + timed_wait.done = false; + + // Add to linked list if not removed yet (otherwise just re-use the object) + if (timed_wait.removed) { + timed_wait.removed = false; + timed_wait.next = timed_waits_; + timed_waits_ = &timed_wait; + } } int result = pthread_cond_wait(cond, lock); - const bool timeout = new_timed_wait->timeout; + const bool timeout = timed_wait.timeout; if (result == 0 && timeout) { result = ETIMEDOUT; } - new_timed_wait->done = true; + timed_wait.done = true; if (!timeout && setting_time_) { // This is where it gets tricky: the timeout has not been triggered yet, diff --git a/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp b/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp index f06edcbb15..e486710bf2 100644 --- a/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp +++ b/platforms/posix/src/lockstep_scheduler/test/src/lockstep_scheduler_test.cpp @@ -4,7 +4,43 @@ #include #include #include +#include +#include +class TestThread +{ +public: + TestThread(const std::function &f) + : _f(f) + { + _thread = std::thread(std::bind(&TestThread::execute, this)); + } + + void join(LockstepScheduler &ls) + { + // The unit-tests do not reflect the real usage, where + // set_absolute_time() is called regularly and can do some + // cleanup tasks. We simulate that here by waiting until + // the given task returns (which is expected to happen quickly) + // and then call set_absolute_time(), which can do the cleanup, + // and _thread can then exit as well. + while (!_done) { + std::this_thread::yield(); // usleep is too slow here + } + + ls.set_absolute_time(ls.get_absolute_time()); + _thread.join(); + } +private: + void execute() + { + _f(); + _done = true; + } + std::function _f; + std::atomic _done{false}; + std::thread _thread; +}; constexpr uint64_t some_time_us = 12345678; @@ -33,7 +69,7 @@ void test_condition_timing_out() // Use a thread to wait for condition while we already have the lock. // This ensures the synchronization happens in the right order. - std::thread thread([&ls, &cond, &lock, &should_have_timed_out]() { + TestThread thread([&ls, &cond, &lock, &should_have_timed_out]() { assert(ls.cond_timedwait(&cond, &lock, some_time_us + 1000) == ETIMEDOUT); assert(should_have_timed_out); // It should be re-locked afterwards, so we should be able to unlock it. @@ -44,7 +80,7 @@ void test_condition_timing_out() should_have_timed_out = true; ls.set_absolute_time(some_time_us + 1500); - thread.join(); + thread.join(ls); pthread_mutex_destroy(&lock); pthread_cond_destroy(&cond); @@ -67,7 +103,7 @@ void test_locked_semaphore_getting_unlocked() pthread_mutex_lock(&lock); // Use a thread to wait for condition while we already have the lock. // This ensures the synchronization happens in the right order. - std::thread thread([&ls, &cond, &lock]() { + TestThread thread([&ls, &cond, &lock]() { ls.set_absolute_time(some_time_us + 500); assert(ls.cond_timedwait(&cond, &lock, some_time_us + 1000) == 0); @@ -79,7 +115,7 @@ void test_locked_semaphore_getting_unlocked() pthread_cond_broadcast(&cond); pthread_mutex_unlock(&lock); - thread.join(); + thread.join(ls); pthread_mutex_destroy(&lock); pthread_cond_destroy(&cond); @@ -107,7 +143,7 @@ public: void run() { pthread_mutex_lock(&lock_); - thread_ = std::make_shared([this]() { + thread_ = std::make_shared([this]() { result_ = ls_.cond_timedwait(&cond_, &lock_, timeout_); pthread_mutex_unlock(&lock_); }); @@ -131,13 +167,13 @@ public: pthread_mutex_unlock(&lock_); is_done_ = true; // We can be sure that this triggers. - thread_->join(); + thread_->join(ls_); assert(result_ == 0); } else if (timeout_reached) { is_done_ = true; - thread_->join(); + thread_->join(ls_); assert(result_ == ETIMEDOUT); } } @@ -151,7 +187,7 @@ private: LockstepScheduler &ls_; std::atomic is_done_{false}; std::atomic result_ {INITIAL_RESULT}; - std::shared_ptr thread_{}; + std::shared_ptr thread_{}; }; int random_number(int min, int max) @@ -250,7 +286,7 @@ void test_usleep() std::atomic step{Step::Init}; - std::thread thread([&step, &ls]() { + TestThread thread([&step, &ls]() { step = Step::ThreadStarted; WAIT_FOR(step == Step::BeforeUsleep); @@ -268,7 +304,7 @@ void test_usleep() assert(ls.usleep_until(some_time_us + 1000) == 0); assert(step == Step::UsleepTriggered); - thread.join(); + thread.join(ls); } int main(int /*argc*/, char ** /*argv*/)