diff --git a/.github/workflows/ci_tests.yml b/.github/workflows/ci_tests.yml index 4fe7076..103b82e 100644 --- a/.github/workflows/ci_tests.yml +++ b/.github/workflows/ci_tests.yml @@ -61,8 +61,9 @@ jobs: cpp_version: [20, 23, 26] cmake_args: - description: "Default" - - description: "TSan" - args: "-DBEMAN_BUILDSYS_SANITIZER=TSan" + # Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping + # - description: "TSan" + # args: "-DBEMAN_BUILDSYS_SANITIZER=TSan" - description: "MaxSan" args: "-DBEMAN_BUILDSYS_SANITIZER=MaxSan" include: diff --git a/CMakeLists.txt b/CMakeLists.txt index c5bdb1f..ec970fa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,7 +32,7 @@ FetchContent_Declare( execution26 # for local development, use SOURCE_DIR /execution26 GIT_REPOSITORY https://github.com/bemanproject/execution26 - GIT_TAG b52f28c + GIT_TAG a7ee8c8 ) FetchContent_MakeAvailable(execution26) diff --git a/Makefile b/Makefile index 8fcdcd2..aab7d29 100644 --- a/Makefile +++ b/Makefile @@ -2,16 +2,17 @@ BUILDDIR = build -default: test +default: compile compile: config - cmake --build $(BUILDDIR) -j + #cmake --build $(BUILDDIR) -j + cmake --workflow --preset=appleclang-debug format: git clang-format main config: - cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR) + #CXXFLAGS=-fsanitize=thread cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR) test: compile cd $(BUILDDIR); ctest diff --git a/include/beman/lazy/detail/any_scheduler.hpp b/include/beman/lazy/detail/any_scheduler.hpp index fe8dda4..875fd65 100644 --- a/include/beman/lazy/detail/any_scheduler.hpp +++ b/include/beman/lazy/detail/any_scheduler.hpp @@ -7,37 +7,63 @@ #include #include #include +#include #include // ---------------------------------------------------------------------------- namespace beman::lazy::detail { -struct any_scheduler { - // TODO: add support for forwarding stop_tokens to the type-erased sender - // TODO: other errors than std::exception_ptr should be supported +/*! + * \brief Type-erasing scheduler + * \headerfile beman/lazy/lazy.hpp + * + * The class `any_scheduler` is used to type-erase any scheduler class. + * Any error produced by the underlying scheduler except `std::error_code` is turned into + * an `std::exception_ptr`. `std::error_code` is forwarded as is. The `any_scheduler` + * forwards stop requests reported by the stop token obtained from the `connect`ed + * receiver to the sender used by the underlying scheduler. + * + * Completion signatures: + * + * - `ex::set_value_t()` + * - `ex::set_error_t(std::error_code)` + * - `ex::set_error_t(std::exception_ptr)` + * - `ex::set_stopped()` + * + * Usage: + * + * any_scheduler sched(other_scheduler); + * auto sender{ex::schedule(sched) | some_sender}; + */ +class any_scheduler { struct state_base { - virtual ~state_base() = default; - virtual void complete_value() = 0; - virtual void complete_error(std::exception_ptr) = 0; - virtual void complete_stopped() = 0; + virtual ~state_base() = default; + virtual void complete_value() = 0; + virtual void complete_error(::std::error_code) = 0; + virtual void complete_error(::std::exception_ptr) = 0; + virtual void complete_stopped() = 0; + virtual ::beman::execution26::inplace_stop_token get_stop_token() = 0; }; struct inner_state { + struct receiver; + struct env { + state_base* state; + auto query(::beman::execution26::get_stop_token_t) const noexcept { return this->state->get_stop_token(); } + }; struct receiver { using receiver_concept = ::beman::execution26::receiver_t; state_base* state; void set_value() && noexcept { this->state->complete_value(); } + void set_error(std::error_code err) && noexcept { this->state->complete_error(err); } void set_error(std::exception_ptr ptr) && noexcept { this->state->complete_error(std::move(ptr)); } template - void set_error(E e) { - try { - throw std::move(e); - } catch (...) { - this->state->complete_error(std::current_exception()); - } + void set_error(E e) && noexcept { + this->state->complete_error(std::make_exception_ptr(std::move(e))); } void set_stopped() && noexcept { this->state->complete_stopped(); } + env get_env() const noexcept { return {this->state}; } }; static_assert(::beman::execution26::receiver); @@ -53,7 +79,7 @@ struct any_scheduler { concrete(S&& s, state_base* b) : state(::beman::execution26::connect(std::forward(s), receiver{b})) {} void start() override { ::beman::execution26::start(state); } }; - ::beman::lazy::detail::poly state; + ::beman::lazy::detail::poly state; template <::beman::execution26::sender S> inner_state(S&& s, state_base* b) : state(static_cast*>(nullptr), std::forward(s), b) {} void start() { this->state->start(); } @@ -62,31 +88,76 @@ struct any_scheduler { template <::beman::execution26::receiver Receiver> struct state : state_base { using operation_state_concept = ::beman::execution26::operation_state_t; + struct stopper { + state* st; + void operator()() noexcept { + state* self = this->st; + self->callback.reset(); + self->source.request_stop(); + } + }; + using token_t = + decltype(::beman::execution26::get_stop_token(::beman::execution26::get_env(std::declval()))); + using callback_t = ::beman::execution26::stop_callback_for_t; + + std::remove_cvref_t receiver; + inner_state s; + ::beman::execution26::inplace_stop_source source; + ::std::optional callback; - std::remove_cvref_t receiver; - inner_state s; template <::beman::execution26::receiver R, typename PS> state(R&& r, PS& ps) : receiver(std::forward(r)), s(ps->connect(this)) {} void start() & noexcept { this->s.start(); } void complete_value() override { ::beman::execution26::set_value(std::move(this->receiver)); } + void complete_error(std::error_code err) override { + ::beman::execution26::set_error(std::move(receiver), err); + } void complete_error(std::exception_ptr ptr) override { ::beman::execution26::set_error(std::move(receiver), std::move(ptr)); } void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); } + ::beman::execution26::inplace_stop_token get_stop_token() override { + if constexpr (::std::same_as) { + return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)); + } else { + if constexpr (not ::std::same_as) { + if (not this->callback) { + this->callback.emplace( + ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)), + stopper{this}); + } + } + return this->source.get_token(); + } + } }; - struct env { - any_scheduler query( - const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) const noexcept; + class sender; + class env { + friend class sender; + + private: + const sender* sndr; + env(const sender* s) : sndr(s) {} + + public: + any_scheduler query(const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) + const noexcept { + return this->sndr->inner_sender->get_completion_scheduler(); + } }; // sender implementation - struct sender { + class sender { + friend class env; + + private: struct base { - virtual ~base() = default; - virtual base* move(void*) = 0; - virtual base* clone(void*) const = 0; - virtual inner_state connect(state_base*) = 0; + virtual ~base() = default; + virtual base* move(void*) = 0; + virtual base* clone(void*) const = 0; + virtual inner_state connect(state_base*) = 0; + virtual any_scheduler get_completion_scheduler() const = 0; }; template <::beman::execution26::scheduler Scheduler> struct concrete : base { @@ -95,17 +166,23 @@ struct any_scheduler { template <::beman::execution26::scheduler S> concrete(S&& s) : sender(::beman::execution26::schedule(std::forward(s))) {} - base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } - base* clone(void* buffer) const override { return new (buffer) concrete(*this); } - inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } + base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); } + base* clone(void* buffer) const override { return new (buffer) concrete(*this); } + inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); } + any_scheduler get_completion_scheduler() const override { + return any_scheduler(::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>( + ::beman::execution26::get_env(this->sender))); + } }; + poly inner_sender; + public: using sender_concept = ::beman::execution26::sender_t; using completion_signatures = ::beman::execution26::completion_signatures<::beman::execution26::set_value_t(), + ::beman::execution26::set_error_t(std::error_code), ::beman::execution26::set_error_t(std::exception_ptr), ::beman::execution26::set_stopped_t()>; - poly inner_sender; template <::beman::execution26::scheduler S> explicit sender(S&& s) : inner_sender(static_cast*>(nullptr), std::forward(s)) {} @@ -117,7 +194,7 @@ struct any_scheduler { return state(std::forward(r), this->inner_sender); } - env get_env() const noexcept { return {}; } + env get_env() const noexcept { return env(this); } }; // scheduler implementation @@ -144,30 +221,18 @@ struct any_scheduler { poly scheduler; + public: using scheduler_concept = ::beman::execution26::scheduler_t; template requires(not std::same_as>) explicit any_scheduler(S&& s) : scheduler(static_cast>*>(nullptr), std::forward(s)) {} - any_scheduler(any_scheduler&& other) = default; - any_scheduler(const any_scheduler& other) = default; - sender schedule() { return this->scheduler->schedule(); } - bool operator==(const any_scheduler&) const = default; + sender schedule() { return this->scheduler->schedule(); } + bool operator==(const any_scheduler&) const = default; }; static_assert(::beman::execution26::scheduler); -template -struct scheduler_of { - using type = ::beman::lazy::detail::any_scheduler; -}; -template - requires requires { typename Context::scheduler_type; } -struct scheduler_of { - using type = typename Context::scheduler_type; -}; -template -using scheduler_of_t = typename scheduler_of::type; } // namespace beman::lazy::detail // ---------------------------------------------------------------------------- diff --git a/include/beman/lazy/detail/inline_scheduler.hpp b/include/beman/lazy/detail/inline_scheduler.hpp index 9577576..e15596c 100644 --- a/include/beman/lazy/detail/inline_scheduler.hpp +++ b/include/beman/lazy/detail/inline_scheduler.hpp @@ -11,6 +11,21 @@ // ---------------------------------------------------------------------------- namespace beman::lazy::detail { +/*! + * \brief Scheduler completing immmediately when started on the same thread + * \headerfile beman/lazy/lazy.hpp + * + * The class `inline_scheduler` is used to prevent any actual schedulering. + * It does have a scheduler interface but it completes synchronously on + * the thread on which it gets `start`ed before returning from `start`. + * The implication is that any blocking working gets executed on the + * calling thread. Also, if there is lot of synchronous work repeatedly + * getting scheduled using `inline_scheduler` it is possible to get a + * stack overflow. + * + * In general, any use of `inline_scheduler` should receive a lot of + * attention as it is fairly easy to create subtle bugs using this scheduler. + */ struct inline_scheduler { struct env { inline_scheduler diff --git a/include/beman/lazy/detail/lazy.hpp b/include/beman/lazy/detail/lazy.hpp index 397c971..8a79626 100644 --- a/include/beman/lazy/detail/lazy.hpp +++ b/include/beman/lazy/detail/lazy.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include diff --git a/include/beman/lazy/detail/poly.hpp b/include/beman/lazy/detail/poly.hpp index 93f5be4..4b02217 100644 --- a/include/beman/lazy/detail/poly.hpp +++ b/include/beman/lazy/detail/poly.hpp @@ -68,7 +68,8 @@ class alignas(sizeof(double)) poly { { return other.pointer()->equals(this->pointer()); } - Base* operator->() { return this->pointer(); } + Base* operator->() { return this->pointer(); } + const Base* operator->() const { return this->pointer(); } }; } // namespace beman::lazy::detail diff --git a/include/beman/lazy/detail/scheduler_of.hpp b/include/beman/lazy/detail/scheduler_of.hpp new file mode 100644 index 0000000..e1ac388 --- /dev/null +++ b/include/beman/lazy/detail/scheduler_of.hpp @@ -0,0 +1,25 @@ +// include/beman/lazy/detail/scheduler_of.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF +#define INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF + +// ---------------------------------------------------------------------------- + +namespace beman::lazy::detail { +template +struct scheduler_of { + using type = ::beman::lazy::detail::any_scheduler; +}; +template + requires requires { typename Context::scheduler_type; } +struct scheduler_of { + using type = typename Context::scheduler_type; +}; +template +using scheduler_of_t = typename scheduler_of::type; +} // namespace beman::lazy::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/tests/beman/lazy/CMakeLists.txt b/tests/beman/lazy/CMakeLists.txt index df45cdd..e94d145 100644 --- a/tests/beman/lazy/CMakeLists.txt +++ b/tests/beman/lazy/CMakeLists.txt @@ -1,6 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception -list(APPEND lazy_tests poly lazy) +list(APPEND lazy_tests any_scheduler inline_scheduler lazy poly) foreach(test ${lazy_tests}) add_executable(beman.lazy.tests.${test}) diff --git a/tests/beman/lazy/any_scheduler.test.cpp b/tests/beman/lazy/any_scheduler.test.cpp new file mode 100644 index 0000000..3dbf26e --- /dev/null +++ b/tests/beman/lazy/any_scheduler.test.cpp @@ -0,0 +1,310 @@ +// tests/beman/lazy/any_scheduler.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef NDEBUG +#undef NDEBUG +#endif +#include + +namespace ex = beman::execution26; +namespace ly = beman::lazy; + +// ---------------------------------------------------------------------------- + +namespace { +struct thread_context { + enum class complete { success, failure, exception, never }; + struct base { + base* next{}; + virtual ~base() = default; + virtual void complete() = 0; + }; + + std::latch stop_done{1u}; + std::mutex mutex; + std::condition_variable condition; + bool done{false}; + base* work{}; + std::thread thread; + + base* get_work() { + std::unique_lock cerberus(this->mutex); + condition.wait(cerberus, [this] { return this->done || this->work; }); + base* rc{this->work}; + if (rc) { + this->work = rc->next; + } + return rc; + } + void enqueue(base* w) { + { + std::lock_guard cerberus(this->mutex); + w->next = std::exchange(this->work, w); + } + this->condition.notify_one(); + } + + thread_context() + : thread([this] { + while (auto w{this->get_work()}) { + w->complete(); + } + this->stop_done.count_down(); + }) {} + ~thread_context() { + this->stop(); + this->thread.join(); + } + + struct scheduler { + using scheduler_concept = ex::scheduler_t; + thread_context* context; + complete cmpl{complete::success}; + bool operator==(const scheduler&) const = default; + + template + struct state : base { + struct stopper { + state* st; + void operator()() noexcept { + auto self{this->st}; + self->callback.reset(); + ex::set_stopped(std::move(self->receiver)); + } + }; + using operation_state_concept = ex::operation_state_t; + using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval()))); + using callback_t = ex::stop_callback_for_t; + + thread_context* ctxt; + std::remove_cvref_t receiver; + thread_context::complete cmpl; + std::optional callback; + + template + state(auto c, R&& r, thread_context::complete cm) : ctxt(c), receiver(std::forward(r)), cmpl(cm) {} + void start() & noexcept { + callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stopper{this}); + if (cmpl != thread_context::complete::never) + this->ctxt->enqueue(this); + } + void complete() override { + this->callback.reset(); + if (this->cmpl == thread_context::complete::success) + ex::set_value(std::move(this->receiver)); + else if (this->cmpl == thread_context::complete::failure) + ex::set_error(std::move(this->receiver), std::make_error_code(std::errc::address_in_use)); + else + ex::set_error( + std::move(this->receiver), + std::make_exception_ptr(std::system_error(std::make_error_code(std::errc::address_in_use)))); + } + }; + struct env { + thread_context* ctxt; + scheduler query(const ex::get_completion_scheduler_t&) const noexcept { + return scheduler{ctxt}; + } + }; + struct sender { + using sender_concept = ex::sender_t; + using completion_signatures = + ex::completion_signatures; + + thread_context* ctxt; + thread_context::complete cmpl; + + template + auto connect(Receiver&& receiver) { + static_assert(ex::operation_state>); + return state(this->ctxt, std::forward(receiver), this->cmpl); + } + env get_env() const noexcept { return {this->ctxt}; } + }; + static_assert(ex::sender); + + sender schedule() noexcept { return sender{this->context, this->cmpl}; } + }; + static_assert(ex::scheduler); + + scheduler get_scheduler(complete cmpl = complete::success) { return scheduler{this, cmpl}; } + void stop() { + { + std::lock_guard cerberus(this->mutex); + this->done = true; + } + this->condition.notify_one(); + this->stop_done.wait(); + } +}; + +enum class stop_result { none, success, failure, stopped }; +template +struct stop_env { + Token token; + auto query(ex::get_stop_token_t) const noexcept { return this->token; } +}; +template +stop_env(Token&&) -> stop_env>; + +template +struct stop_receiver { + using receiver_concept = ex::receiver_t; + Token token; + stop_result& result; + std::latch* completed{}; + auto get_env() const noexcept { return stop_env{this->token}; } + + void set_value(auto&&...) && noexcept { + this->result = stop_result::success; + if (this->completed) + this->completed->count_down(); + } + void set_error(auto&&) && noexcept { + this->result = stop_result::failure; + if (this->completed) + this->completed->count_down(); + } + void set_stopped() && noexcept { + this->result = stop_result::stopped; + if (this->completed) + this->completed->count_down(); + } +}; +template +stop_receiver(Token&&, stop_result&, std::latch* = nullptr) -> stop_receiver>; +static_assert(ex::receiver>); + +} // namespace + +// ---------------------------------------------------------------------------- + +int main() { + static_assert(ex::scheduler); + + thread_context ctxt1; + thread_context ctxt2; + + assert(ctxt1.get_scheduler() == ctxt1.get_scheduler()); + assert(ctxt2.get_scheduler() == ctxt2.get_scheduler()); + assert(ctxt1.get_scheduler() != ctxt2.get_scheduler()); + + ly::detail::any_scheduler sched1(ctxt1.get_scheduler()); + ly::detail::any_scheduler sched2(ctxt2.get_scheduler()); + assert(sched1 == sched1); + assert(sched2 == sched2); + assert(sched1 != sched2); + + ly::detail::any_scheduler copy(sched1); + assert(copy == sched1); + assert(copy != sched2); + ly::detail::any_scheduler move(std::move(copy)); + assert(move == sched1); + assert(move != sched2); + + copy = sched2; + assert(copy == sched2); + assert(copy != sched1); + + move = std::move(copy); + assert(move == sched2); + assert(move != sched1); + + std::atomic id1{}; + std::atomic id2{}; + ex::sync_wait(ex::schedule(sched1) | ex::then([&id1]() { id1 = std::this_thread::get_id(); })); + ex::sync_wait(ex::schedule(sched2) | ex::then([&id2]() { id2 = std::this_thread::get_id(); })); + assert(id1 != id2); + ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched1)) | + ex::then([&id1]() { assert(id1 == std::this_thread::get_id()); })); + ex::sync_wait(ex::schedule(ly::detail::any_scheduler(sched2)) | + ex::then([&id2]() { assert(id2 == std::this_thread::get_id()); })); + + { + bool success{false}; + bool failed{false}; + bool exception{false}; + ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::failure)) | + ex::then([&success] { success = true; }) | ex::upon_error([&failed, &exception](auto err) { + if constexpr (std::same_as) + failed = true; + else if constexpr (std::same_as) + exception = true; + })); + assert(not success); + assert(failed); + assert(not exception); + } + { + bool success{false}; + bool failed{false}; + bool exception{false}; + ex::sync_wait(ex::schedule(ctxt1.get_scheduler(thread_context::complete::exception)) | + ex::then([&success] { success = true; }) | ex::upon_error([&failed, &exception](auto err) { + if constexpr (std::same_as) + failed = true; + else if constexpr (std::same_as) + exception = true; + })); + assert(not success); + assert(not failed); + assert(exception); + } + { + ex::inplace_stop_source source; + stop_result result{stop_result::none}; + auto state{ex::connect(ex::schedule(ctxt1.get_scheduler(thread_context::complete::never)), + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + ex::inplace_stop_source source; + stop_result result{stop_result::none}; + auto state{ + ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + ex::stop_source source; + stop_result result{stop_result::none}; + auto state{ + ex::connect(ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::never))), + stop_receiver{source.get_token(), result})}; + assert(result == stop_result::none); + ex::start(state); + assert(result == stop_result::none); + source.request_stop(); + assert(result == stop_result::stopped); + } + { + std::latch completed{1}; + stop_result result{stop_result::none}; + auto state{ex::connect( + ex::schedule(ly::detail::any_scheduler(ctxt1.get_scheduler(thread_context::complete::success))), + stop_receiver{ex::never_stop_token(), result, &completed})}; + assert(result == stop_result::none); + ex::start(state); + completed.wait(); + assert(result == stop_result::success); + } +} diff --git a/tests/beman/lazy/inline_scheduler.test.cpp b/tests/beman/lazy/inline_scheduler.test.cpp new file mode 100644 index 0000000..244e7a4 --- /dev/null +++ b/tests/beman/lazy/inline_scheduler.test.cpp @@ -0,0 +1,43 @@ +// tests/beman/lazy/inline_scheduler.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#ifdef NDEBUG +#undef NDEBUG +#endif +#include + +namespace ex = beman::execution26; +namespace ly = beman::lazy; + +// ---------------------------------------------------------------------------- + +namespace { +struct receiver { + using receiver_concept = ex::receiver_t; + int& value; + + void set_value(int v) && noexcept { this->value = v; } +}; +static_assert(ex::receiver); +} // namespace + +int main() { + ly::detail::inline_scheduler sched; + static_assert(ex::scheduler); + + auto sched_sender{ex::schedule(sched)}; + static_assert(ex::sender); + + auto env{ex::get_env(sched_sender)}; + assert(sched == ex::get_completion_scheduler(env)); + + int value{}; + auto state{ex::connect(std::move(sched_sender) | ex::then([]() noexcept { return 17; }), receiver{value})}; + static_assert(ex::operation_state); + + assert(value == 0); + ex::start(state); + assert(value == 17); +}