From d3e61b5dc0d42258cbd493e162e1a6b6f76978ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dietmar=20K=C3=BChl?= Date: Fri, 3 Jan 2025 00:13:03 +0000 Subject: [PATCH] started to implement on --- include/beman/execution26/detail/on.hpp | 85 ++++++++++++++ .../detail/sender_adaptor_closure.hpp | 8 ++ include/beman/execution26/execution.hpp | 3 +- src/beman/execution26/CMakeLists.txt | 1 + tests/beman/execution26/CMakeLists.txt | 3 +- tests/beman/execution26/exec-on.test.cpp | 57 +++++++++ .../execution26/include/test/thread_pool.hpp | 108 ++++++++++++++++++ 7 files changed, 263 insertions(+), 2 deletions(-) create mode 100644 include/beman/execution26/detail/on.hpp create mode 100644 tests/beman/execution26/exec-on.test.cpp create mode 100644 tests/beman/execution26/include/test/thread_pool.hpp diff --git a/include/beman/execution26/detail/on.hpp b/include/beman/execution26/detail/on.hpp new file mode 100644 index 00000000..1290efc0 --- /dev/null +++ b/include/beman/execution26/detail/on.hpp @@ -0,0 +1,85 @@ +// include/beman/execution26/detail/on.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON +#define INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution26::detail { +struct on_t { + template <::beman::execution26::detail::sender_for OutSndr, typename Env> + auto transform_env(OutSndr&& out_sndr, Env&& env) const -> decltype(auto) { + // auto&&[_, data, _] = out_sndr; + auto&& data{out_sndr.template get<1>()}; + + if constexpr (::beman::execution26::scheduler) + return ::beman::execution26::detail::join_env( + ::beman::execution26::detail::sched_env(::beman::execution26::detail::forward_like(data) + + ), + ::beman::execution26::detail::fwd_env(::std::forward(env))); + else + return std::forward(env); + } + + template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr> + requires ::beman::execution26::detail::is_sender_adaptor_closure + auto operator()(Sch&&, Sndr&&) const -> void = + BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure"); + + template <::beman::execution26::scheduler Sch, + ::beman::execution26::sender Sndr, + ::beman::execution26::detail::is_sender_adaptor_closure Closure> + requires ::beman::execution26::detail::is_sender_adaptor_closure + auto operator()(Sndr&&, Sch&&, Closure&&) const -> void = + BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure"); + + template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr> + auto operator()(Sch&& sch, Sndr&& sndr) const { + auto domain{::beman::execution26::detail::query_with_default( + ::beman::execution26::get_domain, sch, ::beman::execution26::default_domain{})}; + return ::beman::execution26::transform_sender( + domain, + ::beman::execution26::detail::make_sender(*this, ::std::forward(sch), ::std::forward(sndr))); + } + template <::beman::execution26::scheduler Sch, + ::beman::execution26::sender Sndr, + ::beman::execution26::detail::is_sender_adaptor_closure Closure> + auto operator()(Sndr&& sndr, Sch&& sch, Closure&& closure) const { + auto domain{::beman::execution26::detail::get_domain_early(sndr)}; + return ::beman::execution26::transform_sender( + domain, + ::beman::execution26::detail::make_sender( + *this, + ::beman::execution26::detail::product_type{::std::forward(sch), ::std::forward(closure)}, + ::std::forward(sndr))); + } +}; +} // namespace beman::execution26::detail + +namespace beman::execution26 { +using on_t = ::beman::execution26::detail::on_t; +inline constexpr ::beman::execution26::on_t on{}; +} // namespace beman::execution26 + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution26/detail/sender_adaptor_closure.hpp b/include/beman/execution26/detail/sender_adaptor_closure.hpp index 53945dc2..23da0705 100644 --- a/include/beman/execution26/detail/sender_adaptor_closure.hpp +++ b/include/beman/execution26/detail/sender_adaptor_closure.hpp @@ -20,8 +20,16 @@ namespace beman::execution26 { template struct sender_adaptor_closure : ::beman::execution26::detail::pipeable::sender_adaptor_closure_base {}; // NOLINTEND(bugprone-crtp-constructor-accessibility) + } // namespace beman::execution26 +namespace beman::execution26::detail { +template +concept is_sender_adaptor_closure = + ::std::derived_from<::std::decay_t, + ::beman::execution26::sender_adaptor_closure<::std::decay_t>>; +} + namespace beman::execution26::detail::pipeable { template <::beman::execution26::sender Sender, typename Adaptor> requires(not::beman::execution26::sender) && diff --git a/include/beman/execution26/execution.hpp b/include/beman/execution26/execution.hpp index 10f33ede..7d88ffa9 100644 --- a/include/beman/execution26/execution.hpp +++ b/include/beman/execution26/execution.hpp @@ -42,15 +42,16 @@ #include #include #include +// #include #include #include #include #include #include -#include #include #include #include +#include // ---------------------------------------------------------------------------- diff --git a/src/beman/execution26/CMakeLists.txt b/src/beman/execution26/CMakeLists.txt index a7463994..1c02b30a 100644 --- a/src/beman/execution26/CMakeLists.txt +++ b/src/beman/execution26/CMakeLists.txt @@ -123,6 +123,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nostopstate.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nothrow_callable.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/notify.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on_stop_request.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state_task.hpp diff --git a/tests/beman/execution26/CMakeLists.txt b/tests/beman/execution26/CMakeLists.txt index 068148df..0166c7e3 100644 --- a/tests/beman/execution26/CMakeLists.txt +++ b/tests/beman/execution26/CMakeLists.txt @@ -11,6 +11,7 @@ endif() list( APPEND execution_tests + exec-on.test notify.test exec-scounting.test exec-awaitable.test @@ -128,7 +129,7 @@ foreach(test ${execution_tests}) add_test(NAME ${TEST_EXE} COMMAND $) endforeach() -if(NOT PROJECT_IS_TOP_LEVEL) +if(PROJECT_IS_TOP_LEVEL) # test if the targets are findable from the build directory # cmake-format: off add_test(NAME find-package-test diff --git a/tests/beman/execution26/exec-on.test.cpp b/tests/beman/execution26/exec-on.test.cpp new file mode 100644 index 00000000..cfd8a995 --- /dev/null +++ b/tests/beman/execution26/exec-on.test.cpp @@ -0,0 +1,57 @@ +// tests/beman/execution26/exec-on.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace { +struct both : test_std::sender_adaptor_closure { + using sender_concept = test_std::sender_t; +}; + +static_assert(test_std::sender); +static_assert(test_detail::is_sender_adaptor_closure); + +template +auto test_interface(Sch sch, Sndr sndr, Closure closure, Both both) -> void { + static_assert(requires { + { test_std::on(sch, sndr) } -> test_std::sender; + }); + static_assert(not requires { test_std::on(sch, both); }); + static_assert(requires { + { test_std::on(sndr, sch, closure) } -> test_std::sender; + }); + static_assert(not requires { test_std::on(both, sch, closure); }); + + auto sndr1{test_std::on(sch, sndr)}; + auto sndr2{test_std::on(sndr, sch, closure)}; + test::use(sndr1, sndr2); +} + +template OutSndr> +auto test_transform_env(OutSndr out_sndr) -> void { + auto e{test_std::on.transform_env(out_sndr, test_std::empty_env{})}; +} +} // namespace + +TEST(exec_on) { + test::thread_pool pool{}; + + static_assert(std::same_as); + static_assert(test_detail::is_sender_adaptor_closure); + static_assert(not test_detail::is_sender_adaptor_closure); + test_interface(pool.get_scheduler(), test_std::just(), test_std::then([] {}), both{}); + + test_transform_env(test_std::on(pool.get_scheduler(), test_std::just())); + test_transform_env(test_std::on(test_std::just(), pool.get_scheduler(), test_std::then([] {}))); +} \ No newline at end of file diff --git a/tests/beman/execution26/include/test/thread_pool.hpp b/tests/beman/execution26/include/test/thread_pool.hpp new file mode 100644 index 00000000..4758795e --- /dev/null +++ b/tests/beman/execution26/include/test/thread_pool.hpp @@ -0,0 +1,108 @@ +// tests/beman/execution26/include/test/thread_pool.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// ---------------------------------------------------------------------------- + +#ifndef INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL +#define INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL + +#include +#include + +#include +#include +#include +#include +// ---------------------------------------------------------------------------- + +namespace test { +struct thread_pool; +} + +struct test::thread_pool { + struct node { + node* next; + virtual void run() = 0; + + protected: + ~node() = default; + }; + + std::mutex mutex; + std::condition_variable condition; + node* stack{}; + bool stopped{false}; + std::thread driver{[this] { + while (std::optional n = [this] { + std::unique_lock cerberus(mutex); + condition.wait(cerberus, [this] { return stopped || stack; }); + return this->stack ? std::optional(std::exchange(this->stack, this->stack->next)) + : std::optional(); + }()) { + (*n)->run(); + } + }}; + + thread_pool() = default; + thread_pool(thread_pool&&) = delete; + ~thread_pool() { + this->stop(); + this->driver.join(); + } + void stop() { + { + std::lock_guard cerberus(this->mutex); + stopped = true; + } + this->condition.notify_one(); + } + + struct scheduler { + using scheduler_concept = test_std::scheduler_t; + struct env { + test::thread_pool* pool; + + template + scheduler query(const test_std::get_completion_scheduler_t&) const noexcept { + return {this->pool}; + } + }; + template + struct state final : test::thread_pool::node { + using operation_state_concept = test_std::operation_state_t; + std::remove_cvref_t receiver; + test::thread_pool* pool; + + template + state(R&& r, test::thread_pool* p) : node{}, receiver(std::forward(r)), pool(p) {} + void start() & noexcept { + { + std::lock_guard cerberus(this->pool->mutex); + this->next = std::exchange(this->pool->stack, this); + } + this->pool->condition.notify_one(); + } + void run() override { test_std::set_value(std::move(this->receiver)); } + }; + struct sender { + using sender_concept = test_std::sender_t; + using completion_signatures = test_std::completion_signatures; + test::thread_pool* pool; + template + state connect(Receiver&& receiver) { + return state(std::forward(receiver), pool); + } + + env get_env() const noexcept { return {this->pool}; } + }; + test::thread_pool* pool; + sender schedule() { return {this->pool}; } + bool operator==(const scheduler&) const = default; + }; + scheduler get_scheduler() { return {this}; } +}; + +static_assert(test_std::scheduler); + +// ---------------------------------------------------------------------------- + +#endif