Skip to content

Commit

Permalink
refactor(app/core): a unit test suite for rescue body (#3615)
Browse files Browse the repository at this point in the history
`linkerd-app-core` includes an error recovery body middleware. this middleware will gracefully catch and report errors encountered when polling an inner body, and via an `R`-typed recovery strategy provided by the caller, will attempt to map the error to a gRPC status code denoting an error.

before we upgrade to hyper 1.0 in service of linkerd/linkerd2#8733, we add some test coverage to ensure that we preserve the behavior of this middleware.

see:
* linkerd/linkerd2#8733
* #3614.

for historical context on this tower layer, see:
* #222
* #1246
* #1282

---

* refactor(http/retry): outline `ForwardCompatibleBody<B>`

in #3559 (4b53081), we introduced a backported `Frame<T>` type, and a
`ForwardCompatibleBody<B>` type that allows us to interact with a
`http_body::Body` circa 0.4.6 in terms of frame-based interfaces that
match those of the 1.0 interface.

see linkerd/linkerd2#8733 for more information on upgrading hyper.

in #3559, we narrowly added this as an internal submodule of the
`linkerd-http-retry` library. these facilities however, would have
utility in other places such as `linkerd-app-core`.

this commit pulls these compatibility shims out into a
`linkerd-http-body-compat` library so that they can be imported and
reused elsewhere.

Signed-off-by: katelyn martin <[email protected]>

* nit(http/body-compat): tidy `combinators` imports

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): hoist `errors::code_header` helper

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): `l5d-*` constants are headers

these are header values. `http::HeaderName` has a const fn constructor,
so let's use that.

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): grpc constants are headers

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): hoist `l5d-` and `grpc-` constants

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): outline `ResponseBody` middleware

we'll add a few tests for this middleware shortly.

this commit moves this middleware out into its own submodule.

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): encapsulate `ResponseBody` enum

for other body middleware, we hide inner enum variants and their
constituent members by using the "inner" pattern.

this commit tweaks `ResponseBody` to follow suit, such that it now holds
an `Inner`, but does not expose its passthrough and rescue variants to
callers.

Signed-off-by: katelyn martin <[email protected]>

* docs(app/core): document `ResponseBody<R, B>`

this adds a small documentation comment describing what this type does.

Signed-off-by: katelyn martin <[email protected]>

* refactor(app/core): a unit test suite for rescue body

this commit introduces a test suite for our error recovery middleware.

this body middleware provides a mechanism to "rescue" errors, gracefully
mapping an error encountered when polling a gRPC body into e.g. trailers
with a gRPC status code.

