coro::thread_pool high cpu usage when tasks < threads
The check for m_size > 0 was keeping threads awake in a spin state until
all tasks completed. The wait predicate now uses m_queue.size() behind the
lock so that threads waiting on the condition variable are only woken when
tasks are waiting to be processed.

Closes #262
jbaldwin committed May 24, 2024
1 parent 3e8a735 commit c9bb47c
Showing 6 changed files with 111 additions and 29 deletions.
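
To illustrate the spin: below is a minimal sketch of the wait loop, with hypothetical names mirroring the diff (not a drop-in excerpt from libcoro). m_size counts scheduled and currently executing tasks, so a worker whose queue was empty re-entered the wait with the old predicate still true and woke immediately, burning CPU until every task finished; the new predicate only fires when the queue itself holds work.

#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <deque>
#include <mutex>

struct pool_state
{
    std::mutex                          m_wait_mutex;
    std::condition_variable             m_wait_cv;
    std::deque<std::coroutine_handle<>> m_queue;
    std::atomic<std::size_t>            m_size{0};
    std::atomic<bool>                   m_shutdown_requested{false};

    auto wait_for_work() -> void
    {
        std::unique_lock<std::mutex> lk{m_wait_mutex};
        // Old predicate: true whenever any task exists, even one already
        // executing on another thread, so an idle worker woke instantly
        // and looped straight back here in a busy spin:
        //   m_wait_cv.wait(lk, [&] { return m_size.load() > 0 || ...; });
        // New predicate: true only when there is work to dequeue.
        m_wait_cv.wait(lk, [&] { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });
    }
};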
2 changes: 1 addition & 1 deletion include/coro/concepts/executor.hpp
@@ -21,7 +21,7 @@ concept executor = requires(type t, std::coroutine_handle<> c)
{
{ t.schedule() } -> coro::concepts::awaiter;
{ t.yield() } -> coro::concepts::awaiter;
{ t.resume(c) } -> std::same_as<void>;
{ t.resume(c) } -> std::same_as<bool>;
};

#ifdef LIBCORO_FEATURE_NETWORKING
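A hypothetical caller sketch (not part of the commit, and assuming the umbrella header coro/coro.hpp) showing what the widened concept enables: resume() now reports whether the handle was actually scheduled.

#include <coro/coro.hpp>

#include <coroutine>
#include <iostream>

int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};

    // A default-constructed (null) handle is rejected rather than silently dropped.
    bool accepted = tp.resume(std::coroutine_handle<>{});
    std::cout << std::boolalpha << accepted << '\n'; // prints: false

    tp.shutdown();
    // Any resume() after shutdown would likewise return false.
    return 0;
}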
16 changes: 14 additions & 2 deletions include/coro/io_scheduler.hpp
@@ -229,8 +229,18 @@ class io_scheduler
* Resumes execution of a direct coroutine handle on this io scheduler.
* @param handle The coroutine handle to resume execution.
*/
auto resume(std::coroutine_handle<> handle) -> void
auto resume(std::coroutine_handle<> handle) -> bool
{
if (handle == nullptr)
{
return false;
}

if (m_shutdown_requested.load(std::memory_order::acquire))
{
return false;
}

if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
{
{
@@ -245,10 +255,12 @@
eventfd_t value{1};
eventfd_write(m_schedule_fd, value);
}

return true;
}
else
{
m_thread_pool->resume(handle);
return m_thread_pool->resume(handle);
}
}

7 changes: 5 additions & 2 deletions include/coro/task_container.hpp
@@ -78,7 +78,7 @@ class task_container
{
m_size.fetch_add(1, std::memory_order::relaxed);

std::scoped_lock lk{m_mutex};
std::unique_lock lk{m_mutex};

if (cleanup == garbage_collect_t::yes)
{
@@ -95,6 +95,9 @@
std::size_t index = m_free_task_indices.front();
m_free_task_indices.pop();

// We've reserved the slot, so we can release the lock.
lk.unlock();

// Store the task inside a cleanup task for self deletion.
m_tasks[index] = make_cleanup_task(std::move(user_task), index);

@@ -170,7 +173,7 @@
auto gc_internal() -> std::size_t
{
std::size_t deleted{0};
auto pos = std::begin(m_tasks_to_delete);
auto pos = std::begin(m_tasks_to_delete);
while (pos != std::end(m_tasks_to_delete))
{
// Skip tasks that are still running or have yet to start.
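The scoped_lock → unique_lock switch above is what allows releasing the lock once the slot is reserved; a generic illustration (not libcoro code) of why:

#include <mutex>

std::mutex m;

auto reserve_then_build() -> void
{
    std::unique_lock lk{m}; // std::scoped_lock has no unlock(); std::unique_lock does
    // ... reserve a slot while holding the lock ...
    lk.unlock(); // release early; the slower work below needs no lock
    // ... build and store the task outside the critical section ...
}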
21 changes: 18 additions & 3 deletions include/coro/thread_pool.hpp
@@ -134,15 +134,17 @@ class thread_pool
/**
* Schedules any coroutine handle that is ready to be resumed.
* @param handle The coroutine handle to schedule.
* @return True if the coroutine is resumed, false if it is a nullptr or the thread pool is shutting down.
*/
auto resume(std::coroutine_handle<> handle) noexcept -> void;
auto resume(std::coroutine_handle<> handle) noexcept -> bool;

/**
* Schedules the set of coroutine handles that are ready to be resumed.
* @param handles The coroutine handles to schedule.
* @return The number of tasks resumed; any null handles are discarded.
*/
template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
auto resume(const range_type& handles) noexcept -> void
auto resume(const range_type& handles) noexcept -> uint64_t
{
m_size.fetch_add(std::size(handles), std::memory_order::release);

@@ -168,7 +170,20 @@
m_size.fetch_sub(null_handles, std::memory_order::release);
}

m_wait_cv.notify_one();
uint64_t total = std::size(handles) - null_handles;
if (total >= m_threads.size())
{
m_wait_cv.notify_all();
}
else
{
for (uint64_t i = 0; i < total; ++i)
{
m_wait_cv.notify_one();
}
}

return total;
}

/**
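A hypothetical helper (not from the commit) showing the new return value of the range overload: null handles in the batch are discarded and the count of handles actually scheduled comes back.

#include <coro/coro.hpp>

#include <coroutine>
#include <cstdint>
#include <vector>

auto resume_batch(coro::thread_pool& tp, const std::vector<std::coroutine_handle<>>& ready) -> uint64_t
{
    // With fewer non-null handles than pool threads, the pool issues one
    // notify_one() per handle instead of waking every worker.
    return tp.resume(ready);
}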
62 changes: 41 additions & 21 deletions src/thread_pool.cpp
@@ -44,15 +44,21 @@ auto thread_pool::schedule() -> operation
throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
}

auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> bool
{
if (handle == nullptr)
{
return;
return false;
}

if (m_shutdown_requested.load(std::memory_order::acquire))
{
return false;
}

m_size.fetch_add(1, std::memory_order::release);
schedule_impl(handle);
return true;
}

auto thread_pool::shutdown() noexcept -> void
@@ -84,29 +90,44 @@ auto thread_pool::executor(std::size_t idx) -> void
m_opts.on_thread_start_functor(idx);
}

// Process until shutdown is requested and the total number of tasks reaches zero.
while (!m_shutdown_requested.load(std::memory_order::acquire) || m_size.load(std::memory_order::acquire) > 0)
// Process until shutdown is requested.
while (!m_shutdown_requested.load(std::memory_order::acquire))
{
std::unique_lock<std::mutex> lk{m_wait_mutex};
m_wait_cv.wait(
lk,
[&] {
return m_size.load(std::memory_order::acquire) > 0 ||
m_shutdown_requested.load(std::memory_order::acquire);
});
// Process this batch until the queue is empty.
while (!m_queue.empty())
m_wait_cv.wait(lk, [&]() { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::acquire); });

if (m_queue.empty())
{
auto handle = m_queue.front();
m_queue.pop_front();
continue;
}

auto handle = m_queue.front();
m_queue.pop_front();
lk.unlock();

// Release the lock while executing the coroutine.
lk.unlock();
handle.resume();
// Release the lock while executing the coroutine.
handle.resume();
m_size.fetch_sub(1, std::memory_order::release);
}

m_size.fetch_sub(1, std::memory_order::release);
lk.lock();
// Process until there are no ready tasks left.
while (m_size.load(std::memory_order::acquire) > 0)
{
std::unique_lock<std::mutex> lk{m_wait_mutex};
// m_size will only drop to zero once all executing coroutines are finished
// but the queue could be empty for threads that finished early.
if (m_queue.empty())
{
break;
}

auto handle = m_queue.front();
m_queue.pop_front();
lk.unlock();

// Release the lock while executing the coroutine.
handle.resume();
m_size.fetch_sub(1, std::memory_order::release);
}

if (m_opts.on_thread_stop_functor != nullptr)
@@ -125,9 +146,8 @@ auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
std::scoped_lock lk{m_wait_mutex};
m_queue.emplace_back(handle);
m_wait_cv.notify_one();
}

m_wait_cv.notify_one();
}

} // namespace coro
32 changes: 32 additions & 0 deletions test/test_thread_pool.cpp
@@ -199,4 +199,36 @@ TEST_CASE("thread_pool event jump threads", "[thread_pool]")
};

coro::sync_wait(coro::when_all(make_tp1_task(), make_tp2_task()));
}

TEST_CASE("thread_pool high cpu usage when threadcount is greater than the number of tasks", "[thread_pool]")
{
// https://github.com/jbaldwin/libcoro/issues/262
// This test doesn't really trigger any error conditions, but the linked issue
// reported that thread_pool threads with no work to do would spin on the CPU
// when there were fewer tasks running than threads in the pool.
// This was due to using m_size instead of m_queue.size(), causing threads
// that had no work to spin trying to acquire work.

auto sleep_for_task = [](std::chrono::seconds duration) -> coro::task<int>
{
std::this_thread::sleep_for(duration);
co_return duration.count();
};

auto wait_for_task = [&](coro::thread_pool& pool, std::chrono::seconds delay) -> coro::task<>
{
co_await pool.schedule();
for (int i = 0; i < 5; ++i)
{
co_await sleep_for_task(delay);
std::cout << std::chrono::system_clock::now().time_since_epoch().count() << " wait for " << delay.count()
<< "seconds\n";
}
co_return;
};

coro::thread_pool pool{coro::thread_pool::options{.thread_count = 3}};
coro::sync_wait(
coro::when_all(wait_for_task(pool, std::chrono::seconds{1}), wait_for_task(pool, std::chrono::seconds{3})));
}
