From 23277116c6286d5c4d9e91911ca9c1c31ae8594d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Fri, 17 Jan 2025 22:11:21 +0000 Subject: [PATCH 01/16] added a test for inline_scheduler --- tests/beman/lazy/CMakeLists.txt | 2 +- tests/beman/lazy/inline_scheduler.test.cpp | 47 ++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 tests/beman/lazy/inline_scheduler.test.cpp diff --git a/tests/beman/lazy/CMakeLists.txt b/tests/beman/lazy/CMakeLists.txt index df45cdd..f277703 100644 --- a/tests/beman/lazy/CMakeLists.txt +++ b/tests/beman/lazy/CMakeLists.txt @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -list(APPEND lazy_tests poly lazy) +list(APPEND lazy_tests inline_scheduler lazy poly) foreach(test ${lazy_tests}) add_executable(beman.lazy.tests.${test}) diff --git a/tests/beman/lazy/inline_scheduler.test.cpp b/tests/beman/lazy/inline_scheduler.test.cpp new file mode 100644 index 0000000..f560d6d --- /dev/null +++ b/tests/beman/lazy/inline_scheduler.test.cpp @@ -0,0 +1,47 @@ +// tests/beman/lazy/inline_scheduler.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#ifdef NDEBUG +#undef NDEBUG +#endif +#include + +namespace ex = beman::execution26; +namespace ly = beman::lazy; + +// ---------------------------------------------------------------------------- + +namespace { + struct receiver { + using receiver_concept = ex::receiver_t; + int &value; + + void set_value(int v) && noexcept { this->value = v; } + }; + static_assert(ex::receiver); +} + +int main() { + ly::detail::inline_scheduler sched; + static_assert(ex::scheduler); + + auto sched_sender{ex::schedule(sched)}; + static_assert(ex::sender); + + auto env{ex::get_env(sched_sender)}; + assert(sched == ex::get_completion_scheduler(env)); + + int value{}; + auto state{ + ex::connect(std::move(sched_sender) + | ex::then([]() noexcept { return 17; }) + , receiver(value)) + }; + static_assert(ex::operation_state); + + assert(value == 0); + ex::start(state); + assert(value == 17); +} \ No newline at end of file From 2925e31cba8e0dd1ac79d7813dd2c6e038edc3f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Fri, 17 Jan 2025 22:14:48 +0000 Subject: [PATCH 02/16] clang format --- tests/beman/lazy/inline_scheduler.test.cpp | 24 +++++++++------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tests/beman/lazy/inline_scheduler.test.cpp b/tests/beman/lazy/inline_scheduler.test.cpp index f560d6d..e5ab7c5 100644 --- a/tests/beman/lazy/inline_scheduler.test.cpp +++ b/tests/beman/lazy/inline_scheduler.test.cpp @@ -14,14 +14,14 @@ namespace ly = beman::lazy; // ---------------------------------------------------------------------------- namespace { - struct receiver { - using receiver_concept = ex::receiver_t; - int &value; +struct receiver { + using receiver_concept = ex::receiver_t; + int& value; - void set_value(int v) && noexcept { this->value = v; } - }; - static_assert(ex::receiver); -} + void set_value(int v) && noexcept { this->value = v; } +}; +static_assert(ex::receiver); +} // namespace int main() { ly::detail::inline_scheduler sched; @@ -33,15 +33,11 @@ int main() { auto env{ex::get_env(sched_sender)}; assert(sched == ex::get_completion_scheduler(env)); - int value{}; - auto state{ - ex::connect(std::move(sched_sender) - | ex::then([]() noexcept { return 17; }) - , receiver(value)) - }; + int value{}; + auto state{ex::connect(std::move(sched_sender) | ex::then([]() noexcept { return 17; }), receiver(value))}; static_assert(ex::operation_state); assert(value == 0); ex::start(state); assert(value == 17); -} \ No newline at end of file +} From 25a3a6a0eaee51e56e53450ca30a36c9ea6cd17b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Fri, 17 Jan 2025 22:18:47 +0000 Subject: [PATCH 03/16] fixed a clang issue --- tests/beman/lazy/inline_scheduler.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/beman/lazy/inline_scheduler.test.cpp b/tests/beman/lazy/inline_scheduler.test.cpp index e5ab7c5..244e7a4 100644 --- a/tests/beman/lazy/inline_scheduler.test.cpp +++ b/tests/beman/lazy/inline_scheduler.test.cpp @@ -34,7 +34,7 @@ int main() { assert(sched == ex::get_completion_scheduler(env)); int value{}; - auto state{ex::connect(std::move(sched_sender) | ex::then([]() noexcept { return 17; }), receiver(value))}; + auto state{ex::connect(std::move(sched_sender) | ex::then([]() noexcept { return 17; }), receiver{value})}; static_assert(ex::operation_state); assert(value == 0); From 3f8025dad60c8efa27e07363fd3f9bdfb5d3759c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 00:32:51 +0000 Subject: [PATCH 04/16] added tests for any_scheduler --- include/beman/lazy/detail/any_scheduler.hpp | 50 +++--- include/beman/lazy/detail/lazy.hpp | 1 + include/beman/lazy/detail/poly.hpp | 1 + include/beman/lazy/detail/scheduler_of.hpp | 25 +++ tests/beman/lazy/CMakeLists.txt | 2 +- tests/beman/lazy/any_scheduler.test.cpp | 164 ++++++++++++++++++++ 6 files changed, 222 insertions(+), 21 deletions(-) create mode 100644 include/beman/lazy/detail/scheduler_of.hpp create mode 100644 tests/beman/lazy/any_scheduler.test.cpp diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index fe8dda4..eb4d32c 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -13,7 +13,7 @@ namespace beman::lazy::detail { -struct any_scheduler { +class any_scheduler { // TODO: add support for forwarding stop_tokens to the type-erased sender // TODO: other errors than std::exception_ptr should be supported struct state_base { @@ -75,18 +75,30 @@ struct any_scheduler { void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); } }; - struct env { + class sender; + class env { + friend class sender; + private: + sender const* sndr; + env(sender const* s): sndr(s) {} + + public: any_scheduler query( - const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) const noexcept; + const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) const noexcept { + return this->sndr->inner_sender->get_completion_scheduler(); + } }; // sender implementation - struct sender { + class sender { + friend class env; + private: struct base { virtual ~base() = default; virtual base* move(void*) = 0; virtual base* clone(void*) const = 0; virtual inner_state connect(state_base*) = 0; + virtual any_scheduler get_completion_scheduler() const = 0; }; template <::beman::execution26::scheduler Scheduler> struct concrete : base { @@ -98,14 +110,22 @@ struct any_scheduler { base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } base* clone(void* buffer) const override { return new (buffer) concrete(*this); } inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } + any_scheduler get_completion_scheduler() const override { + return any_scheduler( + ::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>( + ::beman::execution26::get_env(this->sender) + ) + ); + } }; + poly inner_sender; + public: using sender_concept = ::beman::execution26::sender_t; using completion_signatures = ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), ::beman::execution26::set_error_t(std::exception_ptr), ::beman::execution26::set_stopped_t()>; - poly inner_sender; template <::beman::execution26::scheduler S> explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} @@ -117,7 +137,9 @@ struct any_scheduler { return state(std::forward(r), this->inner_sender); } - env get_env() const noexcept { return {}; } + env get_env() const noexcept { + return env(this); + } }; // scheduler implementation @@ -144,30 +166,18 @@ struct any_scheduler { poly scheduler; +public: using scheduler_concept = ::beman::execution26::scheduler_t; template requires(not std::same_as>) explicit any_scheduler(S&& s) : scheduler(static_cast>*>(nullptr), std::forward(s)) {} - any_scheduler(any_scheduler&& other) = default; - any_scheduler(const any_scheduler& other) = default; - sender schedule() { return this->scheduler->schedule(); } + sender schedule() { return this->scheduler->schedule(); } bool operator==(const any_scheduler&) const = default; }; static_assert(::beman::execution26::scheduler); -template -struct scheduler_of { - using type = ::beman::lazy::detail::any_scheduler; -}; -template - requires requires { typename Context::scheduler_type; } -struct scheduler_of { - using type = typename Context::scheduler_type; -}; -template -using scheduler_of_t = typename scheduler_of::type; } // namespace beman::lazy::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/lazy/detail/lazy.hpp b/include/beman/lazy/detail/lazy.hpp index 397c971..8a79626 100644 --- a/include/beman/lazy/detail/lazy.hpp +++ b/include/beman/lazy/detail/lazy.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include diff --git a/include/beman/lazy/detail/poly.hpp b/include/beman/lazy/detail/poly.hpp index 93f5be4..3e2b4ce 100644 --- a/include/beman/lazy/detail/poly.hpp +++ b/include/beman/lazy/detail/poly.hpp @@ -69,6 +69,7 @@ class alignas(sizeof(double)) poly { return other.pointer()->equals(this->pointer()); } Base* operator->() { return this->pointer(); } + Base const* operator->() const { return this->pointer(); } }; } // namespace beman::lazy::detail diff --git a/include/beman/lazy/detail/scheduler_of.hpp b/include/beman/lazy/detail/scheduler_of.hpp new file mode 100644 index 0000000..3a160e0 --- /dev/null +++ b/include/beman/lazy/detail/scheduler_of.hpp @@ -0,0 +1,25 @@ +// include/beman/lazy/detail/scheduler_of.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF +#define INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF + +// ---------------------------------------------------------------------------- + +namespace beman::lazy::detail { +template +struct scheduler_of { + using type = ::beman::lazy::detail::any_scheduler; +}; +template + requires requires { typename Context::scheduler_type; } +struct scheduler_of { + using type = typename Context::scheduler_type; +}; +template +using scheduler_of_t = typename scheduler_of::type; +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/tests/beman/lazy/CMakeLists.txt b/tests/beman/lazy/CMakeLists.txt index f277703..e94d145 100644 --- a/tests/beman/lazy/CMakeLists.txt +++ b/tests/beman/lazy/CMakeLists.txt @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -list(APPEND lazy_tests inline_scheduler lazy poly) +list(APPEND lazy_tests any_scheduler inline_scheduler lazy poly) foreach(test ${lazy_tests}) add_executable(beman.lazy.tests.${test}) diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp new file mode 100644 index 0000000..89e7c2a --- /dev/null +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -0,0 +1,164 @@ +// tests/beman/lazy/any_scheduler.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#ifdef NDEBUG +#undef NDEBUG +#endif +#include + +namespace ex = beman::execution26; +namespace ly = beman::lazy; + +// ---------------------------------------------------------------------------- + +namespace { + struct thread_context { + struct base { + base* next{}; + virtual ~base() = default; + virtual void complete() = 0; + }; + + std::thread thread; + std::mutex mutex; + std::condition_variable condition; + bool done{false}; + base* work{}; + + base* get_work() { + std::unique_lock cerberus(this->mutex); + condition.wait(cerberus, [this]{ return this->done || this->work; }); + base* rc{this->work}; + if (rc) { + this->work = rc->next; + } + std::cout << "get_work(rc=" << rc << " work=" << this->work << " done=" << std::boolalpha << this-> done << ")\n"; + return rc; + } + void enqueue(base* w) { + { + std::lock_guard cerberus(this->mutex); + w->next = std::exchange(this->work, w); + } + this->condition.notify_one(); + } + + thread_context(): thread([this]{ + while (auto w{this->get_work()}) { + std::cout << "calling complete\n" << std::flush; + w->complete(); + } + std::cout << "loop done\n" << std::flush; + }) { + } + ~thread_context() { + this->stop(); + this->thread.join(); + } + + struct scheduler { + using scheduler_concept = ex::scheduler_t; + thread_context* context; + bool operator== (scheduler const&) const = default; + + template + struct state: base { + using operation_state_concept = ex::operation_state_t; + + thread_context* ctxt; + std::remove_cvref_t receiver; + template + state(auto c, R&& r) + : ctxt(c), receiver(std::forward(r)) + { + } + void start() & noexcept { this->ctxt->enqueue(this); } + void complete() override { + ex::set_value(std::move(this->receiver)); + std::cout << "completing\n"; + } + }; + struct env { + thread_context* ctxt; + scheduler query(ex::get_completion_scheduler_t const&) const noexcept { + return scheduler{ctxt}; + } + }; + struct sender { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; + + thread_context* ctxt; + + template + auto connect(Receiver&& receiver) { + static_assert(ex::operation_state>); + return state(this->ctxt, std::forward(receiver)); + } + env get_env() const noexcept { return { this->ctxt }; } + }; + static_assert(ex::sender); + + sender schedule() noexcept { return sender{this->context}; } + }; + static_assert(ex::scheduler); + + scheduler get_scheduler() { return scheduler{this}; } + void stop() { + { + std::lock_guard cerberus(this->mutex); + this->done = true; + } + this->condition.notify_one(); + } + }; +} + +// ---------------------------------------------------------------------------- + +int main() { + static_assert(ex::scheduler); + + thread_context ctxt1; + thread_context ctxt2; + + assert(ctxt1.get_scheduler() == ctxt1.get_scheduler()); + assert(ctxt2.get_scheduler() == ctxt2.get_scheduler()); + assert(ctxt1.get_scheduler() != ctxt2.get_scheduler()); + + ly::detail::any_scheduler sched1(ctxt1.get_scheduler()); + ly::detail::any_scheduler sched2(ctxt2.get_scheduler()); + assert(sched1 == sched1); + assert(sched2 == sched2); + assert(sched1 != sched2); + + ly::detail::any_scheduler copy(sched1); + assert(copy == sched1); + assert(copy != sched2); + ly::detail::any_scheduler move(std::move(copy)); + assert(move == sched1); + assert(move != sched2); + + copy = sched2; + assert(copy == sched2); + assert(copy != sched1); + + move = std::move(copy); + assert(move == sched2); + assert(move != sched1); + + std::thread::id id1{}; + std::thread::id id2{}; + ex::sync_wait(ex::schedule(sched1) | ex::then([&id1](){ id1 = std::this_thread::get_id(); })); + ex::sync_wait(ex::schedule(sched2) | ex::then([&id2](){ id2 = std::this_thread::get_id(); })); + assert(id1 != id2); + ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched1)) | ex::then([&id1](){ assert(id1 == std::this_thread::get_id()); })); + ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched2)) | ex::then([&id2](){ assert(id2 == std::this_thread::get_id()); })); +} From f7ec73b33d40cb41b653c32687439d33e0e1d783 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 00:33:32 +0000 Subject: [PATCH 05/16] clang-format --- include/beman/lazy/detail/any_scheduler.hpp | 37 ++-- include/beman/lazy/detail/poly.hpp | 2 +- include/beman/lazy/detail/scheduler_of.hpp | 2 +- tests/beman/lazy/any_scheduler.test.cpp | 190 ++++++++++---------- 4 files changed, 114 insertions(+), 117 deletions(-) diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index eb4d32c..452e8e6 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -78,21 +78,23 @@ class any_scheduler { class sender; class env { friend class sender; - private: - sender const* sndr; - env(sender const* s): sndr(s) {} - - public: - any_scheduler query( - const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) const noexcept { - return this->sndr->inner_sender->get_completion_scheduler(); - } + + private: + const sender* sndr; + env(const sender* s) : sndr(s) {} + + public: + any_scheduler query(const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) + const noexcept { + return this->sndr->inner_sender->get_completion_scheduler(); + } }; // sender implementation class sender { friend class env; - private: + + private: struct base { virtual ~base() = default; virtual base* move(void*) = 0; @@ -111,16 +113,13 @@ class any_scheduler { base* clone(void* buffer) const override { return new (buffer) concrete(*this); } inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } any_scheduler get_completion_scheduler() const override { - return any_scheduler( - ::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>( - ::beman::execution26::get_env(this->sender) - ) - ); + return any_scheduler(::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>( + ::beman::execution26::get_env(this->sender))); } }; poly inner_sender; - public: + public: using sender_concept = ::beman::execution26::sender_t; using completion_signatures = ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), @@ -137,9 +136,7 @@ class any_scheduler { return state(std::forward(r), this->inner_sender); } - env get_env() const noexcept { - return env(this); - } + env get_env() const noexcept { return env(this); } }; // scheduler implementation @@ -166,7 +163,7 @@ class any_scheduler { poly scheduler; -public: + public: using scheduler_concept = ::beman::execution26::scheduler_t; template diff --git a/include/beman/lazy/detail/poly.hpp b/include/beman/lazy/detail/poly.hpp index 3e2b4ce..e7ef2c0 100644 --- a/include/beman/lazy/detail/poly.hpp +++ b/include/beman/lazy/detail/poly.hpp @@ -69,7 +69,7 @@ class alignas(sizeof(double)) poly { return other.pointer()->equals(this->pointer()); } Base* operator->() { return this->pointer(); } - Base const* operator->() const { return this->pointer(); } + const Base* operator->() const { return this->pointer(); } }; } // namespace beman::lazy::detail diff --git a/include/beman/lazy/detail/scheduler_of.hpp b/include/beman/lazy/detail/scheduler_of.hpp index 3a160e0..e1ac388 100644 --- a/include/beman/lazy/detail/scheduler_of.hpp +++ b/include/beman/lazy/detail/scheduler_of.hpp @@ -18,7 +18,7 @@ struct scheduler_of { }; template using scheduler_of_t = typename scheduler_of::type; -} +} // namespace beman::lazy::detail // ---------------------------------------------------------------------------- diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp index 89e7c2a..a378925 100644 --- a/tests/beman/lazy/any_scheduler.test.cpp +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -19,107 +19,105 @@ namespace ly = beman::lazy; // ---------------------------------------------------------------------------- namespace { - struct thread_context { - struct base { - base* next{}; - virtual ~base() = default; - virtual void complete() = 0; - }; +struct thread_context { + struct base { + base* next{}; + virtual ~base() = default; + virtual void complete() = 0; + }; - std::thread thread; - std::mutex mutex; - std::condition_variable condition; - bool done{false}; - base* work{}; - - base* get_work() { - std::unique_lock cerberus(this->mutex); - condition.wait(cerberus, [this]{ return this->done || this->work; }); - base* rc{this->work}; - if (rc) { - this->work = rc->next; - } - std::cout << "get_work(rc=" << rc << " work=" << this->work << " done=" << std::boolalpha << this-> done << ")\n"; - return rc; + std::thread thread; + std::mutex mutex; + std::condition_variable condition; + bool done{false}; + base* work{}; + + base* get_work() { + std::unique_lock cerberus(this->mutex); + condition.wait(cerberus, [this] { return this->done || this->work; }); + base* rc{this->work}; + if (rc) { + this->work = rc->next; } - void enqueue(base* w) { - { - std::lock_guard cerberus(this->mutex); - w->next = std::exchange(this->work, w); - } - this->condition.notify_one(); + std::cout << "get_work(rc=" << rc << " work=" << this->work << " done=" << std::boolalpha << this->done + << ")\n"; + return rc; + } + void enqueue(base* w) { + { + std::lock_guard cerberus(this->mutex); + w->next = std::exchange(this->work, w); } - - thread_context(): thread([this]{ - while (auto w{this->get_work()}) { - std::cout << "calling complete\n" << std::flush; - w->complete(); + this->condition.notify_one(); + } + + thread_context() + : thread([this] { + while (auto w{this->get_work()}) { + std::cout << "calling complete\n" << std::flush; + w->complete(); + } + std::cout << "loop done\n" << std::flush; + }) {} + ~thread_context() { + this->stop(); + this->thread.join(); + } + + struct scheduler { + using scheduler_concept = ex::scheduler_t; + thread_context* context; + bool operator==(const scheduler&) const = default; + + template + struct state : base { + using operation_state_concept = ex::operation_state_t; + + thread_context* ctxt; + std::remove_cvref_t receiver; + template + state(auto c, R&& r) : ctxt(c), receiver(std::forward(r)) {} + void start() & noexcept { this->ctxt->enqueue(this); } + void complete() override { + ex::set_value(std::move(this->receiver)); + std::cout << "completing\n"; } - std::cout << "loop done\n" << std::flush; - }) { - } - ~thread_context() { - this->stop(); - this->thread.join(); - } + }; + struct env { + thread_context* ctxt; + scheduler query(const ex::get_completion_scheduler_t&) const noexcept { + return scheduler{ctxt}; + } + }; + struct sender { + using sender_concept = ex::sender_t; + using completion_signatures = ex::completion_signatures; - struct scheduler { - using scheduler_concept = ex::scheduler_t; - thread_context* context; - bool operator== (scheduler const&) const = default; + thread_context* ctxt; template - struct state: base { - using operation_state_concept = ex::operation_state_t; - - thread_context* ctxt; - std::remove_cvref_t receiver; - template - state(auto c, R&& r) - : ctxt(c), receiver(std::forward(r)) - { - } - void start() & noexcept { this->ctxt->enqueue(this); } - void complete() override { - ex::set_value(std::move(this->receiver)); - std::cout << "completing\n"; - } - }; - struct env { - thread_context* ctxt; - scheduler query(ex::get_completion_scheduler_t const&) const noexcept { - return scheduler{ctxt}; - } - }; - struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = ex::completion_signatures; - - thread_context* ctxt; - - template - auto connect(Receiver&& receiver) { - static_assert(ex::operation_state>); - return state(this->ctxt, std::forward(receiver)); - } - env get_env() const noexcept { return { this->ctxt }; } - }; - static_assert(ex::sender); - - sender schedule() noexcept { return sender{this->context}; } + auto connect(Receiver&& receiver) { + static_assert(ex::operation_state>); + return state(this->ctxt, std::forward(receiver)); + } + env get_env() const noexcept { return {this->ctxt}; } }; - static_assert(ex::scheduler); + static_assert(ex::sender); - scheduler get_scheduler() { return scheduler{this}; } - void stop() { - { - std::lock_guard cerberus(this->mutex); - this->done = true; - } - this->condition.notify_one(); - } + sender schedule() noexcept { return sender{this->context}; } }; -} + static_assert(ex::scheduler); + + scheduler get_scheduler() { return scheduler{this}; } + void stop() { + { + std::lock_guard cerberus(this->mutex); + this->done = true; + } + this->condition.notify_one(); + } +}; +} // namespace // ---------------------------------------------------------------------------- @@ -156,9 +154,11 @@ int main() { std::thread::id id1{}; std::thread::id id2{}; - ex::sync_wait(ex::schedule(sched1) | ex::then([&id1](){ id1 = std::this_thread::get_id(); })); - ex::sync_wait(ex::schedule(sched2) | ex::then([&id2](){ id2 = std::this_thread::get_id(); })); + ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); + ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); assert(id1 != id2); - ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched1)) | ex::then([&id1](){ assert(id1 == std::this_thread::get_id()); })); - ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched2)) | ex::then([&id2](){ assert(id2 == std::this_thread::get_id()); })); + ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched1)) | + ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); + ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched2)) | + ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); } From 3185292fe19cc8ace7bf2b5ff8736b0a2428861a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 00:52:59 +0000 Subject: [PATCH 06/16] fixed a race --- include/beman/lazy/detail/any_scheduler.hpp | 16 ++++++++-------- include/beman/lazy/detail/poly.hpp | 2 +- tests/beman/lazy/any_scheduler.test.cpp | 15 ++++----------- 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index 452e8e6..4d7ffbf 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -96,10 +96,10 @@ class any_scheduler { private: struct base { - virtual ~base() = default; - virtual base* move(void*) = 0; - virtual base* clone(void*) const = 0; - virtual inner_state connect(state_base*) = 0; + virtual ~base() = default; + virtual base* move(void*) = 0; + virtual base* clone(void*) const = 0; + virtual inner_state connect(state_base*) = 0; virtual any_scheduler get_completion_scheduler() const = 0; }; template <::beman::execution26::scheduler Scheduler> @@ -109,9 +109,9 @@ class any_scheduler { template <::beman::execution26::scheduler S> concrete(S&& s) : sender(::beman::execution26::schedule(std::forward(s))) {} - base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } - base* clone(void* buffer) const override { return new (buffer) concrete(*this); } - inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } + base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } + base* clone(void* buffer) const override { return new (buffer) concrete(*this); } + inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } any_scheduler get_completion_scheduler() const override { return any_scheduler(::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>( ::beman::execution26::get_env(this->sender))); @@ -171,7 +171,7 @@ class any_scheduler { explicit any_scheduler(S&& s) : scheduler(static_cast>*>(nullptr), std::forward(s)) {} sender schedule() { return this->scheduler->schedule(); } - bool operator==(const any_scheduler&) const = default; + bool operator==(const any_scheduler&) const = default; }; static_assert(::beman::execution26::scheduler); diff --git a/include/beman/lazy/detail/poly.hpp b/include/beman/lazy/detail/poly.hpp index e7ef2c0..4b02217 100644 --- a/include/beman/lazy/detail/poly.hpp +++ b/include/beman/lazy/detail/poly.hpp @@ -68,7 +68,7 @@ class alignas(sizeof(double)) poly { { return other.pointer()->equals(this->pointer()); } - Base* operator->() { return this->pointer(); } + Base* operator->() { return this->pointer(); } const Base* operator->() const { return this->pointer(); } }; } // namespace beman::lazy::detail diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp index a378925..0bc2fb6 100644 --- a/tests/beman/lazy/any_scheduler.test.cpp +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -39,8 +39,6 @@ struct thread_context { if (rc) { this->work = rc->next; } - std::cout << "get_work(rc=" << rc << " work=" << this->work << " done=" << std::boolalpha << this->done - << ")\n"; return rc; } void enqueue(base* w) { @@ -54,10 +52,8 @@ struct thread_context { thread_context() : thread([this] { while (auto w{this->get_work()}) { - std::cout << "calling complete\n" << std::flush; w->complete(); } - std::cout << "loop done\n" << std::flush; }) {} ~thread_context() { this->stop(); @@ -78,10 +74,7 @@ struct thread_context { template state(auto c, R&& r) : ctxt(c), receiver(std::forward(r)) {} void start() & noexcept { this->ctxt->enqueue(this); } - void complete() override { - ex::set_value(std::move(this->receiver)); - std::cout << "completing\n"; - } + void complete() override { ex::set_value(std::move(this->receiver)); } }; struct env { thread_context* ctxt; @@ -152,8 +145,8 @@ int main() { assert(move == sched2); assert(move != sched1); - std::thread::id id1{}; - std::thread::id id2{}; + std::atomic id1{}; + std::atomic id2{}; ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); assert(id1 != id2); From 9c49c3c8d758784ab1280b4463ba3701e075aade Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 01:57:10 +0000 Subject: [PATCH 07/16] fixed another race --- tests/beman/lazy/any_scheduler.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp index 0bc2fb6..3d8d9b6 100644 --- a/tests/beman/lazy/any_scheduler.test.cpp +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -26,11 +26,11 @@ struct thread_context { virtual void complete() = 0; }; - std::thread thread; std::mutex mutex; std::condition_variable condition; bool done{false}; base* work{}; + std::thread thread; base* get_work() { std::unique_lock cerberus(this->mutex); From ce941d2b3f81a0f523d4bcea06209a74598dcc1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 10:48:42 +0000 Subject: [PATCH 08/16] updated to pull a newer execution26 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c5bdb1f..ec970fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,7 +32,7 @@ FetchContent_Declare( execution26 # for local development, use SOURCE_DIR /execution26 GIT_REPOSITORY https://github.com/bemanproject/execution26 - GIT_TAG b52f28c + GIT_TAG a7ee8c8 ) FetchContent_MakeAvailable(execution26) From cb9f48222c1f74db72eebabd5aaf9e35f31c6e14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 11:00:25 +0000 Subject: [PATCH 09/16] invoke cmake via presets --- Makefile | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 8fcdcd2..a85eaa5 100644 --- a/Makefile +++ b/Makefile @@ -2,16 +2,17 @@ BUILDDIR = build -default: test +default: compile compile: config - cmake --build $(BUILDDIR) -j + #cmake --build $(BUILDDIR) -j + cmake --workflow --preset=appleclang-debug --fresh format: git clang-format main config: - cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR) + #CXXFLAGS=-fsanitize=thread cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR) test: compile cd $(BUILDDIR); ctest From 3c7afcf84c00828bf6901af62e9b2a0f3f63e761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 11:32:53 +0000 Subject: [PATCH 10/16] try to work around a TSan bug --- .github/workflows/ci_tests.yml | 1 + Makefile | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 4fe7076..9505ec7 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -209,6 +209,7 @@ jobs: sudo apt-get update sudo apt-get install -y $CC sudo apt-get install -y $CXX + sudo sysctl vm.mmap_rnd_bits=28 $CC --version $CXX --version diff --git a/Makefile b/Makefile index a85eaa5..0956a84 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ default: compile compile: config #cmake --build $(BUILDDIR) -j - cmake --workflow --preset=appleclang-debug --fresh + cmake --workflow --preset=gcc-debug --fresh format: git clang-format main From 492ab029f5bedade446468d616ede2147d840970 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 11:39:06 +0000 Subject: [PATCH 11/16] disable TSan on Linux for now --- .github/workflows/ci_tests.yml | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 9505ec7..49b7e2f 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -88,6 +88,16 @@ jobs: description: "Windows MSVC" cmake_args: description: "TSan" + # Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping + - platform: + description: "Ubuntu GCC" + cmake_args: + description: "TSan" + # Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping + - platform: + description: "Ubuntu LLVM" + cmake_args: + description: "TSan" name: "Unit: ${{ matrix.platform.description }} ${{ matrix.cpp_version }} ${{ matrix.cmake_args.description }}" runs-on: ${{ matrix.platform.os }} @@ -209,7 +219,6 @@ jobs: sudo apt-get update sudo apt-get install -y $CC sudo apt-get install -y $CXX - sudo sysctl vm.mmap_rnd_bits=28 $CC --version $CXX --version From d584e16c0056786eb151c111d80618a78926cd11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 11:42:55 +0000 Subject: [PATCH 12/16] another attempt to avoid the TSan issue --- .github/workflows/ci_tests.yml | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 49b7e2f..103b82e 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -61,8 +61,9 @@ jobs: cpp_version: [20, 23, 26] cmake_args: - description: "Default" - - description: "TSan" - args: "-DBEMAN_BUILDSYS_SANITIZER=TSan" + # Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping + # - description: "TSan" + # args: "-DBEMAN_BUILDSYS_SANITIZER=TSan" - description: "MaxSan" args: "-DBEMAN_BUILDSYS_SANITIZER=MaxSan" include: @@ -88,16 +89,6 @@ jobs: description: "Windows MSVC" cmake_args: description: "TSan" - # Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping - - platform: - description: "Ubuntu GCC" - cmake_args: - description: "TSan" - # Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping - - platform: - description: "Ubuntu LLVM" - cmake_args: - description: "TSan" name: "Unit: ${{ matrix.platform.description }} ${{ matrix.cpp_version }} ${{ matrix.cmake_args.description }}" runs-on: ${{ matrix.platform.os }} From 63ed1b6dd23b7d207600bee3d3a6b097d2109854 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 13:58:41 +0000 Subject: [PATCH 13/16] added error_code support and forwarding of stop tokens --- Makefile | 2 +- include/beman/lazy/detail/any_scheduler.hpp | 50 ++++++-- tests/beman/lazy/any_scheduler.test.cpp | 123 ++++++++++++++++++-- 3 files changed, 155 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index 0956a84..aab7d29 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ default: compile compile: config #cmake --build $(BUILDDIR) -j - cmake --workflow --preset=gcc-debug --fresh + cmake --workflow --preset=appleclang-debug format: git clang-format main diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index 4d7ffbf..fd09fbd 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include // ---------------------------------------------------------------------------- @@ -14,30 +15,33 @@ namespace beman::lazy::detail { class any_scheduler { - // TODO: add support for forwarding stop_tokens to the type-erased sender - // TODO: other errors than std::exception_ptr should be supported struct state_base { virtual ~state_base() = default; virtual void complete_value() = 0; - virtual void complete_error(std::exception_ptr) = 0; + virtual void complete_error(::std::error_code) = 0; + virtual void complete_error(::std::exception_ptr) = 0; virtual void complete_stopped() = 0; + virtual ::beman::execution26::inplace_stop_token get_stop_token() = 0; }; struct inner_state { + struct receiver; + struct env { + state_base* state; + auto query(::beman::execution26::get_stop_token_t) const noexcept { return this->state->get_stop_token(); } + }; struct receiver { using receiver_concept = ::beman::execution26::receiver_t; state_base* state; void set_value() && noexcept { this->state->complete_value(); } + void set_error(std::error_code err) && noexcept { this->state->complete_error(err); } void set_error(std::exception_ptr ptr) && noexcept { this->state->complete_error(std::move(ptr)); } template - void set_error(E e) { - try { - throw std::move(e); - } catch (...) { - this->state->complete_error(std::current_exception()); - } + void set_error(E e) && noexcept { + this->state->complete_error(std::make_exception_ptr(std::move(e))); } void set_stopped() && noexcept { this->state->complete_stopped(); } + env get_env() const noexcept { return {this->state}; } }; static_assert(::beman::execution26::receiver); @@ -53,7 +57,7 @@ class any_scheduler { concrete(S&& s, state_base* b) : state(::beman::execution26::connect(std::forward(s), receiver{b})) {} void start() override { ::beman::execution26::start(state); } }; - ::beman::lazy::detail::poly state; + ::beman::lazy::detail::poly state; template <::beman::execution26::sender S> inner_state(S&& s, state_base* b) : state(static_cast*>(nullptr), std::forward(s), b) {} void start() { this->state->start(); } @@ -62,17 +66,42 @@ class any_scheduler { template <::beman::execution26::receiver Receiver> struct state : state_base { using operation_state_concept = ::beman::execution26::operation_state_t; + struct stopper { + state* st; + void operator()() noexcept { + state* self = this->st; + self->callback.reset(); + self->source.request_stop(); + } + }; + using token_t = + decltype(::beman::execution26::get_stop_token(::beman::execution26::get_env(std::declval()))); + using callback_t = ::beman::execution26::stop_callback_for_t; std::remove_cvref_t receiver; inner_state s; + ::beman::execution26::inplace_stop_source source; + ::std::optional callback; + template <::beman::execution26::receiver R, typename PS> state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} void start() & noexcept { this->s.start(); } void complete_value() override { ::beman::execution26::set_value(std::move(this->receiver)); } + void complete_error(std::error_code err) override { + ::beman::execution26::set_error(std::move(receiver), err); + } void complete_error(std::exception_ptr ptr) override { ::beman::execution26::set_error(std::move(receiver), std::move(ptr)); } void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); } + ::beman::execution26::inplace_stop_token get_stop_token() override { + if (not this->callback) { + this->callback.emplace( + ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)), + stopper{this}); + } + return this->source.get_token(); + } }; class sender; @@ -123,6 +152,7 @@ class any_scheduler { using sender_concept = ::beman::execution26::sender_t; using completion_signatures = ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), + ::beman::execution26::set_error_t(std::error_code), ::beman::execution26::set_error_t(std::exception_ptr), ::beman::execution26::set_stopped_t()>; diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp index 3d8d9b6..c914f35 100644 --- a/tests/beman/lazy/any_scheduler.test.cpp +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include #include @@ -20,6 +22,7 @@ namespace ly = beman::lazy; namespace { struct thread_context { + enum class complete { success, failure, exception, never }; struct base { base* next{}; virtual ~base() = default; @@ -63,18 +66,46 @@ struct thread_context { struct scheduler { using scheduler_concept = ex::scheduler_t; thread_context* context; + complete cmpl{complete::success}; bool operator==(const scheduler&) const = default; template struct state : base { + struct stopper { + state* st; + void operator()() noexcept { + auto self{this->st}; + self->callback.reset(); + ex::set_stopped(std::move(self->receiver)); + } + }; using operation_state_concept = ex::operation_state_t; + using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval()))); + using callback_t = ex::stop_callback_for_t; thread_context* ctxt; std::remove_cvref_t receiver; + thread_context::complete cmpl; + std::optional callback; + template - state(auto c, R&& r) : ctxt(c), receiver(std::forward(r)) {} - void start() & noexcept { this->ctxt->enqueue(this); } - void complete() override { ex::set_value(std::move(this->receiver)); } + state(auto c, R&& r, thread_context::complete cm) : ctxt(c), receiver(std::forward(r)), cmpl(cm) {} + void start() & noexcept { + callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stopper{this}); + if (cmpl != thread_context::complete::never) + this->ctxt->enqueue(this); + } + void complete() override { + this->callback.reset(); + if (this->cmpl == thread_context::complete::success) + ex::set_value(std::move(this->receiver)); + else if (this->cmpl == thread_context::complete::failure) + ex::set_error(std::move(this->receiver), std::make_error_code(std::errc::address_in_use)); + else + ex::set_error( + std::move(this->receiver), + std::make_exception_ptr(std::system_error(std::make_error_code(std::errc::address_in_use)))); + } }; struct env { thread_context* ctxt; @@ -83,25 +114,27 @@ struct thread_context { } }; struct sender { - using sender_concept = ex::sender_t; - using completion_signatures = ex::completion_signatures; + using sender_concept = ex::sender_t; + using completion_signatures = + ex::completion_signatures; - thread_context* ctxt; + thread_context* ctxt; + thread_context::complete cmpl; template auto connect(Receiver&& receiver) { static_assert(ex::operation_state>); - return state(this->ctxt, std::forward(receiver)); + return state(this->ctxt, std::forward(receiver), this->cmpl); } env get_env() const noexcept { return {this->ctxt}; } }; static_assert(ex::sender); - sender schedule() noexcept { return sender{this->context}; } + sender schedule() noexcept { return sender{this->context, this->cmpl}; } }; static_assert(ex::scheduler); - scheduler get_scheduler() { return scheduler{this}; } + scheduler get_scheduler(complete cmpl = complete::success) { return scheduler{this, cmpl}; } void stop() { { std::lock_guard cerberus(this->mutex); @@ -110,6 +143,24 @@ struct thread_context { this->condition.notify_one(); } }; + +enum class stop_result { none, success, failure, stopped }; +struct stop_env { + ex::inplace_stop_token token; + auto query(ex::get_stop_token_t) const noexcept { return this->token; } +}; +struct stop_receiver { + using receiver_concept = ex::receiver_t; + ex::inplace_stop_token token; + stop_result& result; + stop_env get_env() const noexcept { return {this->token}; } + + void set_value(auto&&...) && noexcept { this->result = stop_result::success; } + void set_error(auto&&) && noexcept { this->result = stop_result::failure; } + void set_stopped() && noexcept { this->result = stop_result::stopped; } +}; +static_assert(ex::receiver); + } // namespace // ---------------------------------------------------------------------------- @@ -154,4 +205,58 @@ int main() { ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched2)) | ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); + + { + bool success{false}; + bool failed{false}; + bool exception{false}; + ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::failure)) | + ex::then([&success] { success = true; }) | ex::upon_error([&failed, &exception](auto err) { + if constexpr (std::same_as) + failed = true; + else if constexpr (std::same_as) + exception = true; + })); + assert(not success); + assert(failed); + assert(not exception); + } + { + bool success{false}; + bool failed{false}; + bool exception{false}; + ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::exception)) | + ex::then([&success] { success = true; }) | ex::upon_error([&failed, &exception](auto err) { + if constexpr (std::same_as) + failed = true; + else if constexpr (std::same_as) + exception = true; + })); + assert(not success); + assert(not failed); + assert(exception); + } + { + ex::inplace_stop_source source; + stop_result result{stop_result::none}; + auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), + stop_receiver(source.get_token(), result))}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + ex::inplace_stop_source source; + stop_result result{stop_result::none}; + auto state{ + ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), + stop_receiver(source.get_token(), result))}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } } From 2c182dad031d4800f7fcbe595a69a9138d16b1ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 14:37:54 +0000 Subject: [PATCH 14/16] fixed a test explicitly using never stop token --- include/beman/lazy/detail/any_scheduler.hpp | 16 +++-- tests/beman/lazy/any_scheduler.test.cpp | 70 +++++++++++++++++---- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index fd09fbd..f3e74ce 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -95,12 +95,18 @@ class any_scheduler { } void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); } ::beman::execution26::inplace_stop_token get_stop_token() override { - if (not this->callback) { - this->callback.emplace( - ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)), - stopper{this}); + if constexpr (::std::same_as) { + return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)); + } else { + if constexpr (not::std::same_as) { + if (not this->callback) { + this->callback.emplace( + ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)), + stopper{this}); + } + } + return this->source.get_token(); } - return this->source.get_token(); } }; diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp index c914f35..3dbf26e 100644 --- a/tests/beman/lazy/any_scheduler.test.cpp +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -4,7 +4,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -29,6 +31,7 @@ struct thread_context { virtual void complete() = 0; }; + std::latch stop_done{1u}; std::mutex mutex; std::condition_variable condition; bool done{false}; @@ -57,6 +60,7 @@ struct thread_context { while (auto w{this->get_work()}) { w->complete(); } + this->stop_done.count_down(); }) {} ~thread_context() { this->stop(); @@ -141,25 +145,46 @@ struct thread_context { this->done = true; } this->condition.notify_one(); + this->stop_done.wait(); } }; enum class stop_result { none, success, failure, stopped }; +template struct stop_env { - ex::inplace_stop_token token; - auto query(ex::get_stop_token_t) const noexcept { return this->token; } + Token token; + auto query(ex::get_stop_token_t) const noexcept { return this->token; } }; +template +stop_env(Token&&) -> stop_env>; + +template struct stop_receiver { using receiver_concept = ex::receiver_t; - ex::inplace_stop_token token; - stop_result& result; - stop_env get_env() const noexcept { return {this->token}; } + Token token; + stop_result& result; + std::latch* completed{}; + auto get_env() const noexcept { return stop_env{this->token}; } - void set_value(auto&&...) && noexcept { this->result = stop_result::success; } - void set_error(auto&&) && noexcept { this->result = stop_result::failure; } - void set_stopped() && noexcept { this->result = stop_result::stopped; } + void set_value(auto&&...) && noexcept { + this->result = stop_result::success; + if (this->completed) + this->completed->count_down(); + } + void set_error(auto&&) && noexcept { + this->result = stop_result::failure; + if (this->completed) + this->completed->count_down(); + } + void set_stopped() && noexcept { + this->result = stop_result::stopped; + if (this->completed) + this->completed->count_down(); + } }; -static_assert(ex::receiver); +template +stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; +static_assert(ex::receiver>); } // namespace @@ -240,7 +265,7 @@ int main() { ex::inplace_stop_source source; stop_result result{stop_result::none}; auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), - stop_receiver(source.get_token(), result))}; + stop_receiver{source.get_token(), result})}; assert(result == stop_result::none); ex::start(state); assert(result == stop_result::none); @@ -252,11 +277,34 @@ int main() { stop_result result{stop_result::none}; auto state{ ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), - stop_receiver(source.get_token(), result))}; + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + ex::stop_source source; + stop_result result{stop_result::none}; + auto state{ + ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), + stop_receiver{source.get_token(), result})}; assert(result == stop_result::none); ex::start(state); assert(result == stop_result::none); source.request_stop(); assert(result == stop_result::stopped); } + { + std::latch completed{1}; + stop_result result{stop_result::none}; + auto state{ex::connect( + ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), + stop_receiver{ex::never_stop_token(), result, &completed})}; + assert(result == stop_result::none); + ex::start(state); + completed.wait(); + assert(result == stop_result::success); + } } From a4880a13263bc68ead43de88bc0f09642030e8cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 15:28:39 +0000 Subject: [PATCH 15/16] added some documentation for the schedulers --- include/beman/lazy/detail/any_scheduler.hpp | 32 ++++++++++++++++--- .../beman/lazy/detail/inline_scheduler.hpp | 15 +++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index f3e74ce..d937866 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -14,13 +14,35 @@ namespace beman::lazy::detail { +/*! + * \brief Type-erasing scheduler + * \headerfile beman/lazy/lazy.hpp + * + * The class `any_scheduler` is used to type-erase any scheduler class. + * Any error produced by the underlying scheduler except `std::error_code` is turned into + * an `std::exception_ptr`. `std::error_code` is forwarded as is. The `any_scheduler` + * forwards stop requests reported by the stop token obtained from the `connect`ed + * receiver to the sender used by the underlying scheduler. + * + * Completion signatures: + * + * - `ex::set_value_t()` + * - `ex::set_error_t(std::error_code)` + * - `ex::set_error_t(std::exception_ptr)` + * - `ex::set_stopped()` + * + * Usage: + * + * any_scheduler sched(other_scheduler); + * auto sender{ex::schedule(sched) | some_sender}; + */ class any_scheduler { struct state_base { - virtual ~state_base() = default; - virtual void complete_value() = 0; + virtual ~state_base() = default; + virtual void complete_value() = 0; virtual void complete_error(::std::error_code) = 0; virtual void complete_error(::std::exception_ptr) = 0; - virtual void complete_stopped() = 0; + virtual void complete_stopped() = 0; virtual ::beman::execution26::inplace_stop_token get_stop_token() = 0; }; @@ -78,8 +100,8 @@ class any_scheduler { decltype(::beman::execution26::get_stop_token(::beman::execution26::get_env(std::declval()))); using callback_t = ::beman::execution26::stop_callback_for_t; - std::remove_cvref_t receiver; - inner_state s; + std::remove_cvref_t receiver; + inner_state s; ::beman::execution26::inplace_stop_source source; ::std::optional callback; diff --git a/include/beman/lazy/detail/inline_scheduler.hpp b/include/beman/lazy/detail/inline_scheduler.hpp index 9577576..e15596c 100644 --- a/include/beman/lazy/detail/inline_scheduler.hpp +++ b/include/beman/lazy/detail/inline_scheduler.hpp @@ -11,6 +11,21 @@ // ---------------------------------------------------------------------------- namespace beman::lazy::detail { +/*! + * \brief Scheduler completing immmediately when started on the same thread + * \headerfile beman/lazy/lazy.hpp + * + * The class `inline_scheduler` is used to prevent any actual schedulering. + * It does have a scheduler interface but it completes synchronously on + * the thread on which it gets `start`ed before returning from `start`. + * The implication is that any blocking working gets executed on the + * calling thread. Also, if there is lot of synchronous work repeatedly + * getting scheduled using `inline_scheduler` it is possible to get a + * stack overflow. + * + * In general, any use of `inline_scheduler` should receive a lot of + * attention as it is fairly easy to create subtle bugs using this scheduler. + */ struct inline_scheduler { struct env { inline_scheduler From 1ffb49e4f9dbfb846cc5982e719e6e45cfea6b25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Sat, 18 Jan 2025 15:31:11 +0000 Subject: [PATCH 16/16] fixed a formatting issue --- include/beman/lazy/detail/any_scheduler.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index d937866..875fd65 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -120,7 +120,7 @@ class any_scheduler { if constexpr (::std::same_as) { return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)); } else { - if constexpr (not::std::same_as) { + if constexpr (not ::std::same_as) { if (not this->callback) { this->callback.emplace( ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)),