diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs index db509e261e..a4e452d6be 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/retry/src/compat.rs @@ -47,20 +47,17 @@ impl ForwardCompatibleBody { } /// Returns `true` when the end of stream has been reached. - #[allow(unused, reason = "not yet used")] pub(crate) fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } /// Returns the bounds on the remaining length of the stream. - #[allow(unused, reason = "not yet used")] pub(crate) fn size_hint(&self) -> SizeHint { self.inner.size_hint() } } impl ForwardCompatibleBody { - #[allow(unused, reason = "not yet used")] pub(crate) fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 8a931ddf65..d24e58c737 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -68,7 +68,7 @@ struct SharedState { struct BodyState { replay: Replay, trailers: Option, - rest: B, + rest: crate::compat::ForwardCompatibleBody, is_completed: bool, /// Maximum number of bytes to buffer. @@ -104,13 +104,19 @@ impl ReplayBody { state: Some(BodyState { replay: Default::default(), trailers: None, - rest: body, + rest: crate::compat::ForwardCompatibleBody::new(body), is_completed: false, max_bytes: max_bytes + 1, }), - // The initial `ReplayBody` has nothing to replay + // The initial `ReplayBody` has no data to replay. replay_body: false, - replay_trailers: false, + // NOTE(kate): When polling the inner body in terms of frames, we will not yield + // `Ready(None)` from `Body::poll_data()` until we have reached the end of the + // underlying stream. Once we have migrated to `http-body` v1, this field will be + // initialized `false` thanks to the use of `Body::poll_frame()`, but for now we must + // initialize this to true; `poll_trailers()` will be called after the trailers have + // been observed previously, even for the initial body. + replay_trailers: true, }) } @@ -204,16 +210,33 @@ where // Poll the inner body for more data. If the body has ended, remember // that so that future clones will not try polling it again (as // described above). - let data = { + let data: B::Data = { + use futures::{future::Either, ready}; + // Poll the inner body for the next frame. tracing::trace!("Polling initial body"); - match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) { - Some(Ok(data)) => data, - Some(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + let poll = Pin::new(&mut state.rest).poll_frame(cx).map_err(Into::into); + let frame = match ready!(poll) { + // The body yielded a new frame. + Some(Ok(frame)) => frame, + // The body yielded an error. + Some(Err(error)) => return Poll::Ready(Some(Err(error))), + // The body has reached the end of the stream. None => { tracing::trace!("Initial body completed"); state.is_completed = true; return Poll::Ready(None); } + }; + // Now, inspect the frame: was it a chunk of data, or a trailers frame? + match Self::split_frame(frame) { + Some(Either::Left(data)) => data, + Some(Either::Right(trailers)) => { + tracing::trace!("Initial body completed"); + state.trailers = Some(trailers); + state.is_completed = true; + return Poll::Ready(None); + } + None => return Poll::Ready(None), } }; @@ -234,7 +257,7 @@ where /// NOT be polled until the previous body has been dropped. fn poll_trailers( self: Pin<&mut Self>, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll, Self::Error>> { let this = self.get_mut(); let state = Self::acquire_state(&mut this.state, &this.shared.body); @@ -251,20 +274,6 @@ where } } - // If the inner body has previously ended, don't poll it again. - if !state.rest.is_end_stream() { - return Pin::new(&mut state.rest) - .poll_trailers(cx) - .map_ok(|tlrs| { - // Record a copy of the inner body's trailers in the shared state. - if state.trailers.is_none() { - state.trailers.clone_from(&tlrs); - } - tlrs - }) - .map_err(Into::into); - } - Poll::Ready(Ok(None)) } @@ -334,6 +343,32 @@ impl Drop for ReplayBody { } } +impl ReplayBody { + /// Splits a `Frame` into a chunk of data or a header map. + /// + /// Frames do not expose their inner enums, and instead expose `into_data()` and + /// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))` + /// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame. + /// + /// This returns `None` if an unknown frame is provided, that is neither. + /// + /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. + fn split_frame( + frame: crate::compat::Frame, + ) -> Option> { + use {crate::compat::Frame, futures::future::Either}; + match frame.into_data().map_err(Frame::into_trailers) { + Ok(data) => Some(Either::Left(data)), + Err(Ok(trailers)) => Some(Either::Right(trailers)), + Err(Err(_unknown)) => { + // It's possible that some sort of unknown frame could be encountered. + tracing::warn!("an unknown body frame has been buffered"); + None + } + } + } +} + // === impl BodyState === impl BodyState { diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 75c6447fd8..67640ac23f 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -345,6 +345,8 @@ async fn eos_only_when_fully_replayed() { .expect("yields a frame") .into_data() .expect("yields a data frame"); + // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. + assert!(initial.frame().await.is_none()); assert!(initial.is_end_stream()); assert!(!replay.is_end_stream()); drop(initial); @@ -634,6 +636,12 @@ async fn size_hint_is_correct_across_replays() { // Read the body, check the size hint again. assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY)); + let initial = { + // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. + let mut body = crate::compat::ForwardCompatibleBody::new(initial); + assert!(body.frame().await.is_none()); + body.into_inner() + }; debug_assert!(initial.is_end_stream()); // TODO(kate): this currently misreports the *remaining* size of the body. // let size = initial.size_hint();