Skip to content

Commit

Permalink
started to implement on
Browse files Browse the repository at this point in the history
  • Loading branch information
dietmarkuehl committed Jan 3, 2025
1 parent 752882e commit d3e61b5
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 2 deletions.
85 changes: 85 additions & 0 deletions include/beman/execution26/detail/on.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// include/beman/execution26/detail/on.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON
#define INCLUDED_INCLUDE_BEMAN_EXECUTION26_DETAIL_ON

#include <beman/execution26/detail/scheduler.hpp>
#include <beman/execution26/detail/sender.hpp>
#include <beman/execution26/detail/sender_adaptor_closure.hpp>
#include <beman/execution26/detail/transform_sender.hpp>
#include <beman/execution26/detail/query_with_default.hpp>
#include <beman/execution26/detail/get_domain.hpp>
#include <beman/execution26/detail/get_domain_early.hpp>
#include <beman/execution26/detail/default_domain.hpp>
#include <beman/execution26/detail/make_sender.hpp>
#include <beman/execution26/detail/product_type.hpp>
#include <beman/execution26/detail/sender_for.hpp>
#include <beman/execution26/detail/join_env.hpp>
#include <beman/execution26/detail/forward_like.hpp>
#include <beman/execution26/detail/fwd_env.hpp>
#include <beman/execution26/detail/sched_env.hpp>
#include <utility>

// ----------------------------------------------------------------------------

namespace beman::execution26::detail {
struct on_t {
template <::beman::execution26::detail::sender_for<on_t> OutSndr, typename Env>
auto transform_env(OutSndr&& out_sndr, Env&& env) const -> decltype(auto) {
// auto&&[_, data, _] = out_sndr;
auto&& data{out_sndr.template get<1>()};

if constexpr (::beman::execution26::scheduler<decltype(data)>)
return ::beman::execution26::detail::join_env(
::beman::execution26::detail::sched_env(::beman::execution26::detail::forward_like<OutSndr>(data)

),
::beman::execution26::detail::fwd_env(::std::forward<Env>(env)));
else
return std::forward<Env>(env);
}

template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr>
requires ::beman::execution26::detail::is_sender_adaptor_closure<Sndr>
auto operator()(Sch&&, Sndr&&) const -> void =
BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure");

template <::beman::execution26::scheduler Sch,
::beman::execution26::sender Sndr,
::beman::execution26::detail::is_sender_adaptor_closure Closure>
requires ::beman::execution26::detail::is_sender_adaptor_closure<Sndr>
auto operator()(Sndr&&, Sch&&, Closure&&) const -> void =
BEMAN_EXECUTION26_DELETE("on(sch, sndr) requires that sndr isn't both a sender and sender adaptor closure");

template <::beman::execution26::scheduler Sch, ::beman::execution26::sender Sndr>
auto operator()(Sch&& sch, Sndr&& sndr) const {
auto domain{::beman::execution26::detail::query_with_default(
::beman::execution26::get_domain, sch, ::beman::execution26::default_domain{})};
return ::beman::execution26::transform_sender(
domain,
::beman::execution26::detail::make_sender(*this, ::std::forward<Sch>(sch), ::std::forward<Sndr>(sndr)));
}
template <::beman::execution26::scheduler Sch,
::beman::execution26::sender Sndr,
::beman::execution26::detail::is_sender_adaptor_closure Closure>
auto operator()(Sndr&& sndr, Sch&& sch, Closure&& closure) const {
auto domain{::beman::execution26::detail::get_domain_early(sndr)};
return ::beman::execution26::transform_sender(
domain,
::beman::execution26::detail::make_sender(
*this,
::beman::execution26::detail::product_type{::std::forward<Sch>(sch), ::std::forward<Closure>(closure)},
::std::forward<Sndr>(sndr)));
}
};
} // namespace beman::execution26::detail

namespace beman::execution26 {
using on_t = ::beman::execution26::detail::on_t;
inline constexpr ::beman::execution26::on_t on{};
} // namespace beman::execution26

