Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Feb 19, 2025
1 parent 6fea7b4 commit 75cda6e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/core/ext/transport/chaotic_good/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ void ChaoticGoodServerTransport::StreamDispatch::DispatchFrame(
if (stream == nullptr) return;
stream->spawn_serializer->Spawn(
[this, stream, frame = std::move(frame)]() mutable {
DCHECK_NE(stream.get(), nullptr);
auto& call = stream->call;
return call.CancelIfFails(call.UntilCallCompletes(TrySeq(
frame.Payload(),
Expand Down Expand Up @@ -228,7 +229,7 @@ auto ChaoticGoodServerTransport::StreamDispatch::ProcessNextFrame(
<< "Cancel stream " << incoming_frame.header().stream_id
<< (stream != nullptr ? " (active)" : " (not found)");
if (stream == nullptr) return;
auto c = std::move(stream->call);
auto& c = stream->call;
c.SpawnInfallible("cancel", [c]() mutable { c.Cancel(); });
}),
Default([&]() {
Expand Down
53 changes: 45 additions & 8 deletions src/core/lib/transport/call_spine.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,92 +321,129 @@ class CallInitiator {

CallInitiator() = default;
explicit CallInitiator(RefCountedPtr<CallSpine> spine)
: spine_(std::move(spine)) {}
: spine_(std::move(spine)) {
DCHECK_NE(spine_.get(), nullptr);
}

// Wrap a promise so that if it returns failure it automatically cancels
// the rest of the call.
// The resulting (returned) promise will resolve to Empty.
template <typename Promise>
auto CancelIfFails(Promise promise) {
DCHECK_NE(spine_.get(), nullptr);
return spine_->CancelIfFails(std::move(promise));
}

auto PullServerInitialMetadata() {
DCHECK_NE(spine_.get(), nullptr);
return spine_->PullServerInitialMetadata();
}

auto PushMessage(MessageHandle message) {
DCHECK_NE(spine_.get(), nullptr);
return spine_->PushClientToServerMessage(std::move(message));
}

void SpawnPushMessage(MessageHandle message) {
DCHECK_NE(spine_.get(), nullptr);
spine_->SpawnPushClientToServerMessage(std::move(message));
}

void FinishSends() { spine_->FinishSends(); }
void FinishSends() {
DCHECK_NE(spine_.get(), nullptr);
spine_->FinishSends();
}

void SpawnFinishSends() { spine_->SpawnFinishSends(); }
void SpawnFinishSends() {
DCHECK_NE(spine_.get(), nullptr);
spine_->SpawnFinishSends();
}

auto PullMessage() { return spine_->PullServerToClientMessage(); }
auto PullMessage() {
DCHECK_NE(spine_.get(), nullptr);
return spine_->PullServerToClientMessage();
}

auto PullServerTrailingMetadata() {
DCHECK_NE(spine_.get(), nullptr);
return spine_->PullServerTrailingMetadata();
}

void Cancel(absl::Status error) {
DCHECK_NE(spine_.get(), nullptr);
CHECK(!error.ok());
auto status = ServerMetadataFromStatus(error);
status->Set(GrpcCallWasCancelled(), true);
spine_->PushServerTrailingMetadata(std::move(status));
}

void SpawnCancel(absl::Status error) {
DCHECK_NE(spine_.get(), nullptr);
CHECK(!error.ok());
auto status = ServerMetadataFromStatus(error);
status->Set(GrpcCallWasCancelled(), true);
spine_->SpawnPushServerTrailingMetadata(std::move(status));
}

void Cancel() { spine_->Cancel(); }
void Cancel() {
DCHECK_NE(spine_.get(), nullptr);
spine_->Cancel();
}

void SpawnCancel() { spine_->SpawnCancel(); }
void SpawnCancel() {
DCHECK_NE(spine_.get(), nullptr);
spine_->SpawnCancel();
}

GRPC_MUST_USE_RESULT bool OnDone(absl::AnyInvocable<void(bool)> fn) {
DCHECK_NE(spine_.get(), nullptr);
return spine_->OnDone(std::move(fn));
}

template <typename Promise>
auto UntilCallCompletes(Promise promise) {
DCHECK_NE(spine_.get(), nullptr);
return spine_->UntilCallCompletes(std::move(promise));
}

template <typename PromiseFactory>
void SpawnGuarded(absl::string_view name, PromiseFactory promise_factory) {
DCHECK_NE(spine_.get(), nullptr);
spine_->SpawnGuarded(name, std::move(promise_factory));
}

template <typename PromiseFactory>
void SpawnGuardedUntilCallCompletes(absl::string_view name,
PromiseFactory promise_factory) {
DCHECK_NE(spine_.get(), nullptr);
spine_->SpawnGuardedUntilCallCompletes(name, std::move(promise_factory));
}

template <typename PromiseFactory>
void SpawnInfallible(absl::string_view name, PromiseFactory promise_factory) {
DCHECK_NE(spine_.get(), nullptr);
spine_->SpawnInfallible(name, std::move(promise_factory));
}

template <typename PromiseFactory>
auto SpawnWaitable(absl::string_view name, PromiseFactory promise_factory) {
DCHECK_NE(spine_.get(), nullptr);
return spine_->SpawnWaitable(name, std::move(promise_factory));
}

bool WasCancelledPushed() const {
DCHECK_NE(spine_.get(), nullptr);
return spine_->call_filters().WasCancelledPushed();
}

Arena* arena() { return spine_->arena(); }
Party* party() { return spine_.get(); }
Arena* arena() {
DCHECK_NE(spine_.get(), nullptr);
return spine_->arena();
}
Party* party() {
DCHECK_NE(spine_.get(), nullptr);
return spine_.get();
}

private:
friend class CallHandler;
Expand Down

0 comments on commit 75cda6e

Please sign in to comment.