Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Feb 18, 2025
1 parent 3a1144e commit f041fac
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions src/core/ext/transport/chaotic_good/server_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,18 @@ auto ChaoticGoodServerTransport::StreamDispatch::ProcessNextFrame(
return Switch(
incoming_frame.header().type,
Case<FrameType::kClientInitialMetadata>([&, this]() {
return DiscardResult(TrySeq(
incoming_frame.Payload(),
[this, header = incoming_frame.header()](Frame frame) mutable {
return NewStream(
header.stream_id,
std::move(absl::get<ClientInitialMetadataFrame>(frame)));
}));
return Map(
TrySeq(
incoming_frame.Payload(),
[this, header = incoming_frame.header()](Frame frame) mutable {
return NewStream(
header.stream_id,
std::move(absl::get<ClientInitialMetadataFrame>(frame)));
}),
[](absl::Status status) {
LOG(ERROR) << "Failed to process client initial metadata: "
<< status;
});
}),
Case<FrameType::kMessage>([&, this]() mutable {
DispatchFrame<MessageFrame>(std::move(incoming_frame));
Expand Down Expand Up @@ -232,7 +237,11 @@ auto ChaoticGoodServerTransport::StreamDispatch::ProcessNextFrame(

void ChaoticGoodServerTransport::StreamDispatch::OnIncomingFrame(
IncomingFrame incoming_frame) {
incoming_frame_spawner_->Spawn(ProcessNextFrame(std::move(incoming_frame)));
incoming_frame_spawner_->Spawn(
[self = RefAsSubclass<StreamDispatch>(),
incoming_frame = std::move(incoming_frame)]() mutable {
return self->ProcessNextFrame(std::move(incoming_frame));
});
}

ChaoticGoodServerTransport::ChaoticGoodServerTransport(
Expand Down Expand Up @@ -328,16 +337,17 @@ ChaoticGoodServerTransport::StreamDispatch::ExtractStream(uint32_t stream_id) {

absl::Status ChaoticGoodServerTransport::StreamDispatch::AddStream(
uint32_t stream_id, CallInitiator call_initiator) {
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " NewStream " << stream_id;
MutexLock lock(&mu_);
GRPC_TRACE_LOG(chaotic_good, INFO)
<< "CHAOTIC_GOOD " << this << " NewStream " << stream_id
<< " last_seen_new_stream_id_=" << last_seen_new_stream_id_;
auto it = stream_map_.find(stream_id);
if (it != stream_map_.end()) {
return absl::InternalError("Stream already exists");
}
if (stream_id <= last_seen_new_stream_id_) {
return absl::InternalError("Stream id is not increasing");
}
if (it != stream_map_.end()) {
return absl::InternalError("Stream already exists");
}
const bool on_done_added = call_initiator.OnDone(
[self = RefAsSubclass<StreamDispatch>(), stream_id](bool) {
GRPC_TRACE_LOG(chaotic_good, INFO)
Expand Down

0 comments on commit f041fac

Please sign in to comment.