before we upgrade this middleware in service of linkerd/linkerd2#8733,
we add some test coverage to ensure that we preserve this middleware.

Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn authored Feb 13, 2025
1 parent 2f848a0 commit b883089
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 163 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1358,12 +1358,14 @@ dependencies = [
"linkerd-error",
"linkerd-error-respond",
"linkerd-exp-backoff",
"linkerd-http-body-compat",
"linkerd-http-metrics",
"linkerd-identity",
"linkerd-idle-cache",
"linkerd-io",
"linkerd-meshtls",
"linkerd-metrics",
"linkerd-mock-http-body",
"linkerd-opencensus",
"linkerd-opentelemetry",
"linkerd-proxy-api-resolve",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,6 @@ linkerd-system = { path = "../../system" }
semver = "1"

[dev-dependencies]
linkerd-http-body-compat = { path = "../../http/body-compat" }
linkerd-mock-http-body = { path = "../../mock/http-body" }
quickcheck = { version = "1", default-features = false }
35 changes: 35 additions & 0 deletions linkerd/app/core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod body;
pub mod respond;

pub use self::respond::{HttpRescue, NewRespond, NewRespondService, SyntheticHttpResponse};
Expand All @@ -6,6 +7,16 @@ pub use linkerd_proxy_http::h2::H2Error;
pub use linkerd_stack::{FailFastError, LoadShedError};
pub use tonic::Code as Grpc;

/// Header names and values related to error responses.
pub mod header {
use http::header::{HeaderName, HeaderValue};
pub const L5D_PROXY_CONNECTION: HeaderName = HeaderName::from_static("l5d-proxy-connection");
pub const L5D_PROXY_ERROR: HeaderName = HeaderName::from_static("l5d-proxy-error");
pub(super) const GRPC_CONTENT_TYPE: HeaderValue = HeaderValue::from_static("application/grpc");
pub(super) const GRPC_MESSAGE: HeaderName = HeaderName::from_static("grpc-message");
pub(super) const GRPC_STATUS: HeaderName = HeaderName::from_static("grpc-status");
}

#[derive(Debug, thiserror::Error)]
#[error("connect timed out after {0:?}")]
pub struct ConnectTimeout(pub(crate) std::time::Duration);
Expand All @@ -18,3 +29,27 @@ pub fn has_grpc_status(error: &crate::Error, code: tonic::Code) -> bool {
.map(|s| s.code() == code)
.unwrap_or(false)
}

// Copied from tonic, where it's private.
fn code_header(code: tonic::Code) -> http::HeaderValue {
use {http::HeaderValue, tonic::Code};
match code {
Code::Ok => HeaderValue::from_static("0"),
Code::Cancelled => HeaderValue::from_static("1"),
Code::Unknown => HeaderValue::from_static("2"),
Code::InvalidArgument => HeaderValue::from_static("3"),
Code::DeadlineExceeded => HeaderValue::from_static("4"),
Code::NotFound => HeaderValue::from_static("5"),
Code::AlreadyExists => HeaderValue::from_static("6"),
Code::PermissionDenied => HeaderValue::from_static("7"),
Code::ResourceExhausted => HeaderValue::from_static("8"),
Code::FailedPrecondition => HeaderValue::from_static("9"),
Code::Aborted => HeaderValue::from_static("10"),
Code::OutOfRange => HeaderValue::from_static("11"),
Code::Unimplemented => HeaderValue::from_static("12"),
Code::Internal => HeaderValue::from_static("13"),
Code::Unavailable => HeaderValue::from_static("14"),
Code::DataLoss => HeaderValue::from_static("15"),
Code::Unauthenticated => HeaderValue::from_static("16"),
}
}
313 changes: 313 additions & 0 deletions linkerd/app/core/src/errors/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
use super::{
header::{GRPC_MESSAGE, GRPC_STATUS},
respond::{HttpRescue, SyntheticHttpResponse},
};
use http::header::HeaderValue;
use linkerd_error::{Error, Result};
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tracing::{debug, warn};

/// Returns a "gRPC rescue" body.
///
/// This returns a body that, should the inner `B`-typed body return an error when polling for
/// DATA frames, will "rescue" the stream and return a TRAILERS frame that describes the error.
#[pin_project(project = ResponseBodyProj)]
pub struct ResponseBody<R, B>(#[pin] Inner<R, B>);

#[pin_project(project = InnerProj)]
enum Inner<R, B> {
Passthru(#[pin] B),
GrpcRescue {
#[pin]
inner: B,
trailers: Option<http::HeaderMap>,
rescue: R,
emit_headers: bool,
},
}

// === impl ResponseBody ===

impl<R, B> ResponseBody<R, B> {
/// Returns a body in "passthru" mode.
pub fn passthru(inner: B) -> Self {
Self(Inner::Passthru(inner))
}

/// Returns a "gRPC rescue" body.
pub fn grpc_rescue(inner: B, rescue: R, emit_headers: bool) -> Self {
Self(Inner::GrpcRescue {
inner,
rescue,
emit_headers,
trailers: None,
})
}
}

impl<R, B: Default + linkerd_proxy_http::Body> Default for ResponseBody<R, B> {
fn default() -> Self {
Self(Inner::Passthru(B::default()))
}
}

impl<R, B> linkerd_proxy_http::Body for ResponseBody<R, B>
where
B: linkerd_proxy_http::Body<Error = Error>,
R: HttpRescue<B::Error>,
{
type Data = B::Data;
type Error = B::Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let ResponseBodyProj(inner) = self.project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_data(cx),
InnerProj::GrpcRescue {
inner,
trailers,
rescue,
emit_headers,
} => {
// should not be calling poll_data if we have set trailers derived from an error
assert!(trailers.is_none());
match inner.poll_data(cx) {
Poll::Ready(Some(Err(error))) => {
let SyntheticHttpResponse {
grpc_status,
message,
..
} = rescue.rescue(error)?;
let t = Self::grpc_trailers(grpc_status, &message, *emit_headers);
*trailers = Some(t);
Poll::Ready(None)
}
data => data,
}
}
}
}

#[inline]
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let ResponseBodyProj(inner) = self.project();
match inner.project() {
InnerProj::Passthru(inner) => inner.poll_trailers(cx),
InnerProj::GrpcRescue {
inner, trailers, ..
} => match trailers.take() {
Some(t) => Poll::Ready(Ok(Some(t))),
None => inner.poll_trailers(cx),
},
}
}

#[inline]
fn is_end_stream(&self) -> bool {
let Self(inner) = self;
match inner {
Inner::Passthru(inner) => inner.is_end_stream(),
Inner::GrpcRescue {
inner, trailers, ..
} => trailers.is_none() && inner.is_end_stream(),
}
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
let Self(inner) = self;
match inner {
Inner::Passthru(inner) => inner.size_hint(),
Inner::GrpcRescue { inner, .. } => inner.size_hint(),
}
}
}

