Skip to content

Commit

Permalink
Flesh out comments
Browse files Browse the repository at this point in the history
  • Loading branch information
harrishancock committed Jan 28, 2025
1 parent 51454d7 commit 9c43b07
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 28 deletions.
46 changes: 38 additions & 8 deletions src/rust/async/await.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

namespace workerd::rust::async {

// =======================================================================================
// ArcWakerAwaiter

// ArcWakerAwaiter is an awaiter intended to await Promises associated with the ArcWaker produced
// when a KjWaker is cloned.
//
// TODO(perf): This is only an Event because we need to handle the case where all the Wakers are
// dropped and we receive a WakeInstruction::IGNORE. If we could somehow disarm the
// CrossThreadPromiseFulfillers inside ArcWaker when it's dropped, we could avoid requiring this
Expand All @@ -26,8 +32,9 @@ class ArcWakerAwaiter final: public kj::_::Event {
void tracePromise(kj::_::TraceBuilder& builder, bool stopAtNextEvent);

private:
// We need to keep a reference to our CoAwaitWaker so that we can arm its Event when our
// wrapped OwnPromiseNode becomes ready.
// We need to keep a reference to our CoAwaitWaker so that we can decide whether our
// `traceEvent()` implementation should forward to its Future poll() Event. Additionally, we need
// to be able to arm the Future poll() Event when our wrapped OwnPromiseNode becomes ready.
//
// Safety: It is safe to store a bare reference to our CoAwaitWaker, because this object
// (ArcWakerAwaiter) lives inside of CoAwaitWaker, and thus our lifetime is encompassed by
Expand Down Expand Up @@ -151,7 +158,15 @@ void guarded_rust_promise_awaiter_drop_in_place(PtrGuardedRustPromiseAwaiter);
// =======================================================================================
// CoAwaitWaker

// A CxxWaker implementation which provides an optimized path for awaiting KJ Promises in Rust.
// A CxxWaker implementation which provides an optimized path for awaiting KJ Promises in Rust. It
// consists of a KjWaker, an Event reference, and a set of "sub-Promise awaiters".
//
// The Event in question is responsible for calling `Future::poll()`, elsewhere I call it "the
// Future poll() Event". It owns this CoAwaitWaker in an object lifetime sense.
//
// The sub-Promise awaiters comprise an optional ArcWakerAwaiter and a list of zero or more
// RustPromiseAwaiters. These sub-Promise awaiters all wrap a KJ Promise of some sort, and arrange
// to arm the Future poll() Event when their Promises become ready.
//
// The PromiseNode base class is a hack to implement async tracing. That is, we only implement the
// `tracePromise()` function, and decide which Promise to trace into if/when the coroutine calls our
Expand Down Expand Up @@ -222,6 +237,13 @@ class CoAwaitWaker: public CxxWaker,
kj::Maybe<ArcWakerAwaiter> arcWakerAwaiter;
};

// =======================================================================================
// BoxFutureAwaiter, LazyBoxFutureAwaiter, and operator co_await implementations

// BoxFutureAwaiter<T> is a Future poll() Event, and is the inner implementation of our co_await
// syntax. It wraps a BoxFuture<T> and captures a reference to its enclosing KJ coroutine, arranging
// to continuously call `BoxFuture<T>::poll()` on the KJ event loop until the Future produces a
// result, after which it arms the enclosing KJ coroutine's Event.
template <typename T>
class BoxFutureAwaiter final: public kj::_::Event {
public:
Expand Down Expand Up @@ -272,15 +294,14 @@ class BoxFutureAwaiter final: public kj::_::Event {
static_cast<Event&>(coroutine).traceEvent(builder);
}

protected:
private:
kj::Maybe<kj::Own<kj::_::Event>> fire() override {
if (!awaitSuspendImpl()) {
coroutine.armDepthFirst();
}
return kj::none;
}

private:
kj::_::CoroutineBase& coroutine;
CoAwaitWaker coAwaitWaker;
// HACK: CoAwaitWaker implements the PromiseNode interface to integrate with the Coroutine class'
Expand All @@ -290,27 +311,36 @@ class BoxFutureAwaiter final: public kj::_::Event {
BoxFuture<T> future;
};

// LazyBoxFutureAwaiter<T> is the outer implementation of our co_await syntax, providing the
// await_ready(), await_suspend(), await_resume() facade expected by the compiler.
//
// LazyBoxFutureAwaiter is a type with two stages. At first, it merely wraps a BoxFuture<T>. Once
// its await_suspend() function is called, it transitions to wrap a BoxFutureAwaiter<T>, our inner
// awaiter implementation. We do this because we don't get a reference to our enclosing
// coroutine until await_suspend() is called, and our awaiter implementation is greatly simplified
// if we can avoid using a Maybe. So, we defer the real awaiter instantiation to await_suspend().
template <typename T>
class LazyBoxFutureAwaiter {
public:
LazyBoxFutureAwaiter(BoxFuture<T>&& future): impl(kj::mv(future)) {}

// Always return false, so our await_suspend() is guaranteed to be called.
bool await_ready() const { return false; }

// Initialize our wrapped Awaiter and forward to `BoxFutureAwaiter<T>::awaitSuspendImpl()`.
template <typename U> requires (kj::canConvert<U&, kj::_::CoroutineBase&>())
bool await_suspend(kj::_::stdcoro::coroutine_handle<U> handle) {
auto future = kj::mv(KJ_ASSERT_NONNULL(impl.template tryGet<BoxFuture<T>>()));
return impl.template init<BoxFutureAwaiter<T>>(handle.promise(), kj::mv(future))
.awaitSuspendImpl();
}

// TODO(now): Return non-void T.
// Forward to our wrapped `BoxFutureAwaiter<T>::awaitResumeImpl()`.
void await_resume() {
KJ_ASSERT_NONNULL(impl.template tryGet<BoxFutureAwaiter<T>>()).awaitResumeImpl();
return KJ_ASSERT_NONNULL(impl.template tryGet<BoxFutureAwaiter<T>>()).awaitResumeImpl();
}

private:
// TODO(now): Comment.
kj::OneOf<BoxFuture<T>, BoxFutureAwaiter<T>> impl;
};

Expand Down
4 changes: 1 addition & 3 deletions src/rust/async/future.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ bool box_future_poll_with_co_await_waker(BoxFuture<T>& self, const CoAwaitWaker&

// A `Pin<Box<dyn Future<Output = ()>>>` owned by C++.
//
// The only way to construct a BoxFutureVoid is by returning one from a Rust function.
//
// TODO(now): Figure out how to make this a template, BoxFuture<T>.
// The only way to construct a BoxFuture<T> is by returning one from a Rust function.
template <typename T>
class BoxFuture {
public:
Expand Down
9 changes: 9 additions & 0 deletions src/rust/async/promise.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@

namespace workerd::rust::async {

// If these static assertions ever fire, we must update the `pub struct OwnPromiseNode` definition
// in promise.rs to match the new C++ size/layout.
//
// TODO(cleanup): Integrate bindgen into build system to obviate this.
static_assert(sizeof(OwnPromiseNode) == sizeof(uint64_t) * 1,
"OwnPromiseNode size changed");
static_assert(alignof(OwnPromiseNode) == alignof(uint64_t) * 1,
"OwnPromiseNode alignment changed");

void own_promise_node_drop_in_place(OwnPromiseNode* node) {
node->~OwnPromiseNode();
}
Expand Down
43 changes: 26 additions & 17 deletions src/rust/async/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,15 @@ use crate::ffi::own_promise_node_drop_in_place;
#[allow(dead_code)]
pub struct OwnPromiseNode(*const ());

#[repr(transparent)]
pub struct PtrOwnPromiseNode(*mut OwnPromiseNode);

// TODO(now): Safety comment, observe that it can be sent across threads, but will panic if polled
// without a KJ EventLoop active on the current thread.
// Safety: KJ Promises are not associated with threads, but with event loops at construction time.
// Therefore, they can be polled from any thread, as long as that thread has the correct event loop
// active at the time of the call to `poll()`. If the correct event loop is not active, the
// OwnPromiseNode's API will typically panic, undefined behavior could be possible. However, Rust
// doesn't have direct access to OwnPromiseNode's API. Instead, it can only use the Promise by
// having GuardedRustPromiseAwaiter consume it, and GuardedRustPromiseAwaiter implements the
// correct-executor guarantee.
unsafe impl Send for OwnPromiseNode {}

// TODO(now): bindgen to guarantee safety
unsafe impl ExternType for OwnPromiseNode {
type Id = cxx::type_id!("workerd::rust::async::OwnPromiseNode");
type Kind = cxx::kind::Trivial;
}

// TODO(now): bindgen to guarantee safety
unsafe impl ExternType for PtrOwnPromiseNode {
type Id = cxx::type_id!("workerd::rust::async::PtrOwnPromiseNode");
type Kind = cxx::kind::Trivial;
}

impl Drop for OwnPromiseNode {
fn drop(&mut self) {
// Safety:
Expand All @@ -38,3 +28,22 @@ impl Drop for OwnPromiseNode {
}
}
}

// Safety: We have a static_assert in promise.c++ which breaks if you change the size or alignment
// of the C++ definition of OwnPromiseNode, with a comment directing the reader to adjust the
// OwnPromiseNode definition in this .rs file.
//
// https://docs.rs/cxx/latest/cxx/trait.ExternType.html#integrating-with-bindgen-generated-types
unsafe impl ExternType for OwnPromiseNode {
type Id = cxx::type_id!("workerd::rust::async::OwnPromiseNode");
type Kind = cxx::kind::Trivial;
}

#[repr(transparent)]
pub struct PtrOwnPromiseNode(*mut OwnPromiseNode);

// Safety: Raw pointers are the same size in both languages.
unsafe impl ExternType for PtrOwnPromiseNode {
type Id = cxx::type_id!("workerd::rust::async::PtrOwnPromiseNode");
type Kind = cxx::kind::Trivial;
}

0 comments on commit 9c43b07

Please sign in to comment.