diff --git a/include/coro/concepts/executor.hpp b/include/coro/concepts/executor.hpp
index dc8394f..c6c9129 100644
--- a/include/coro/concepts/executor.hpp
+++ b/include/coro/concepts/executor.hpp
@@ -21,7 +21,7 @@ concept executor = requires(type t, std::coroutine_handle<> c)
 {
     { t.schedule() } -> coro::concepts::awaiter;
     { t.yield() } -> coro::concepts::awaiter;
-    { t.resume(c) } -> std::same_as<void>;
+    { t.resume(c) } -> std::same_as<bool>;
 };
 
 #ifdef LIBCORO_FEATURE_NETWORKING
diff --git a/include/coro/io_scheduler.hpp b/include/coro/io_scheduler.hpp
index 1c1f46d..f916706 100644
--- a/include/coro/io_scheduler.hpp
+++ b/include/coro/io_scheduler.hpp
@@ -229,8 +229,18 @@ class io_scheduler
      * Resumes execution of a direct coroutine handle on this io scheduler.
      * @param handle The coroutine handle to resume execution.
      */
-    auto resume(std::coroutine_handle<> handle) -> void
+    auto resume(std::coroutine_handle<> handle) -> bool
     {
+        if (handle == nullptr)
+        {
+            return false;
+        }
+
+        if (m_shutdown_requested.load(std::memory_order::acquire))
+        {
+            return false;
+        }
+
         if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
         {
             {
@@ -245,10 +255,12 @@ class io_scheduler
                 eventfd_t value{1};
                 eventfd_write(m_schedule_fd, value);
             }
+
+            return true;
         }
         else
         {
-            m_thread_pool->resume(handle);
+            return m_thread_pool->resume(handle);
         }
     }
 
diff --git a/include/coro/task_container.hpp b/include/coro/task_container.hpp
index 6d0497b..76a22e2 100644
--- a/include/coro/task_container.hpp
+++ b/include/coro/task_container.hpp
@@ -78,7 +78,7 @@ class task_container
     {
         m_size.fetch_add(1, std::memory_order::relaxed);
 
-        std::scoped_lock lk{m_mutex};
+        std::unique_lock lk{m_mutex};
 
         if (cleanup == garbage_collect_t::yes)
         {
@@ -95,6 +95,9 @@ class task_container
         std::size_t index = m_free_task_indices.front();
         m_free_task_indices.pop();
 
+        // We've reserved the slot, we can release the lock.
+        lk.unlock();
+
         // Store the task inside a cleanup task for self deletion.
         m_tasks[index] = make_cleanup_task(std::move(user_task), index);
 
@@ -170,7 +173,7 @@ class task_container
     auto gc_internal() -> std::size_t
     {
         std::size_t deleted{0};
-        auto pos = std::begin(m_tasks_to_delete);
+        auto pos = std::begin(m_tasks_to_delete);
         while (pos != std::end(m_tasks_to_delete))
         {
             // Skip tasks that are still running or have yet to start.
diff --git a/include/coro/thread_pool.hpp b/include/coro/thread_pool.hpp
index 56d8189..c8e8c0b 100644
--- a/include/coro/thread_pool.hpp
+++ b/include/coro/thread_pool.hpp
@@ -134,15 +134,17 @@ class thread_pool
     /**
      * Schedules any coroutine handle that is ready to be resumed.
      * @param handle The coroutine handle to schedule.
+     * @return True if the coroutine was resumed, false if it is a nullptr or the thread pool is shutting down.
      */
-    auto resume(std::coroutine_handle<> handle) noexcept -> void;
+    auto resume(std::coroutine_handle<> handle) noexcept -> bool;
 
     /**
      * Schedules the set of coroutine handles that are ready to be resumed.
      * @param handles The coroutine handles to schedule.
+     * @return uint64_t The number of tasks resumed; null handles are discarded and not counted.
      */
     template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
-    auto resume(const range_type& handles) noexcept -> void
+    auto resume(const range_type& handles) noexcept -> uint64_t
     {
         m_size.fetch_add(std::size(handles), std::memory_order::release);
 
@@ -168,7 +170,20 @@
             m_size.fetch_sub(null_handles, std::memory_order::release);
         }
 
-        m_wait_cv.notify_one();
+        uint64_t total = std::size(handles) - null_handles;
+        if (total >= m_threads.size())
+        {
+            m_wait_cv.notify_all();
+        }
+        else
+        {
+            for (uint64_t i = 0; i < total; ++i)
+            {
+                m_wait_cv.notify_one();
+            }
+        }
+
+        return total;
     }
 
     /**
diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp
index 07d37f1..3c33b0f 100644
--- a/src/thread_pool.cpp
+++ b/src/thread_pool.cpp
@@ -44,15 +44,21 @@ auto thread_pool::schedule() -> operation
     throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
 }
 
-auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
+auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
 {
     if (handle == nullptr)
     {
-        return;
+        return false;
+    }
+
+    if (m_shutdown_requested.load(std::memory_order::acquire))
+    {
+        return false;
     }
 
     m_size.fetch_add(1, std::memory_order::release);
     schedule_impl(handle);
+    return true;
 }
 
 auto thread_pool::shutdown() noexcept -> void
@@ -84,29 +90,44 @@ auto thread_pool::executor(std::size_t idx) -> void
         m_opts.on_thread_start_functor(idx);
     }
 
-    // Process until shutdown is requested and the total number of tasks reaches zero.
-    while (!m_shutdown_requested.load(std::memory_order::acquire) || m_size.load(std::memory_order::acquire) > 0)
+    // Process until shutdown is requested.
+    while (!m_shutdown_requested.load(std::memory_order::acquire))
     {
         std::unique_lock lk{m_wait_mutex};
-        m_wait_cv.wait(
-            lk,
-            [&] {
-                return m_size.load(std::memory_order::acquire) > 0 ||
-                       m_shutdown_requested.load(std::memory_order::acquire);
-            });
-        // Process this batch until the queue is empty.
-        while (!m_queue.empty())
+        m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
+
+        if (m_queue.empty())
         {
-            auto handle = m_queue.front();
-            m_queue.pop_front();
+            continue;
+        }
+
+        auto handle = m_queue.front();
+        m_queue.pop_front();
+        lk.unlock();
 
-            // Release the lock while executing the coroutine.
-            lk.unlock();
-            handle.resume();
+        // Release the lock while executing the coroutine.
+        handle.resume();
+        m_size.fetch_sub(1, std::memory_order::release);
+    }
 
-            m_size.fetch_sub(1, std::memory_order::release);
-            lk.lock();
+    // Process until there are no ready tasks left.
+    while (m_size.load(std::memory_order::acquire) > 0)
+    {
+        std::unique_lock lk{m_wait_mutex};
+        // m_size will only drop to zero once all executing coroutines are finished
+        // but the queue could be empty for threads that finished early.
+        if (m_queue.empty())
+        {
+            break;
         }
+
+        auto handle = m_queue.front();
+        m_queue.pop_front();
+        lk.unlock();
+
+        // Release the lock while executing the coroutine.
+        handle.resume();
+        m_size.fetch_sub(1, std::memory_order::release);
     }
 
     if (m_opts.on_thread_stop_functor != nullptr)
@@ -125,9 +146,8 @@ auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
     {
         std::scoped_lock lk{m_wait_mutex};
         m_queue.emplace_back(handle);
+        m_wait_cv.notify_one();
     }
-
-    m_wait_cv.notify_one();
 }
 
 } // namespace coro
diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp
index 434a090..d0ebf2b 100644
--- a/test/test_thread_pool.cpp
+++ b/test/test_thread_pool.cpp
@@ -199,4 +199,36 @@ TEST_CASE("thread_pool event jump threads", "[thread_pool]")
     };
 
     coro::sync_wait(coro::when_all(make_tp1_task(), make_tp2_task()));
+}
+
+TEST_CASE("thread_pool high cpu usage when threadcount is greater than the number of tasks", "[thread_pool]")
+{
+    // https://github.com/jbaldwin/libcoro/issues/262
+    // This test doesn't really trigger any error conditions but was reported via
+    // an issue that the thread_pool threads not doing work would spin on the CPU
+    // if there were fewer tasks running than threads in the pool.
+    // This was due to using m_size instead of m_queue.size(), causing the threads
+    // that had no work to go into a spin trying to acquire work.
+
+    auto sleep_for_task = [](std::chrono::seconds duration) -> coro::task<int64_t>
+    {
+        std::this_thread::sleep_for(duration);
+        co_return duration.count();
+    };
+
+    auto wait_for_task = [&](coro::thread_pool& pool, std::chrono::seconds delay) -> coro::task<>
+    {
+        co_await pool.schedule();
+        for (int i = 0; i < 5; ++i)
+        {
+            co_await sleep_for_task(delay);
+            std::cout << std::chrono::system_clock::now().time_since_epoch().count() << " wait for " << delay.count()
+                      << " seconds\n";
+        }
+        co_return;
+    };
+
+    coro::thread_pool pool{coro::thread_pool::options{.thread_count = 3}};
+    coro::sync_wait(
+        coro::when_all(wait_for_task(pool, std::chrono::seconds{1}), wait_for_task(pool, std::chrono::seconds{3})));
 }
\ No newline at end of file
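
Reviewer note: the sketch below is not part of the patch; it only illustrates the caller-visible effect of the new bool-returning resume(). The park_point awaiter and the waiter/resumer tasks are hypothetical helpers written for this example, while coro::thread_pool, schedule(), resume(), sync_wait() and when_all() are used as they appear in the diff and the existing tests.

// Minimal caller-side sketch, assuming this patch is applied.
#include <coro/coro.hpp>

#include <atomic>
#include <coroutine>
#include <iostream>

// Hypothetical helper: parks a coroutine and publishes its handle so another
// task can resume it through the thread pool.
struct park_point
{
    std::atomic<std::coroutine_handle<>> m_handle{nullptr};

    auto await_ready() const noexcept -> bool { return false; }
    auto await_suspend(std::coroutine_handle<> h) noexcept -> void { m_handle.store(h); }
    auto await_resume() const noexcept -> void {}
};

int main()
{
    coro::thread_pool pool{coro::thread_pool::options{.thread_count = 2}};

    // resume() now rejects null handles instead of silently ignoring them.
    std::cout << "null handle accepted: " << std::boolalpha << pool.resume(std::coroutine_handle<>{}) << "\n";

    park_point park{};

    auto waiter = [&]() -> coro::task<void>
    {
        co_await pool.schedule();
        co_await park; // Suspend until another task resumes us via pool.resume().
        co_return;
    };

    auto resumer = [&]() -> coro::task<void>
    {
        co_await pool.schedule();

        std::coroutine_handle<> h{nullptr};
        while ((h = park.m_handle.exchange(nullptr)) == nullptr)
        {
            // Busy-wait until the waiter has parked itself; fine for a sketch.
        }

        // The return value reports whether the handle was actually enqueued,
        // e.g. it would be false if the pool were already shutting down.
        bool accepted = pool.resume(h);
        std::cout << "parked handle accepted: " << accepted << "\n";
        co_return;
    };

    coro::sync_wait(coro::when_all(waiter(), resumer()));
    return 0;
}

The main payoff is during shutdown: a handle passed to resume() after shutdown has been requested is now rejected and reported via the false return instead of being queued behind a draining pool, and io_scheduler::resume() propagates that result to its own callers.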