// ----------------------------------------------------------------------------

#endif
8 changes: 8 additions & 0 deletions include/beman/execution26/detail/sender_adaptor_closure.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ namespace beman::execution26 {
template <typename>
struct sender_adaptor_closure : ::beman::execution26::detail::pipeable::sender_adaptor_closure_base {};
// NOLINTEND(bugprone-crtp-constructor-accessibility)

} // namespace beman::execution26

namespace beman::execution26::detail {
template <typename Closure>
concept is_sender_adaptor_closure =
::std::derived_from<::std::decay_t<Closure>,
::beman::execution26::sender_adaptor_closure<::std::decay_t<Closure>>>;
}

namespace beman::execution26::detail::pipeable {
template <::beman::execution26::sender Sender, typename Adaptor>
requires(not::beman::execution26::sender<Adaptor>) &&
Expand Down
3 changes: 2 additions & 1 deletion include/beman/execution26/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@
#include <beman/execution26/detail/into_variant.hpp>
#include <beman/execution26/detail/just.hpp>
#include <beman/execution26/detail/let.hpp>
// #include <beman/execution26/detail/on.hpp>
#include <beman/execution26/detail/read_env.hpp>
#include <beman/execution26/detail/schedule_from.hpp>
#include <beman/execution26/detail/starts_on.hpp>
#include <beman/execution26/detail/sync_wait.hpp>
#include <beman/execution26/detail/then.hpp>
#include <beman/execution26/detail/write_env.hpp>
#include <beman/execution26/detail/when_all.hpp>
#include <beman/execution26/detail/when_all_with_variant.hpp>
#include <beman/execution26/detail/with_awaitable_senders.hpp>
#include <beman/execution26/detail/write_env.hpp>

// ----------------------------------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions src/beman/execution26/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nostopstate.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nothrow_callable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/notify.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on_stop_request.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state_task.hpp
Expand Down
3 changes: 2 additions & 1 deletion tests/beman/execution26/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ endif()
list(
APPEND
execution_tests
exec-on.test
notify.test
exec-scounting.test
exec-awaitable.test
Expand Down Expand Up @@ -128,7 +129,7 @@ foreach(test ${execution_tests})
add_test(NAME ${TEST_EXE} COMMAND $<TARGET_FILE:${TEST_EXE}>)
endforeach()

if(NOT PROJECT_IS_TOP_LEVEL)
if(PROJECT_IS_TOP_LEVEL)
# test if the targets are findable from the build directory
# cmake-format: off
add_test(NAME find-package-test
Expand Down
57 changes: 57 additions & 0 deletions tests/beman/execution26/exec-on.test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// tests/beman/execution26/exec-on.test.cpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution26/detail/on.hpp>
#include <beman/execution26/detail/just.hpp>
#include <beman/execution26/detail/sender_adaptor_closure.hpp>
#include <beman/execution26/detail/then.hpp>
#include <test/execution.hpp>
#include <test/thread_pool.hpp>
#include <concepts>

// ----------------------------------------------------------------------------

namespace {
struct both : test_std::sender_adaptor_closure<both> {
using sender_concept = test_std::sender_t;
};

static_assert(test_std::sender<both>);
static_assert(test_detail::is_sender_adaptor_closure<both>);

template <test_std::scheduler Sch,
test_std::sender Sndr,
test_detail::is_sender_adaptor_closure Closure,
typename Both>
auto test_interface(Sch sch, Sndr sndr, Closure closure, Both both) -> void {
static_assert(requires {
{ test_std::on(sch, sndr) } -> test_std::sender;
});
static_assert(not requires { test_std::on(sch, both); });
static_assert(requires {
{ test_std::on(sndr, sch, closure) } -> test_std::sender;
});
static_assert(not requires { test_std::on(both, sch, closure); });

auto sndr1{test_std::on(sch, sndr)};
auto sndr2{test_std::on(sndr, sch, closure)};
test::use(sndr1, sndr2);
}

template <test_detail::sender_for<test_std::on_t> OutSndr>
auto test_transform_env(OutSndr out_sndr) -> void {
auto e{test_std::on.transform_env(out_sndr, test_std::empty_env{})};
}
} // namespace

TEST(exec_on) {
test::thread_pool pool{};

static_assert(std::same_as<const test_std::on_t, decltype(test_std::on)>);
static_assert(test_detail::is_sender_adaptor_closure<decltype(test_std::then([] {}))>);
static_assert(not test_detail::is_sender_adaptor_closure<decltype(test_std::just([] {}))>);
test_interface(pool.get_scheduler(), test_std::just(), test_std::then([] {}), both{});

test_transform_env(test_std::on(pool.get_scheduler(), test_std::just()));
test_transform_env(test_std::on(test_std::just(), pool.get_scheduler(), test_std::then([] {})));
}
108 changes: 108 additions & 0 deletions tests/beman/execution26/include/test/thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// tests/beman/execution26/include/test/thread_pool.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
// ----------------------------------------------------------------------------

#ifndef INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL
#define INCLUDED_TESTS_BEMAN_EXECUTION26_INCLUDE_TEST_THREAD_POOL

#include <beman/execution26/execution.hpp>
#include <test/execution.hpp>

#include <mutex>
#include <condition_variable>
#include <thread>
#include <memory>
// ----------------------------------------------------------------------------

namespace test {
struct thread_pool;
}

struct test::thread_pool {
struct node {
node* next;
virtual void run() = 0;

protected:
~node() = default;
};

std::mutex mutex;
std::condition_variable condition;
node* stack{};
bool stopped{false};
std::thread driver{[this] {
while (std::optional<node*> n = [this] {
std::unique_lock cerberus(mutex);
condition.wait(cerberus, [this] { return stopped || stack; });
return this->stack ? std::optional<node*>(std::exchange(this->stack, this->stack->next))
: std::optional<node*>();
}()) {
(*n)->run();
}
}};

thread_pool() = default;
thread_pool(thread_pool&&) = delete;
~thread_pool() {
this->stop();
this->driver.join();
}
void stop() {
{
std::lock_guard cerberus(this->mutex);
stopped = true;
}
this->condition.notify_one();
}

struct scheduler {
using scheduler_concept = test_std::scheduler_t;
struct env {
test::thread_pool* pool;

template <typename T>
scheduler query(const test_std::get_completion_scheduler_t<T>&) const noexcept {
return {this->pool};
}
};
template <typename Receiver>
struct state final : test::thread_pool::node {
using operation_state_concept = test_std::operation_state_t;
std::remove_cvref_t<Receiver> receiver;
test::thread_pool* pool;

template <typename R>
state(R&& r, test::thread_pool* p) : node{}, receiver(std::forward<R>(r)), pool(p) {}
void start() & noexcept {
{
std::lock_guard cerberus(this->pool->mutex);
this->next = std::exchange(this->pool->stack, this);
}
this->pool->condition.notify_one();
}
void run() override { test_std::set_value(std::move(this->receiver)); }
};
struct sender {
using sender_concept = test_std::sender_t;
using completion_signatures = test_std::completion_signatures<test_std::set_value_t()>;
test::thread_pool* pool;
template <typename Receiver>
state<Receiver> connect(Receiver&& receiver) {
return state<Receiver>(std::forward<Receiver>(receiver), pool);
}

env get_env() const noexcept { return {this->pool}; }
};
test::thread_pool* pool;
sender schedule() { return {this->pool}; }
bool operator==(const scheduler&) const = default;
};
scheduler get_scheduler() { return {this}; }
};

static_assert(test_std::scheduler<test::thread_pool::scheduler>);

// ----------------------------------------------------------------------------

#endif

0 comments on commit d3e61b5

Please sign in to comment.