Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler tests #4

Merged
merged 16 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ jobs:
cpp_version: [20, 23, 26]
cmake_args:
- description: "Default"
- description: "TSan"
args: "-DBEMAN_BUILDSYS_SANITIZER=TSan"
# Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping
# - description: "TSan"
# args: "-DBEMAN_BUILDSYS_SANITIZER=TSan"
- description: "MaxSan"
args: "-DBEMAN_BUILDSYS_SANITIZER=MaxSan"
include:
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ FetchContent_Declare(
execution26
# for local development, use SOURCE_DIR <path-to>/execution26
GIT_REPOSITORY https://github.com/bemanproject/execution26
GIT_TAG b52f28c
GIT_TAG a7ee8c8
)
FetchContent_MakeAvailable(execution26)

Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

BUILDDIR = build

default: test
default: compile

compile: config
cmake --build $(BUILDDIR) -j
#cmake --build $(BUILDDIR) -j
cmake --workflow --preset=appleclang-debug

format:
git clang-format main

config:
cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR)
#CXXFLAGS=-fsanitize=thread cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR)

test: compile
cd $(BUILDDIR); ctest
Expand Down
153 changes: 109 additions & 44 deletions include/beman/lazy/detail/any_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,63 @@
#include <beman/execution26/execution.hpp>
#include <beman/lazy/detail/poly.hpp>
#include <new>
#include <optional>
#include <utility>

// ----------------------------------------------------------------------------

namespace beman::lazy::detail {

struct any_scheduler {
// TODO: add support for forwarding stop_tokens to the type-erased sender
// TODO: other errors than std::exception_ptr should be supported
/*!
* \brief Type-erasing scheduler
* \headerfile beman/lazy/lazy.hpp <beman/lazy/lazy.hpp>
*
* The class `any_scheduler` is used to type-erase any scheduler class.
* Any error produced by the underlying scheduler except `std::error_code` is turned into
* an `std::exception_ptr`. `std::error_code` is forwarded as is. The `any_scheduler`
* forwards stop requests reported by the stop token obtained from the `connect`ed
* receiver to the sender used by the underlying scheduler.
*
* Completion signatures:
*
* - `ex::set_value_t()`
* - `ex::set_error_t(std::error_code)`
* - `ex::set_error_t(std::exception_ptr)`
* - `ex::set_stopped()`
*
* Usage:
*
* any_scheduler sched(other_scheduler);
* auto sender{ex::schedule(sched) | some_sender};
*/
class any_scheduler {
struct state_base {
virtual ~state_base() = default;
virtual void complete_value() = 0;
virtual void complete_error(std::exception_ptr) = 0;
virtual void complete_stopped() = 0;
virtual ~state_base() = default;
virtual void complete_value() = 0;
virtual void complete_error(::std::error_code) = 0;
virtual void complete_error(::std::exception_ptr) = 0;
virtual void complete_stopped() = 0;
virtual ::beman::execution26::inplace_stop_token get_stop_token() = 0;
};

struct inner_state {
struct receiver;
struct env {
state_base* state;
auto query(::beman::execution26::get_stop_token_t) const noexcept { return this->state->get_stop_token(); }
};
struct receiver {
using receiver_concept = ::beman::execution26::receiver_t;
state_base* state;
void set_value() && noexcept { this->state->complete_value(); }
void set_error(std::error_code err) && noexcept { this->state->complete_error(err); }
void set_error(std::exception_ptr ptr) && noexcept { this->state->complete_error(std::move(ptr)); }
template <typename E>
void set_error(E e) {
try {
throw std::move(e);
} catch (...) {
this->state->complete_error(std::current_exception());
}
void set_error(E e) && noexcept {
this->state->complete_error(std::make_exception_ptr(std::move(e)));
}
void set_stopped() && noexcept { this->state->complete_stopped(); }
env get_env() const noexcept { return {this->state}; }
};
static_assert(::beman::execution26::receiver<receiver>);

Expand All @@ -53,7 +79,7 @@ struct any_scheduler {
concrete(S&& s, state_base* b) : state(::beman::execution26::connect(std::forward<S>(s), receiver{b})) {}
void start() override { ::beman::execution26::start(state); }
};
::beman::lazy::detail::poly<base, 8u * sizeof(void*)> state;
::beman::lazy::detail::poly<base, 16u * sizeof(void*)> state;
template <::beman::execution26::sender S>
inner_state(S&& s, state_base* b) : state(static_cast<concrete<S>*>(nullptr), std::forward<S>(s), b) {}
void start() { this->state->start(); }
Expand All @@ -62,31 +88,76 @@ struct any_scheduler {
template <::beman::execution26::receiver Receiver>
struct state : state_base {
using operation_state_concept = ::beman::execution26::operation_state_t;
struct stopper {
state* st;
void operator()() noexcept {
state* self = this->st;
self->callback.reset();
self->source.request_stop();
}
};
using token_t =
decltype(::beman::execution26::get_stop_token(::beman::execution26::get_env(std::declval<Receiver>())));
using callback_t = ::beman::execution26::stop_callback_for_t<token_t, stopper>;

std::remove_cvref_t<Receiver> receiver;
inner_state s;
::beman::execution26::inplace_stop_source source;
::std::optional<callback_t> callback;

std::remove_cvref_t<Receiver> receiver;
inner_state s;
template <::beman::execution26::receiver R, typename PS>
state(R&& r, PS& ps) : receiver(std::forward<R>(r)), s(ps->connect(this)) {}
void start() & noexcept { this->s.start(); }
void complete_value() override { ::beman::execution26::set_value(std::move(this->receiver)); }
void complete_error(std::error_code err) override {
::beman::execution26::set_error(std::move(receiver), err);
}
void complete_error(std::exception_ptr ptr) override {
::beman::execution26::set_error(std::move(receiver), std::move(ptr));
}
void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); }
::beman::execution26::inplace_stop_token get_stop_token() override {
if constexpr (::std::same_as<token_t, ::beman::execution26::inplace_stop_token>) {
return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver));
} else {
if constexpr (not ::std::same_as<token_t, ::beman::execution26::never_stop_token>) {
if (not this->callback) {
this->callback.emplace(
::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)),
stopper{this});
}
}
return this->source.get_token();
}
}
};

