From ea1be7041b724e2483bfc46b1acc597fec86ae4a Mon Sep 17 00:00:00 2001 From: John Howard Date: Mon, 29 Jul 2024 10:36:47 -0700 Subject: [PATCH 1/3] Improve XDS error diagnostics Based on user feedback. Before/after dns outage ``` 2024-07-29T17:10:59.111431Z warn xds::client:xds{id=14} XDS client connection error: gRPC connection error:status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Temporary failure in name resolution, retrying in 15s 2024-07-29T17:22:14.958433Z warn xds::client:xds{id=3} XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Temporary failure in name resolution (hint: is the DNS server reachable?), retrying in 80ms ``` wrong dns name ``` 2024-07-29T17:22:47.910253Z warn xds::client:xds{id=10} XDS client connection error: gRPC connection error:status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Name or service not known, retrying in 10.24s 2024-07-29T17:22:47.910253Z warn xds::client:xds{id=10} XDS client connection error: gRPC connection error connecting to https://istiodx.istio-system.svc:15012: status: Unknown, message: "client error (Connect)", source: dns error: failed to lookup address information: Name or service not known, retrying in 10.24s ``` Bad auth (ztunnel) ``` 2024-07-29T17:25:29.137815Z warn xds::client:xds{id=11} XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unauthenticated, message: "authentication failure", retrying in 15s 2024-07-29T17:35:00.273104Z warn xds::client:xds{id=9} XDS client connection error: gRPC connection error connecting to https://istiod.istio-system.svc:15012: status: Unauthenticated, message: "authentication failure" (hint: check the control plane logs for more information), retrying in 5.12s ``` --- src/xds.rs | 15 +++++++++++++-- src/xds/client.rs | 7 +++++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/xds.rs b/src/xds.rs index afefc3209..3e2ef8bcb 100644 --- a/src/xds.rs +++ b/src/xds.rs @@ -55,6 +55,13 @@ impl<'a> fmt::Display for DisplayStatus<'a> { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let s = &self.0; write!(f, "status: {:?}, message: {:?}", s.code(), s.message())?; + + if s.message().to_string().contains("authentication failure") { + write!( + f, + " (hint: check the control plane logs for more information)" + )?; + } if !s.details().is_empty() { if let Ok(st) = std::str::from_utf8(s.details()) { write!(f, ", details: {st}")?; @@ -62,6 +69,10 @@ impl<'a> fmt::Display for DisplayStatus<'a> { } if let Some(src) = s.source().and_then(|s| s.source()) { write!(f, ", source: {src}")?; + // Error is not public to explicitly match on, so do a fuzzy match + if format!("{src}").contains("Temporary failure in name resolution") { + write!(f, " (hint: is the DNS server reachable?)")?; + } } Ok(()) } @@ -71,8 +82,8 @@ impl<'a> fmt::Display for DisplayStatus<'a> { pub enum Error { #[error("gRPC error {}", DisplayStatus(.0))] GrpcStatus(#[from] tonic::Status), - #[error("gRPC connection error:{}", DisplayStatus(.0))] - Connection(#[source] tonic::Status), + #[error("gRPC connection error connecting to {0}: {}", DisplayStatus(.1))] + Connection(String, #[source] tonic::Status), /// Attempted to send on a MPSC channel which has been canceled #[error(transparent)] RequestFailure(#[from] Box>), diff --git a/src/xds/client.rs b/src/xds/client.rs index 26d521aa7..20d7f4ab9 100644 --- a/src/xds/client.rs +++ b/src/xds/client.rs @@ -509,7 +509,7 @@ impl AdsClient { async fn run_loop(&mut self, backoff: Duration) -> Duration { match self.run_internal().await { - Err(e @ Error::Connection(_)) => { + Err(e @ Error::Connection(_, _)) => { // For connection errors, we add backoff let backoff = std::cmp::min(MAX_BACKOFF, backoff * 2); warn!( @@ -616,6 +616,7 @@ impl AdsClient { warn!("outbound stream complete"); }; + let addr = self.config.address.clone(); let tls_grpc_channel = tls::grpc_connector( self.config.address.clone(), self.config.tls_builder.fetch_cert().await?, @@ -629,7 +630,9 @@ impl AdsClient { .delta_aggregated_resources(tonic::Request::new(outbound)) .await; - let mut response_stream = ads_connection.map_err(Error::Connection)?.into_inner(); + let mut response_stream = ads_connection + .map_err(|src| Error::Connection(addr, src))? + .into_inner(); debug!("connected established"); info!("Stream established"); From f851be18269632565324ae81ba17574969a4c0be Mon Sep 17 00:00:00 2001 From: John Howard Date: Tue, 23 Jul 2024 16:44:10 -0700 Subject: [PATCH 2/3] Support hostname waypoints --- src/proxy/outbound.rs | 4 +- src/state.rs | 186 ++++++++++++++++++++++++++---------------- 2 files changed, 118 insertions(+), 72 deletions(-) diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index b13dcee43..2e2239611 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -312,7 +312,7 @@ impl OutboundConnection { { // if we have a waypoint for this svc, use it; otherwise route traffic normally if let Some(waypoint) = state - .fetch_service_waypoint(&target_service, &source_workload) + .fetch_service_waypoint(&target_service, &source_workload, target) .await? { let upstream_sans = waypoint.workload_and_services_san(); @@ -373,7 +373,7 @@ impl OutboundConnection { if !from_waypoint && !svc_addressed { // For case upstream server has enabled waypoint let waypoint = state - .fetch_workload_waypoint(&us.workload, &source_workload) + .fetch_workload_waypoint(&us.workload, &source_workload, target) .await?; if let Some(waypoint) = waypoint { let actual_destination = waypoint.workload_socket_addr(); diff --git a/src/state.rs b/src/state.rs index 6faa6f902..b34eb79a8 100644 --- a/src/state.rs +++ b/src/state.rs @@ -254,35 +254,12 @@ impl ProxyState { .services .get_by_vip(&network_addr(network.clone(), addr.ip())) { - // Randomly pick an upstream - // TODO: do this more efficiently, and not just randomly - let Some((ep, wl)) = self.load_balance(source_workload, &svc, addr, resolution_mode) - else { - debug!("VIP {} has no healthy endpoints", addr); - return None; - }; - - let svc_port = svc.ports.get(&addr.port()).copied().unwrap_or_default(); - let target_port = if let Some(&port) = ep.port.get(&addr.port()) { - // prefer endpoint port mapping - port - } else if svc_port > 0 { - // otherwise, see if the service has this port - svc_port - } else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel { - // when using app tunnel, we don't require the port to be found on the service - addr.port() - } else { - // no app tunnel or port mapping, error - debug!( - "found VIP {}, but port {} was unknown", - addr.ip(), - addr.port() - ); - return None; - }; - - return Some((wl, target_port, Some(svc))); + return self.find_upstream_from_service( + source_workload, + addr.port(), + resolution_mode, + svc, + ); } if let Some(wl) = self .workloads @@ -293,22 +270,55 @@ impl ProxyState { None } + fn find_upstream_from_service( + &self, + source_workload: &Workload, + svc_port: u16, + resolution_mode: ServiceResolutionMode, + svc: Arc, + ) -> Option<(Arc, u16, Option>)> { + // Randomly pick an upstream + // TODO: do this more efficiently, and not just randomly + let Some((ep, wl)) = self.load_balance(source_workload, &svc, svc_port, resolution_mode) + else { + debug!("Service {} has no healthy endpoints", svc.hostname); + return None; + }; + + let svc_target_port = svc.ports.get(&svc_port).copied().unwrap_or_default(); + let target_port = if let Some(&ep_target_port) = ep.port.get(&svc_port) { + // prefer endpoint port mapping + ep_target_port + } else if svc_target_port > 0 { + // otherwise, see if the service has this port + svc_target_port + } else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel { + // when using app tunnel, we don't require the port to be found on the service + svc_port + } else { + // no app tunnel or port mapping, error + debug!( + "found service {}, but port {} was unknown", + svc.hostname, svc_port + ); + return None; + }; + + Some((wl, target_port, Some(svc))) + } + fn load_balance<'a>( &self, src: &Workload, svc: &'a Service, - svc_addr: SocketAddr, + svc_port: u16, resolution_mode: ServiceResolutionMode, ) -> Option<(&'a Endpoint, Arc)> { - let target_port = svc.ports.get(&svc_addr.port()).copied(); + let target_port = svc.ports.get(&svc_port).copied(); if resolution_mode == ServiceResolutionMode::Standard && target_port.is_none() { // Port doesn't exist on the service at all, this is invalid - debug!( - "service {} does not have port {}", - svc.hostname, - svc_addr.port() - ); + debug!("service {} does not have port {}", svc.hostname, svc_port); return None; }; @@ -319,14 +329,12 @@ impl ProxyState { }; match resolution_mode { ServiceResolutionMode::Standard => { - if target_port.unwrap_or_default() == 0 - && !ep.port.contains_key(&svc_addr.port()) - { + if target_port.unwrap_or_default() == 0 && !ep.port.contains_key(&svc_port) { // Filter workload out, it doesn't have a matching port trace!( "filter endpoint {}, it does not have service port {}", ep.workload_uid, - svc_addr.port() + svc_port ); return None; } @@ -698,18 +706,37 @@ impl DemandProxyState { ) -> Result, Error> { self.fetch_address(&network_addr(network.clone(), addr.ip())) .await; - let Some((wl, port, svc)) = self.state.read().unwrap().find_upstream( - network, - source_workload, - addr, - resolution_mode, - ) else { + let upstream = { + self.state.read().unwrap().find_upstream( + network, + source_workload, + addr, + resolution_mode, + ) + // Drop the lock + }; + self.finalize_upstream(source_workload, addr, upstream) + .await + } + + async fn finalize_upstream( + &self, + source_workload: &Workload, + original_target_address: SocketAddr, + upstream: Option<(Arc, u16, Option>)>, + ) -> Result, Error> { + let Some((wl, port, svc)) = upstream else { return Ok(None); }; let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref())); let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families); let selected_workload_ip = self - .pick_workload_destination_or_resolve(&wl, source_workload, addr, ip_family_restriction) + .pick_workload_destination_or_resolve( + &wl, + source_workload, + original_target_address, + ip_family_restriction, + ) .await?; // if we can't load balance just return the error Ok(Some(Upstream { workload: wl, @@ -724,36 +751,59 @@ impl DemandProxyState { &self, gw_address: &GatewayAddress, source_workload: &Workload, + original_destination_address: SocketAddr, ) -> Result { - let wp_nw_addr = match &gw_address.destination { - Destination::Address(ip) => ip, - Destination::Hostname(_) => { - return Err(Error::UnsupportedFeature( - "hostname lookup not supported yet".to_string(), - )); + // Waypoint can be referred to by an IP or Hostname. + // Hostname is preferred as it is a more stable identifier. + let res = match &gw_address.destination { + Destination::Address(ip) => { + let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port); + self.state.read().unwrap().find_upstream( + ip.network.clone(), + source_workload, + addr, + ServiceResolutionMode::Waypoint, + ) + } + Destination::Hostname(host) => { + let state = self.state.read().unwrap(); + match state.find_hostname(host) { + Some(Address::Service(s)) => state.find_upstream_from_service( + source_workload, + gw_address.hbone_mtls_port, + ServiceResolutionMode::Waypoint, + s, + ), + Some(_) => { + return Err(Error::UnsupportedFeature( + "waypoint must be a service, not a workload".to_string(), + )) + } + None => { + return Err(Error::UnknownWaypoint(format!( + "waypoint {} not found", + host.hostname + ))) + } + } } }; - let wp_socket_addr = SocketAddr::new(wp_nw_addr.address, gw_address.hbone_mtls_port); - self.fetch_upstream( - wp_nw_addr.network.clone(), - source_workload, - wp_socket_addr, - ServiceResolutionMode::Waypoint, - ) - .await? - .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {} not found", wp_nw_addr.address))) + self.finalize_upstream(source_workload, original_destination_address, res) + .await? + .ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address))) } pub async fn fetch_service_waypoint( &self, service: &Service, source_workload: &Workload, + original_destination_address: SocketAddr, ) -> Result, Error> { let Some(gw_address) = &service.waypoint else { // no waypoint return Ok(None); }; - self.fetch_waypoint(gw_address, source_workload) + self.fetch_waypoint(gw_address, source_workload, original_destination_address) .await .map(Some) } @@ -762,12 +812,13 @@ impl DemandProxyState { &self, wl: &Workload, source_workload: &Workload, + original_destination_address: SocketAddr, ) -> Result, Error> { let Some(gw_address) = &wl.waypoint else { // no waypoint return Ok(None); }; - self.fetch_waypoint(gw_address, source_workload) + self.fetch_waypoint(gw_address, source_workload, original_destination_address) .await .map(Some) } @@ -1424,12 +1475,7 @@ mod tests { let assert_endpoint = |src: &Workload, svc: &Service, ips: Vec<&str>, desc: &str| { let got = state - .load_balance( - src, - svc, - "0.0.0.0:80".parse().unwrap(), - ServiceResolutionMode::Standard, - ) + .load_balance(src, svc, 80, ServiceResolutionMode::Standard) .and_then(|(ep, _)| ep.address.clone()) .map(|addr| addr.address.to_string()); if ips.is_empty() { From 42c73bbbfea751d5942b491d8b9b93d37e894b21 Mon Sep 17 00:00:00 2001 From: John Howard Date: Thu, 1 Aug 2024 13:36:24 -0700 Subject: [PATCH 3/3] add test and drop unwrap --- src/state.rs | 12 +++---- src/test_helpers/linux.rs | 24 +++++++++++++ tests/namespaced.rs | 76 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 8 deletions(-) diff --git a/src/state.rs b/src/state.rs index b34eb79a8..c01d50254 100644 --- a/src/state.rs +++ b/src/state.rs @@ -694,7 +694,7 @@ impl DemandProxyState { return None; } self.fetch_on_demand(uid.clone()).await; - self.state.read().unwrap().workloads.find_uid(uid) + self.read().workloads.find_uid(uid) } pub async fn fetch_upstream( @@ -707,12 +707,8 @@ impl DemandProxyState { self.fetch_address(&network_addr(network.clone(), addr.ip())) .await; let upstream = { - self.state.read().unwrap().find_upstream( - network, - source_workload, - addr, - resolution_mode, - ) + self.read() + .find_upstream(network, source_workload, addr, resolution_mode) // Drop the lock }; self.finalize_upstream(source_workload, addr, upstream) @@ -766,7 +762,7 @@ impl DemandProxyState { ) } Destination::Hostname(host) => { - let state = self.state.read().unwrap(); + let state = self.read(); match state.find_hostname(host) { Some(Address::Service(s)) => state.find_upstream_from_service( source_workload, diff --git a/src/test_helpers/linux.rs b/src/test_helpers/linux.rs index 29a801d74..34853f34f 100644 --- a/src/test_helpers/linux.rs +++ b/src/test_helpers/linux.rs @@ -344,6 +344,18 @@ impl<'a> TestServiceBuilder<'a> { self } + /// Set the service waypoint by hostname + pub fn waypoint_hostname(mut self, hostname: &str) -> Self { + self.s.waypoint = Some(GatewayAddress { + destination: gatewayaddress::Destination::Hostname(NamespacedHostname { + namespace: strng::literal!("default"), + hostname: hostname.into(), + }), + hbone_mtls_port: 15008, + }); + self + } + /// Finish building the service. pub async fn register(self) -> anyhow::Result<()> { self.manager @@ -411,6 +423,18 @@ impl<'a> TestWorkloadBuilder<'a> { self } + /// Set the service waypoint by hostname + pub fn waypoint_hostname(mut self, hostname: &str) -> Self { + self.w.workload.waypoint = Some(GatewayAddress { + destination: gatewayaddress::Destination::Hostname(NamespacedHostname { + namespace: strng::literal!("default"), + hostname: hostname.into(), + }), + hbone_mtls_port: 15008, + }); + self + } + /// Set a waypoint to the workload pub fn mutate_workload(mut self, f: impl FnOnce(&mut Workload)) -> Self { f(&mut self.w.workload); diff --git a/tests/namespaced.rs b/tests/namespaced.rs index 32571e29b..5ce1d81e3 100644 --- a/tests/namespaced.rs +++ b/tests/namespaced.rs @@ -249,6 +249,82 @@ mod namespaced { Ok(()) } + #[tokio::test] + async fn service_waypoint_hostname() -> anyhow::Result<()> { + let mut manager = setup_netns_test!(Shared); + + let zt = manager.deploy_ztunnel(DEFAULT_NODE).await?; + + manager + .service_builder("waypoint") + .addresses(vec![NetworkAddress { + network: strng::EMPTY, + address: TEST_VIP.parse::()?, + }]) + .ports(HashMap::from([(15008u16, 15008u16)])) + .register() + .await?; + let waypoint = manager + .workload_builder("waypoint", DEFAULT_NODE) + .uncaptured() + .service( + "default/waypoint.default.svc.cluster.local", + 80, + SERVER_PORT, + ) + .register() + .await?; + run_hbone_server(waypoint, "waypoint")?; + + manager + .workload_builder("server", DEFAULT_NODE) + .waypoint_hostname("waypoint.default.svc.cluster.local") + .register() + .await?; + let client = manager + .workload_builder("client", DEFAULT_NODE) + .register() + .await?; + + let server_ip = manager.resolver().resolve("server")?; + let waypoint_pod_ip = manager.resolver().resolve("waypoint")?; + run_tcp_to_hbone_client(client, manager.resolver(), "server")?; + + let metrics = [ + (CONNECTIONS_OPENED, 1), + (CONNECTIONS_CLOSED, 1), + (BYTES_RECV, REQ_SIZE), + (BYTES_SENT, HBONE_REQ_SIZE), + ]; + verify_metrics(&zt, &metrics, &source_labels()).await; + + let sent = format!("{REQ_SIZE}"); + let recv = format!("{HBONE_REQ_SIZE}"); + let hbone_addr = format!("{server_ip}:8080"); + let dst_addr = format!("{waypoint_pod_ip}:15008"); + let want = HashMap::from([ + ("scope", "access"), + ("src.workload", "client"), + ("dst.workload", "waypoint"), + ("dst.hbone_addr", &hbone_addr), + ("dst.addr", &dst_addr), + ("bytes_sent", &sent), + ("bytes_recv", &recv), + ("direction", "outbound"), + ("message", "connection complete"), + ( + "src.identity", + "spiffe://cluster.local/ns/default/sa/client", + ), + ( + "dst.identity", + "spiffe://cluster.local/ns/default/sa/waypoint", + ), + ]); + telemetry::testing::assert_contains(want); + Ok(()) + } + #[tokio::test] async fn sandwich_waypoint_plain() -> anyhow::Result<()> { let mut manager = setup_netns_test!(Shared);