Skip to content

Commit

Permalink
Added tests and documentation for any_scheduler and inline_scheduler
Browse files Browse the repository at this point in the history
Add scheduler tests
  • Loading branch information
dietmarkuehl authored Jan 18, 2025
2 parents fdf87c8 + 1ffb49e commit bf76562
Show file tree
Hide file tree
Showing 11 changed files with 514 additions and 52 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ jobs:
cpp_version: [20, 23, 26]
cmake_args:
- description: "Default"
- description: "TSan"
args: "-DBEMAN_BUILDSYS_SANITIZER=TSan"
# Linux has a TSan bug: FATAL: ThreadSanitizer: unexpected memory mapping
# - description: "TSan"
# args: "-DBEMAN_BUILDSYS_SANITIZER=TSan"
- description: "MaxSan"
args: "-DBEMAN_BUILDSYS_SANITIZER=MaxSan"
include:
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ FetchContent_Declare(
execution26
# for local development, use SOURCE_DIR <path-to>/execution26
GIT_REPOSITORY https://github.com/bemanproject/execution26
GIT_TAG b52f28c
GIT_TAG a7ee8c8
)
FetchContent_MakeAvailable(execution26)

Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

BUILDDIR = build

default: test
default: compile

compile: config
cmake --build $(BUILDDIR) -j
#cmake --build $(BUILDDIR) -j
cmake --workflow --preset=appleclang-debug

format:
git clang-format main

config:
cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR)
#CXXFLAGS=-fsanitize=thread cmake -DCMAKE_BUILD_TYPE=Debug -B $(BUILDDIR)

test: compile
cd $(BUILDDIR); ctest
Expand Down
153 changes: 109 additions & 44 deletions include/beman/lazy/detail/any_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,63 @@
#include <beman/execution26/execution.hpp>
#include <beman/lazy/detail/poly.hpp>
#include <new>
#include <optional>
#include <utility>

// ----------------------------------------------------------------------------