impl<R, B> ResponseBody<R, B> {
fn grpc_trailers(code: tonic::Code, message: &str, emit_headers: bool) -> http::HeaderMap {
debug!(grpc.status = ?code, "Synthesizing gRPC trailers");
let mut t = http::HeaderMap::new();
t.insert(GRPC_STATUS, super::code_header(code));
if emit_headers {
t.insert(
GRPC_MESSAGE,
HeaderValue::from_str(message).unwrap_or_else(|error| {
warn!(%error, "Failed to encode error header");
HeaderValue::from_static("Unexpected error")
}),
);
}
t
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::errors::header::{GRPC_MESSAGE, GRPC_STATUS};
use http::HeaderMap;
use linkerd_mock_http_body::MockBody;

struct MockRescue;
impl<E> HttpRescue<E> for MockRescue {
/// Attempts to synthesize a response from the given error.
fn rescue(&self, _: E) -> Result<SyntheticHttpResponse, E> {
let synthetic = SyntheticHttpResponse::internal_error("MockRescue::rescue");
Ok(synthetic)
}
}

#[tokio::test]
async fn rescue_body_recovers_from_error_without_grpc_message() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let trailers = {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("caboose");
trls.insert("trailer", value);
trls
};
let rescue = {
let inner = MockBody::default()
.then_yield_data(Poll::Ready(Some(Ok("inter".into()))))
.then_yield_data(Poll::Ready(Some(Err("an error midstream".into()))))
.then_yield_data(Poll::Ready(Some(Ok("rupted".into()))))
.then_yield_trailer(Poll::Ready(Ok(Some(trailers))));
let rescue = MockRescue;
let emit_headers = false;
ResponseBody::grpc_rescue(inner, rescue, emit_headers)
};
let (data, Some(trailers)) = body_to_string(rescue).await else {
panic!("trailers should exist");
};
assert_eq!(data, "inter");
assert_eq!(
trailers[GRPC_STATUS],
i32::from(tonic::Code::Internal).to_string()
);
assert_eq!(trailers.get(GRPC_MESSAGE), None);
}

#[tokio::test]
async fn rescue_body_recovers_from_error_emitting_message() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let trailers = {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("caboose");
trls.insert("trailer", value);
trls
};
let rescue = {
let inner = MockBody::default()
.then_yield_data(Poll::Ready(Some(Ok("inter".into()))))
.then_yield_data(Poll::Ready(Some(Err("an error midstream".into()))))
.then_yield_data(Poll::Ready(Some(Ok("rupted".into()))))
.then_yield_trailer(Poll::Ready(Ok(Some(trailers))));
let rescue = MockRescue;
let emit_headers = true;
ResponseBody::grpc_rescue(inner, rescue, emit_headers)
};
let (data, Some(trailers)) = body_to_string(rescue).await else {
panic!("trailers should exist");
};
assert_eq!(data, "inter");
assert_eq!(
trailers[GRPC_STATUS],
i32::from(tonic::Code::Internal).to_string()
);
assert_eq!(trailers[GRPC_MESSAGE], "MockRescue::rescue");
}

#[tokio::test]
async fn rescue_body_works_for_empty() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let rescue = {
let inner = MockBody::default();
let rescue = MockRescue;
let emit_headers = false;
ResponseBody::grpc_rescue(inner, rescue, emit_headers)
};
let (data, trailers) = body_to_string(rescue).await;
assert_eq!(data, "");
assert_eq!(trailers, None);
}

#[tokio::test]
async fn rescue_body_works_for_body_with_data() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let rescue = {
let inner = MockBody::default().then_yield_data(Poll::Ready(Some(Ok("unary".into()))));
let rescue = MockRescue;
let emit_headers = false;
ResponseBody::grpc_rescue(inner, rescue, emit_headers)
};
let (data, trailers) = body_to_string(rescue).await;
assert_eq!(data, "unary");
assert_eq!(trailers, None);
}

#[tokio::test]
async fn rescue_body_works_for_body_with_trailers() {
let (_guard, _handle) = linkerd_tracing::test::trace_init();
let trailers = {
let mut trls = HeaderMap::with_capacity(1);
let value = HeaderValue::from_static("caboose");
trls.insert("trailer", value);
trls
};
let rescue = {
let inner = MockBody::default().then_yield_trailer(Poll::Ready(Ok(Some(trailers))));
let rescue = MockRescue;
let emit_headers = false;
ResponseBody::grpc_rescue(inner, rescue, emit_headers)
};
let (data, trailers) = body_to_string(rescue).await;
assert_eq!(data, "");
assert_eq!(trailers.expect("has trailers")["trailer"], "caboose");
}

async fn body_to_string<B>(body: B) -> (String, Option<HeaderMap>)
where
B: http_body::Body + Unpin,
B::Error: std::fmt::Debug,
{
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body);
let mut data = String::new();
let mut trailers = None;

// Continue reading frames from the body until it is finished.
while let Some(frame) = body
.frame()
.await
.transpose()
.expect("reading a frame succeeds")
{
match frame.into_data().map(|mut buf| {
use bytes::Buf;
let bytes = buf.copy_to_bytes(buf.remaining());
String::from_utf8(bytes.to_vec()).unwrap()
}) {
Ok(ref s) => data.push_str(s),
Err(frame) => {
let trls = frame
.into_trailers()
.map_err(drop)
.expect("test frame is either data or trailers");
trailers = Some(trls);
}
}
}

tracing::info!(?data, ?trailers, "finished reading body");
(data, trailers)
}
}
Loading

0 comments on commit b883089

Please sign in to comment.