struct env {
any_scheduler query(
const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) const noexcept;
class sender;
class env {
friend class sender;

private:
const sender* sndr;
env(const sender* s) : sndr(s) {}

public:
any_scheduler query(const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&)
const noexcept {
return this->sndr->inner_sender->get_completion_scheduler();
}
};

// sender implementation
struct sender {
class sender {
friend class env;

private:
struct base {
virtual ~base() = default;
virtual base* move(void*) = 0;
virtual base* clone(void*) const = 0;
virtual inner_state connect(state_base*) = 0;
virtual ~base() = default;
virtual base* move(void*) = 0;
virtual base* clone(void*) const = 0;
virtual inner_state connect(state_base*) = 0;
virtual any_scheduler get_completion_scheduler() const = 0;
};
template <::beman::execution26::scheduler Scheduler>
struct concrete : base {
Expand All @@ -95,17 +166,23 @@ struct any_scheduler {

template <::beman::execution26::scheduler S>
concrete(S&& s) : sender(::beman::execution26::schedule(std::forward<S>(s))) {}
base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); }
base* clone(void* buffer) const override { return new (buffer) concrete(*this); }
inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); }
base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); }
base* clone(void* buffer) const override { return new (buffer) concrete(*this); }
inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); }
any_scheduler get_completion_scheduler() const override {
return any_scheduler(::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>(
::beman::execution26::get_env(this->sender)));
}
};
poly<base, 4 * sizeof(void*)> inner_sender;

public:
using sender_concept = ::beman::execution26::sender_t;
using completion_signatures =
::beman::execution26::completion_signatures<::beman::execution26::set_value_t(),
::beman::execution26::set_error_t(std::error_code),
::beman::execution26::set_error_t(std::exception_ptr),
::beman::execution26::set_stopped_t()>;
poly<base, 4 * sizeof(void*)> inner_sender;

template <::beman::execution26::scheduler S>
explicit sender(S&& s) : inner_sender(static_cast<concrete<S>*>(nullptr), std::forward<S>(s)) {}
Expand All @@ -117,7 +194,7 @@ struct any_scheduler {
return state<R>(std::forward<R>(r), this->inner_sender);
}