namespace beman::lazy::detail {

struct any_scheduler {
// TODO: add support for forwarding stop_tokens to the type-erased sender
// TODO: other errors than std::exception_ptr should be supported
/*!
* \brief Type-erasing scheduler
* \headerfile beman/lazy/lazy.hpp <beman/lazy/lazy.hpp>
*
* The class `any_scheduler` is used to type-erase any scheduler class.
* Any error produced by the underlying scheduler except `std::error_code` is turned into
* an `std::exception_ptr`. `std::error_code` is forwarded as is. The `any_scheduler`
* forwards stop requests reported by the stop token obtained from the `connect`ed
* receiver to the sender used by the underlying scheduler.
*
* Completion signatures:
*
* - `ex::set_value_t()`
* - `ex::set_error_t(std::error_code)`
* - `ex::set_error_t(std::exception_ptr)`
* - `ex::set_stopped()`
*
* Usage:
*
* any_scheduler sched(other_scheduler);
* auto sender{ex::schedule(sched) | some_sender};
*/
class any_scheduler {
struct state_base {
virtual ~state_base() = default;
virtual void complete_value() = 0;
virtual void complete_error(std::exception_ptr) = 0;
virtual void complete_stopped() = 0;
virtual ~state_base() = default;
virtual void complete_value() = 0;
virtual void complete_error(::std::error_code) = 0;
virtual void complete_error(::std::exception_ptr) = 0;
virtual void complete_stopped() = 0;
virtual ::beman::execution26::inplace_stop_token get_stop_token() = 0;
};

struct inner_state {
struct receiver;
struct env {
state_base* state;
auto query(::beman::execution26::get_stop_token_t) const noexcept { return this->state->get_stop_token(); }
};
struct receiver {
using receiver_concept = ::beman::execution26::receiver_t;
state_base* state;
void set_value() && noexcept { this->state->complete_value(); }
void set_error(std::error_code err) && noexcept { this->state->complete_error(err); }
void set_error(std::exception_ptr ptr) && noexcept { this->state->complete_error(std::move(ptr)); }
template <typename E>
void set_error(E e) {
try {
throw std::move(e);
} catch (...) {
this->state->complete_error(std::current_exception());
}
void set_error(E e) && noexcept {
this->state->complete_error(std::make_exception_ptr(std::move(e)));
}
void set_stopped() && noexcept { this->state->complete_stopped(); }
env get_env() const noexcept { return {this->state}; }
};
static_assert(::beman::execution26::receiver<receiver>);

Expand All @@ -53,7 +79,7 @@ struct any_scheduler {
concrete(S&& s, state_base* b) : state(::beman::execution26::connect(std::forward<S>(s), receiver{b})) {}
void start() override { ::beman::execution26::start(state); }
};
::beman::lazy::detail::poly<base, 8u * sizeof(void*)> state;
::beman::lazy::detail::poly<base, 16u * sizeof(void*)> state;
template <::beman::execution26::sender S>
inner_state(S&& s, state_base* b) : state(static_cast<concrete<S>*>(nullptr), std::forward<S>(s), b) {}
void start() { this->state->start(); }
Expand All @@ -62,31 +88,76 @@ struct any_scheduler {
template <::beman::execution26::receiver Receiver>
struct state : state_base {
using operation_state_concept = ::beman::execution26::operation_state_t;
struct stopper {
state* st;
void operator()() noexcept {
state* self = this->st;
self->callback.reset();
self->source.request_stop();
}
};
using token_t =
decltype(::beman::execution26::get_stop_token(::beman::execution26::get_env(std::declval<Receiver>())));
using callback_t = ::beman::execution26::stop_callback_for_t<token_t, stopper>;

std::remove_cvref_t<Receiver> receiver;
inner_state s;
::beman::execution26::inplace_stop_source source;
::std::optional<callback_t> callback;

std::remove_cvref_t<Receiver> receiver;
inner_state s;
template <::beman::execution26::receiver R, typename PS>
state(R&& r, PS& ps) : receiver(std::forward<R>(r)), s(ps->connect(this)) {}
void start() & noexcept { this->s.start(); }
void complete_value() override { ::beman::execution26::set_value(std::move(this->receiver)); }
void complete_error(std::error_code err) override {
::beman::execution26::set_error(std::move(receiver), err);
}
void complete_error(std::exception_ptr ptr) override {
::beman::execution26::set_error(std::move(receiver), std::move(ptr));
}
void complete_stopped() override { ::beman::execution26::set_stopped(std::move(this->receiver)); }
::beman::execution26::inplace_stop_token get_stop_token() override {
if constexpr (::std::same_as<token_t, ::beman::execution26::inplace_stop_token>) {
return ::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver));
} else {
if constexpr (not ::std::same_as<token_t, ::beman::execution26::never_stop_token>) {
if (not this->callback) {
this->callback.emplace(
::beman::execution26::get_stop_token(::beman::execution26::get_env(this->receiver)),
stopper{this});
}
}
return this->source.get_token();
}
}
};

struct env {
any_scheduler query(
const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&) const noexcept;
class sender;
class env {
friend class sender;

private:
const sender* sndr;
env(const sender* s) : sndr(s) {}

public:
any_scheduler query(const ::beman::execution26::get_completion_scheduler_t<::beman::execution26::set_value_t>&)
const noexcept {
return this->sndr->inner_sender->get_completion_scheduler();
}
};

// sender implementation
struct sender {
class sender {
friend class env;

private:
struct base {
virtual ~base() = default;
virtual base* move(void*) = 0;
virtual base* clone(void*) const = 0;
virtual inner_state connect(state_base*) = 0;
virtual ~base() = default;
virtual base* move(void*) = 0;
virtual base* clone(void*) const = 0;
virtual inner_state connect(state_base*) = 0;
virtual any_scheduler get_completion_scheduler() const = 0;
};
template <::beman::execution26::scheduler Scheduler>
struct concrete : base {
Expand All @@ -95,17 +166,23 @@ struct any_scheduler {

template <::beman::execution26::scheduler S>
concrete(S&& s) : sender(::beman::execution26::schedule(std::forward<S>(s))) {}
base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); }
base* clone(void* buffer) const override { return new (buffer) concrete(*this); }
inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); }
base* move(void* buffer) override { return new (buffer) concrete(std::move(*this)); }
base* clone(void* buffer) const override { return new (buffer) concrete(*this); }
inner_state connect(state_base* b) override { return inner_state(::std::move(sender), b); }
any_scheduler get_completion_scheduler() const override {
return any_scheduler(::beman::execution26::get_completion_scheduler<::beman::execution26::set_value_t>(
::beman::execution26::get_env(this->sender)));
}
};
poly<base, 4 * sizeof(void*)> inner_sender;

public:
using sender_concept = ::beman::execution26::sender_t;
using completion_signatures =
::beman::execution26::completion_signatures<::beman::execution26::set_value_t(),
::beman::execution26::set_error_t(std::error_code),
::beman::execution26::set_error_t(std::exception_ptr),
::beman::execution26::set_stopped_t()>;
poly<base, 4 * sizeof(void*)> inner_sender;

template <::beman::execution26::scheduler S>
explicit sender(S&& s) : inner_sender(static_cast<concrete<S>*>(nullptr), std::forward<S>(s)) {}
Expand All @@ -117,7 +194,7 @@ struct any_scheduler {
return state<R>(std::forward<R>(r), this->inner_sender);
}

env get_env() const noexcept { return {}; }
env get_env() const noexcept { return env(this); }
};

// scheduler implementation
Expand All @@ -144,30 +221,18 @@ struct any_scheduler {

poly<base, 4 * sizeof(void*)> scheduler;

public:
using scheduler_concept = ::beman::execution26::scheduler_t;

template <typename S>
requires(not std::same_as<any_scheduler, std::remove_cvref_t<S>>)
explicit any_scheduler(S&& s) : scheduler(static_cast<concrete<std::decay_t<S>>*>(nullptr), std::forward<S>(s)) {}
any_scheduler(any_scheduler&& other) = default;
any_scheduler(const any_scheduler& other) = default;
sender schedule() { return this->scheduler->schedule(); }

bool operator==(const any_scheduler&) const = default;
sender schedule() { return this->scheduler->schedule(); }
bool operator==(const any_scheduler&) const = default;
};
static_assert(::beman::execution26::scheduler<any_scheduler>);

template <typename>
struct scheduler_of {
using type = ::beman::lazy::detail::any_scheduler;
};
template <typename Context>
requires requires { typename Context::scheduler_type; }
struct scheduler_of<Context> {
using type = typename Context::scheduler_type;
};
template <typename Context>
using scheduler_of_t = typename scheduler_of<Context>::type;
} // namespace beman::lazy::detail

// ----------------------------------------------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions include/beman/lazy/detail/inline_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@
// ----------------------------------------------------------------------------

namespace beman::lazy::detail {
/*!
* \brief Scheduler completing immmediately when started on the same thread
* \headerfile beman/lazy/lazy.hpp <beman/lazy/lazy.hpp>
*
* The class `inline_scheduler` is used to prevent any actual schedulering.
* It does have a scheduler interface but it completes synchronously on
* the thread on which it gets `start`ed before returning from `start`.
* The implication is that any blocking working gets executed on the
* calling thread. Also, if there is lot of synchronous work repeatedly
* getting scheduled using `inline_scheduler` it is possible to get a
* stack overflow.
*
* In general, any use of `inline_scheduler` should receive a lot of
* attention as it is fairly easy to create subtle bugs using this scheduler.
*/
struct inline_scheduler {
struct env {
inline_scheduler
Expand Down
1 change: 1 addition & 0 deletions include/beman/lazy/detail/lazy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <beman/execution26/execution.hpp>
#include <beman/lazy/detail/allocator.hpp>
#include <beman/lazy/detail/any_scheduler.hpp>
#include <beman/lazy/detail/scheduler_of.hpp>
#include <beman/lazy/detail/stop_source.hpp>
#include <beman/lazy/detail/inline_scheduler.hpp>
#include <concepts>
Expand Down
3 changes: 2 additions & 1 deletion include/beman/lazy/detail/poly.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class alignas(sizeof(double)) poly {
{
return other.pointer()->equals(this->pointer());
}
Base* operator->() { return this->pointer(); }
Base* operator->() { return this->pointer(); }
const Base* operator->() const { return this->pointer(); }
};
} // namespace beman::lazy::detail

Expand Down
25 changes: 25 additions & 0 deletions include/beman/lazy/detail/scheduler_of.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// include/beman/lazy/detail/scheduler_of.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF
#define INCLUDED_INCLUDE_BEMAN_LAZY_DETAIL_SCHEDULER_OF

// ----------------------------------------------------------------------------

namespace beman::lazy::detail {
template <typename>
struct scheduler_of {
using type = ::beman::lazy::detail::any_scheduler;
};
template <typename Context>
requires requires { typename Context::scheduler_type; }
struct scheduler_of<Context> {
using type = typename Context::scheduler_type;
};
template <typename Context>
using scheduler_of_t = typename scheduler_of<Context>::type;
} // namespace beman::lazy::detail

// ----------------------------------------------------------------------------

#endif
2 changes: 1 addition & 1 deletion tests/beman/lazy/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

list(APPEND lazy_tests poly lazy)
list(APPEND lazy_tests any_scheduler inline_scheduler lazy poly)

foreach(test ${lazy_tests})
add_executable(beman.lazy.tests.${test})
Expand Down
Loading

0 comments on commit bf76562

Please sign in to comment.