Skip to content

Commit

Permalink
feat: add debug spans for decoding requests
Browse files Browse the repository at this point in the history
Closes: #1759
  • Loading branch information
neoeinstein committed Jun 28, 2024
1 parent 74d7673 commit c1963b5
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 16 deletions.
6 changes: 3 additions & 3 deletions tonic/src/codec/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl CompressionEncoding {
}
}

pub(crate) fn as_str(self) -> &'static str {
pub(crate) const fn as_str(self) -> &'static str {
match self {
#[cfg(feature = "gzip")]
CompressionEncoding::Gzip => "gzip",
Expand All @@ -169,11 +169,11 @@ impl CompressionEncoding {
}

#[cfg(any(feature = "gzip", feature = "zstd"))]
pub(crate) fn into_header_value(self) -> http::HeaderValue {
pub(crate) const fn into_header_value(self) -> http::HeaderValue {
http::HeaderValue::from_static(self.as_str())
}

pub(crate) fn encodings() -> &'static [Self] {
pub(crate) const fn encodings() -> &'static [Self] {
&[
#[cfg(feature = "gzip")]
CompressionEncoding::Gzip,
Expand Down
79 changes: 66 additions & 13 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,43 @@ impl<T> Unpin for Streaming<T> {}

#[derive(Debug, Clone)]
enum State {
ReadHeader,
ReadHeader {
span: Option<tracing::Span>,
},
ReadBody {
span: tracing::Span,
compression: Option<CompressionEncoding>,
len: usize,
},
Error(Status),
Error(Box<Status>),
}

impl State {
fn read_header() -> Self {
Self::ReadHeader { span: None }
}

fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
let span = tracing::debug_span!(
"read_body",
body.compression = compression.map(|c| c.as_str()).unwrap_or("none"),
body.bytes.compressed = compression.is_some().then_some(len),
body.bytes.uncompressed = compression.is_none().then_some(len),
);
Self::ReadBody {
span,
compression,
len,
}
}

fn span(&self) -> Option<&tracing::Span> {
match self {
Self::ReadHeader { span } => span.as_ref(),
Self::ReadBody { span, .. } => Some(span),
Self::Error(_) => None,
}
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -125,7 +156,7 @@ impl<T> Streaming<T> {
.map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
state: State::read_header(),
direction,
buf: BytesMut::with_capacity(buffer_size),
trailers: None,
Expand All @@ -142,7 +173,19 @@ impl StreamingInner {
&mut self,
buffer_settings: BufferSettings,
) -> Result<Option<DecodeBuf<'_>>, Status> {
if let State::ReadHeader = self.state {
if let State::ReadHeader { span } = &mut self.state {
if !self.buf.has_remaining() {
return Ok(None);
}

let span = span.get_or_insert_with(|| {
tracing::debug_span!(
"read_header",
body.compression = "none",
body.bytes = tracing::field::Empty,
)
});
let _guard = span.enter();
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
}
Expand All @@ -151,7 +194,8 @@ impl StreamingInner {
0 => None,
1 => {
{
if self.encoding.is_some() {
if let Some(ce) = self.encoding {
span.record("body.compression", ce.as_str());
self.encoding
} else {
// https://grpc.github.io/grpc/core/md_doc_compression.html
Expand All @@ -177,6 +221,7 @@ impl StreamingInner {
};

let len = self.buf.get_u32() as usize;
span.record("body.bytes", len);
let limit = self
.max_message_size
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
Expand All @@ -191,14 +236,19 @@ impl StreamingInner {
}

self.buf.reserve(len);
drop(_guard);

self.state = State::ReadBody {
compression: compression_encoding,
len,
}
self.state = State::read_body(compression_encoding, len)
}

if let State::ReadBody { len, compression } = self.state {
if let State::ReadBody {
len,
span,
compression,
} = &self.state
{
let (len, compression) = (*len, *compression);
let _guard = span.enter();
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < len || self.buf.len() < len {
Expand Down Expand Up @@ -228,6 +278,7 @@ impl StreamingInner {
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
span.record("body.bytes.uncompressed", decompressed_len);
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
} else {
DecodeBuf::new(&mut self.buf, len)
Expand All @@ -241,14 +292,16 @@ impl StreamingInner {

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let _guard = self.state.span().map(|s| s.enter());
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(status)) => {
if self.direction == Direction::Request && status.code() == Code::Cancelled {
return Poll::Ready(Ok(None));
}

let _ = std::mem::replace(&mut self.state, State::Error(status.clone()));
drop(_guard);
let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone())));
debug!("decoder inner stream error: {:?}", status);
return Poll::Ready(Err(status));
}
Expand Down Expand Up @@ -378,7 +431,7 @@ impl<T> Streaming<T> {
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
Some(msg) => {
self.inner.state = State::ReadHeader;
self.inner.state = State::read_header();
Ok(Some(msg))
}
None => Ok(None),
Expand All @@ -394,7 +447,7 @@ impl<T> Stream for Streaming<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let State::Error(status) = &self.inner.state {
return Poll::Ready(Some(Err(status.clone())));
return Poll::Ready(Some(Err(*status.clone())));
}

if let Some(item) = self.decode_chunk()? {
Expand Down

0 comments on commit c1963b5

Please sign in to comment.