env get_env() const noexcept { return {}; }
env get_env() const noexcept { return env(this); }
};

// scheduler implementation
Expand All @@ -144,30 +221,18 @@ struct any_scheduler {

poly<base, 4 * sizeof(void*)> scheduler;

public:
using scheduler_concept = ::beman::execution26::scheduler_t;

template <typename S>
requires(not std::same_as<any_scheduler, std::remove_cvref_t<S>>)
explicit any_scheduler(S&& s) : scheduler(static_cast<concrete<std::decay_t<S>>*>(nullptr), std::forward<S>(s)) {}
any_scheduler(any_scheduler&& other) = default;
any_scheduler(const any_scheduler& other) = default;
sender schedule() { return this->scheduler->schedule(); }

bool operator==(const any_scheduler&) const = default;
sender schedule() { return this->scheduler->schedule(); }
bool operator==(const any_scheduler&) const = default;
};
static_assert(::beman::execution26::scheduler<any_scheduler>);

template <typename>
struct scheduler_of {
using type = ::beman::lazy::detail::any_scheduler;
};
template <typename Context>
requires requires { typename Context::scheduler_type; }
struct scheduler_of<Context> {
using type = typename Context::scheduler_type;
};
template <typename Context>
using scheduler_of_t = typename scheduler_of<Context>::type;
} // namespace beman::lazy::detail

// ----------------------------------------------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions include/beman/lazy/detail/inline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@
// ----------------------------------------------------------------------------

namespace beman::lazy::detail {
/*!
* \brief Scheduler completing immmediately when started on the same thread
* \headerfile beman/lazy/lazy.hpp <beman/lazy/lazy.hpp>
*
* The class `inline_scheduler` is used to prevent any actual schedulering.
* It does have a scheduler interface but it completes synchronously on
* the thread on which it gets `start`ed before returning from `start`.
* The implication is that any blocking working gets executed on the
* calling thread. Also, if there is lot of synchronous work repeatedly
* getting scheduled using `inline_scheduler` it is possible to get a
* stack overflow.
*
* In general, any use of `inline_scheduler` should receive a lot of
* attention as it is fairly easy to create subtle bugs using this scheduler.
*/
struct inline_scheduler {
struct env {
inline_scheduler
Expand Down
1 change: 1 addition & 0 deletions include/beman/lazy/detail/lazy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <beman/execution26/execution.hpp>
#include <beman/lazy/detail/allocator.hpp>
#include <beman/lazy/detail/any_scheduler.hpp>
#include <beman/lazy/detail/scheduler_of.hpp>
#include <beman/lazy/detail/stop_source.hpp>
#include <beman/lazy/detail/inline_scheduler.hpp>
#include <concepts>
Expand Down
3 changes: 2 additions & 1 deletion include/beman/lazy/detail/poly.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class alignas(sizeof(double)) poly {
{
return other.pointer()->equals(this->pointer());
}
Base* operator->() { return this->pointer(); }
Base* operator->() { return this->pointer(); }
const Base* operator->() const { return this->pointer(); }
};
} // namespace beman::lazy::detail

Expand Down
25 changes: 25 additions & 0 deletions include/beman/lazy/detail/scheduler_of.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// include/beman/lazy/detail/scheduler_of.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF
#define INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF

// ----------------------------------------------------------------------------

namespace beman::lazy::detail {
template <typename>
struct scheduler_of {
using type = ::beman::lazy::detail::any_scheduler;
};
template <typename Context>
requires requires { typename Context::scheduler_type; }
struct scheduler_of<Context> {
using type = typename Context::scheduler_type;
};
template <typename Context>
using scheduler_of_t = typename scheduler_of<Context>::type;
} // namespace beman::lazy::detail

// ----------------------------------------------------------------------------

#endif
2 changes: 1 addition & 1 deletion tests/beman/lazy/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

list(APPEND lazy_tests poly lazy)
list(APPEND lazy_tests any_scheduler inline_scheduler lazy poly)

foreach(test ${lazy_tests})
add_executable(beman.lazy.tests.${test})
Expand Down
Loading
Loading