From 4b53081a84080decd0a52216ef3582f8e0066ab4 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Thu, 6 Feb 2025 11:00:38 -0500 Subject: [PATCH] feat(http/retry): model `ReplayBody` with `Frame` (#3598) pr #3559 (dd4fbcdb) refactored our trailer peeking body middleware to model its buffering in terms of the `Frame` type used in `http-body`'s 1.0 release. this branch performs a similar change for the other piece of body middleware that supports linkerd's retry facilities: `ReplayBody`. the inner body `B` is now wrapped in the `ForwardCompatibleBody` adapter, and we now poll it in terms of frames. NB: polling the underlying body in terms of frames has a subtle knock-on effect regarding when we observe the trailers, in the liminal period between this refactor and the subsequent upgrade to hyper 1.0, whilst we must still implement the existing 0.4 interface for `Body` that includes `poll_trailers()`. see the comment above `replay_trailers` for more on this, describing why we now initialize this to `true`. relatedly, this is why we no longer delegate down to `B::poll_trailers` ourselves. it will have already been called by our adapter. `ReplayBody::is_end_stream()` now behaves identically when initially polling a body compared to subsequent replays. this is fine, as `is_end_stream()` is a hint that facilitates optimizations (https://github.com/hyperium/http-body/pull/143). we do still report the end properly, we just won't be quite as prescient on the initial playthrough. in the same manner as the existing `frame()` method mimics `http_body_util::BodyExt::frame()`, this branch introduces a new `ForwardCompatibleBody::poll_frame()` method. this allows us to poll the compatibility layer for a `Frame`. see: - https://github.com/linkerd/linkerd2/issues/8733. 
- https://github.com/linkerd/linkerd2-proxy/pull/3559 --- * nit(http/retry): install tracing subscriber in tests some tests do not set up a tracing subscriber, because they do not use the shared `Test::new()` helper function used elsewhere in this test suite. to provide a trace of the test's execution in the event of a failure, initialize a tracing subscriber in some additional unit tests. Signed-off-by: katelyn martin * feat(http/retry): `ForwardCompatibleBody` exposes hints this commit removes the `cfg(test)` gate on the method exposing `B::is_end_stream()`, and introduces another method also exposing the `size_hint()` method. we will want these in order to implement these methods for `ReplayBody`. Signed-off-by: katelyn martin * refactor(http/retry): `ForwardCompatibleBody::poll_frame()` in the same manner as the existing `frame()` method mimics `http_body_util::BodyExt::frame()`, this commit introduces a new `ForwardCompatibleBody::poll_frame()` method. this allows us to poll the compatibility layer for a `Frame`. Signed-off-by: katelyn martin * feat(http/retry): `ReplayBody` polls for frames pr #3559 (dd4fbcd) refactored our trailer peeking body middleware to model its buffering in terms of the `Frame` type used in `http-body`'s 1.0 release. this commit performs a similar change for the other piece of body middleware that supports linkerd's retry facilities: `ReplayBody`. the inner body `B` is now wrapped in the `ForwardCompatibleBody` adapter, and we now poll it in terms of frames. NB: polling the underlying body in terms of frames has a subtle knock-on effect regarding when we observe the trailers, in the liminal period between this refactor and the subsequent upgrade to hyper 1.0, whilst we must still implement the existing 0.4 interface for `Body` that includes `poll_trailers()`. see the comment above `replay_trailers` for more on this, describing why we now initialize this to `true`. 
relatedly, this is why we no longer delegate down to `B::poll_trailers` ourselves. it will have already been called by our adapter. `ReplayBody::is_end_stream()` now behaves identically when initially polling a body compared to subsequent replays. this is fine, as `is_end_stream()` is a hint that facilitates optimizations (hyperium/http-body#143). we do still report the end properly, we just won't be quite as prescient on the initial playthrough. see: - https://github.com/linkerd/linkerd2/issues/8733. - https://github.com/linkerd/linkerd2-proxy/pull/3559 Signed-off-by: katelyn martin * feat(http/retry): `is_end_stream()` traces this commit introduces some trace-level diagnostics tracking how the replay body has determined whether or not it has reached the end of the stream. Signed-off-by: katelyn martin * nit(http/retry): capitalize trace event messages Signed-off-by: katelyn martin --------- Signed-off-by: katelyn martin --- linkerd/http/retry/src/compat.rs | 24 ++++++- linkerd/http/retry/src/replay.rs | 99 +++++++++++++++++++------- linkerd/http/retry/src/replay/tests.rs | 13 ++++ 3 files changed, 109 insertions(+), 27 deletions(-) diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs index 79698d0797..a4e452d6be 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/retry/src/compat.rs @@ -1,6 +1,11 @@ //! Compatibility utilities for upgrading to http-body 1.0. -use http_body::Body; +use http_body::{Body, SizeHint}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; pub(crate) use self::frame::Frame; @@ -42,10 +47,25 @@ impl ForwardCompatibleBody { } /// Returns `true` when the end of stream has been reached. - #[cfg(test)] pub(crate) fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } + + /// Returns the bounds on the remaining length of the stream. 
+ pub(crate) fn size_hint(&self) -> SizeHint { + self.inner.size_hint() + } +} + +impl ForwardCompatibleBody { + pub(crate) fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, B::Error>>> { + let mut fut = self.get_mut().frame(); + let pinned = Pin::new(&mut fut); + pinned.poll(cx) + } } /// Future that resolves to the next frame from a `Body`. diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 8a931ddf65..03804feab0 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -68,7 +68,7 @@ struct SharedState { struct BodyState { replay: Replay, trailers: Option, - rest: B, + rest: crate::compat::ForwardCompatibleBody, is_completed: bool, /// Maximum number of bytes to buffer. @@ -104,13 +104,19 @@ impl ReplayBody { state: Some(BodyState { replay: Default::default(), trailers: None, - rest: body, + rest: crate::compat::ForwardCompatibleBody::new(body), is_completed: false, max_bytes: max_bytes + 1, }), - // The initial `ReplayBody` has nothing to replay + // The initial `ReplayBody` has no data to replay. replay_body: false, - replay_trailers: false, + // NOTE(kate): When polling the inner body in terms of frames, we will not yield + // `Ready(None)` from `Body::poll_data()` until we have reached the end of the + // underlying stream. Once we have migrated to `http-body` v1, this field will be + // initialized `false` thanks to the use of `Body::poll_frame()`, but for now we must + // initialize this to true; `poll_trailers()` will be called after the trailers have + // been observed previously, even for the initial body. + replay_trailers: true, }) } @@ -204,16 +210,33 @@ where // Poll the inner body for more data. If the body has ended, remember // that so that future clones will not try polling it again (as // described above). - let data = { + let data: B::Data = { + use futures::{future::Either, ready}; + // Poll the inner body for the next frame. 
tracing::trace!("Polling initial body"); - match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) { - Some(Ok(data)) => data, - Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + let poll = Pin::new(&mut state.rest).poll_frame(cx).map_err(Into::into); + let frame = match ready!(poll) { + // The body yielded a new frame. + Some(Ok(frame)) => frame, + // The body yielded an error. + Some(Err(error)) => return Poll::Ready(Some(Err(error))), + // The body has reached the end of the stream. None => { tracing::trace!("Initial body completed"); state.is_completed = true; return Poll::Ready(None); } + }; + // Now, inspect the frame: was it a chunk of data, or a trailers frame? + match Self::split_frame(frame) { + Some(Either::Left(data)) => data, + Some(Either::Right(trailers)) => { + tracing::trace!("Initial body completed"); + state.trailers = Some(trailers); + state.is_completed = true; + return Poll::Ready(None); + } + None => return Poll::Ready(None), } }; @@ -234,7 +257,7 @@ where /// NOT be polled until the previous body has been dropped. fn poll_trailers( self: Pin<&mut Self>, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll, Self::Error>> { let this = self.get_mut(); let state = Self::acquire_state(&mut this.state, &this.shared.body); @@ -251,40 +274,40 @@ where } } - // If the inner body has previously ended, don't poll it again. - if !state.rest.is_end_stream() { - return Pin::new(&mut state.rest) - .poll_trailers(cx) - .map_ok(|tlrs| { - // Record a copy of the inner body's trailers in the shared state. 
- if state.trailers.is_none() { - state.trailers.clone_from(&tlrs); - } - tlrs - }) - .map_err(Into::into); - } - Poll::Ready(Ok(None)) } + #[tracing::instrument( + skip_all, + level = "trace", + fields( + state.is_some = %self.state.is_some(), + replay_trailers = %self.replay_trailers, + replay_body = %self.replay_body, + is_completed = ?self.state.as_ref().map(|s| s.is_completed), + ) + )] fn is_end_stream(&self) -> bool { // If the initial body was empty as soon as it was wrapped, then we are finished. if self.shared.was_empty { + tracing::trace!("Initial body was empty, stream has ended"); return true; } let Some(state) = self.state.as_ref() else { // This body is not currently the "active" replay being polled. + tracing::trace!("Inactive replay body is not complete"); return false; }; // if this body has data or trailers remaining to play back, it // is not EOS - !self.replay_body && !self.replay_trailers + let eos = !self.replay_body && !self.replay_trailers // if we have replayed everything, the initial body may // still have data remaining, so ask it - && state.rest.is_end_stream() + && state.rest.is_end_stream(); + tracing::trace!(%eos, "Checked replay body end-of-stream"); + eos } #[inline] @@ -334,6 +357,32 @@ impl Drop for ReplayBody { } } +impl ReplayBody { + /// Splits a `Frame` into a chunk of data or a header map. + /// + /// Frames do not expose their inner enums, and instead expose `into_data()` and + /// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))` + /// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame. + /// + /// This returns `None` if an unknown frame is provided, that is neither. + /// + /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. 
+ fn split_frame( + frame: crate::compat::Frame, + ) -> Option> { + use {crate::compat::Frame, futures::future::Either}; + match frame.into_data().map_err(Frame::into_trailers) { + Ok(data) => Some(Either::Left(data)), + Err(Ok(trailers)) => Some(Either::Right(trailers)), + Err(Err(_unknown)) => { + // It's possible that some sort of unknown frame could be encountered. + tracing::warn!("An unknown body frame has been buffered"); + None + } + } + } +} + // === impl BodyState === impl BodyState { diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 90c0140188..67640ac23f 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -327,6 +327,7 @@ fn empty_body_is_always_eos() { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let initial = ReplayBody::try_new(TestBody::one_data_frame(), 64 * 1024) .expect("body must not be too large"); let replay = initial.clone(); @@ -344,6 +345,8 @@ async fn eos_only_when_fully_replayed() { .expect("yields a frame") .into_data() .expect("yields a data frame"); + // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. + assert!(initial.frame().await.is_none()); assert!(initial.is_end_stream()); assert!(!replay.is_end_stream()); drop(initial); @@ -388,6 +391,7 @@ async fn eos_only_when_fully_replayed() { async fn eos_only_when_fully_replayed_with_trailers() { // Test that each clone of a body is not EOS until the data has been // fully replayed. 
+ let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let initial = ReplayBody::try_new(TestBody::one_data_frame().with_trailers(), 64 * 1024) .expect("body must not be too large"); let replay = initial.clone(); @@ -561,6 +565,7 @@ async fn caps_across_replays() { #[test] fn body_too_big() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let max_size = 8; let mk_body = |sz: usize| -> BoxBody { let s = (0..sz).map(|_| "x").collect::(); @@ -597,6 +602,7 @@ fn body_too_big() { #[allow(clippy::redundant_clone)] #[test] fn size_hint_is_correct_for_empty_body() { + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let initial = ReplayBody::try_new(BoxBody::empty(), 64 * 1024).expect("empty body can't be too large"); let size = initial.size_hint(); @@ -617,6 +623,7 @@ async fn size_hint_is_correct_across_replays() { debug_assert!(SIZE as usize <= CAPACITY); // Create the initial body, and a replay. + let _trace = linkerd_tracing::test::with_default_filter("linkerd_http_retry=trace"); let mut initial = ReplayBody::try_new(BoxBody::from_static(BODY), CAPACITY) .expect("empty body can't be too large"); let mut replay = initial.clone(); @@ -629,6 +636,12 @@ async fn size_hint_is_correct_across_replays() { // Read the body, check the size hint again. assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY)); + let initial = { + // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. + let mut body = crate::compat::ForwardCompatibleBody::new(initial); + assert!(body.frame().await.is_none()); + body.into_inner() + }; debug_assert!(initial.is_end_stream()); // TODO(kate): this currently misreports the *remaining* size of the body. // let size = initial.size_hint();