From 2c182dad031d4800f7fcbe595a69a9138d16b1ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 14:37:54 +0000 Subject: [PATCH] fixed a test explicitly using never stop token --- include/beman/lazy/detail/any_scheduler.hpp | 16 +++-- tests/beman/lazy/any_scheduler.test.cpp | 70 +++++++++++++++++---- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index fd09fbd..f3e74ce 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -95,12 +95,18 @@ class any_scheduler { } void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); } ::beman::execution26::inplace_stop_token get_stop_token() override { - if (not this->callback) { - this->callback.emplace( - ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)), - stopper{this}); + if constexpr (::std::same_as) { + return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)); + } else { + if constexpr (not::std::same_as) { + if (not this->callback) { + this->callback.emplace( + ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)), + stopper{this}); + } + } + return this->source.get_token(); } - return this->source.get_token(); } }; diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp index c914f35..3dbf26e 100644 --- a/tests/beman/lazy/any_scheduler.test.cpp +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -4,7 +4,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -29,6 +31,7 @@ struct thread_context { virtual void complete() = 0; }; + std::latch stop_done{1u}; std::mutex mutex; std::condition_variable condition; bool done{false}; @@ -57,6 +60,7 @@ struct thread_context { while (auto w{this->get_work()}) { w->complete(); } + this->stop_done.count_down(); }) {} ~thread_context() { this->stop(); @@ -141,25 +145,46 @@ struct thread_context { this->done = true; } this->condition.notify_one(); + this->stop_done.wait(); } }; enum class stop_result { none, success, failure, stopped }; +template struct stop_env { - ex::inplace_stop_token token; - auto query(ex::get_stop_token_t) const noexcept { return this->token; } + Token token; + auto query(ex::get_stop_token_t) const noexcept { return this->token; } }; +template +stop_env(Token&&) -> stop_env>; + +template struct stop_receiver { using receiver_concept = ex::receiver_t; - ex::inplace_stop_token token; - stop_result& result; - stop_env get_env() const noexcept { return {this->token}; } + Token token; + stop_result& result; + std::latch* completed{}; + auto get_env() const noexcept { return stop_env{this->token}; } - void set_value(auto&&...) && noexcept { this->result = stop_result::success; } - void set_error(auto&&) && noexcept { this->result = stop_result::failure; } - void set_stopped() && noexcept { this->result = stop_result::stopped; } + void set_value(auto&&...) && noexcept { + this->result = stop_result::success; + if (this->completed) + this->completed->count_down(); + } + void set_error(auto&&) && noexcept { + this->result = stop_result::failure; + if (this->completed) + this->completed->count_down(); + } + void set_stopped() && noexcept { + this->result = stop_result::stopped; + if (this->completed) + this->completed->count_down(); + } }; -static_assert(ex::receiver); +template +stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; +static_assert(ex::receiver>); } // namespace @@ -240,7 +265,7 @@ int main() { ex::inplace_stop_source source; stop_result result{stop_result::none}; auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), - stop_receiver(source.get_token(), result))}; + stop_receiver{source.get_token(), result})}; assert(result == stop_result::none); ex::start(state); assert(result == stop_result::none); @@ -252,11 +277,34 @@ int main() { stop_result result{stop_result::none}; auto state{ ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), - stop_receiver(source.get_token(), result))}; + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + ex::stop_source source; + stop_result result{stop_result::none}; + auto state{ + ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), + stop_receiver{source.get_token(), result})}; assert(result == stop_result::none); ex::start(state); assert(result == stop_result::none); source.request_stop(); assert(result == stop_result::stopped); } + { + std::latch completed{1}; + stop_result result{stop_result::none}; + auto state{ex::connect( + ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), + stop_receiver{ex::never_stop_token(), result, &completed})}; + assert(result == stop_result::none); + ex::start(state); + completed.wait(); + assert(result == stop_result::success); + } }