Skip to content

Commit

Permalink
Fix regression in waypoints to use original IP (istio#1253)
Browse files Browse the repository at this point in the history
Regression from istio#1224

Before that PR (and now the behavior returns), we would use the IP
family of the IP stuck in the workload.waypoint or service.waypoint
field. This was always IPv4 due to istiod's implementation.

With the PR, we started to use the original IP family of the request.
Since waypoints had IPv6 addresses, we would use them. Due to another
bug in Istiod (fixed in istio/istio#52555), the
waypoints didn't actually listen on IPv6, though, so would drop the
requests.

With this PR, we will continue to send to the IP actually specified. In
the near future we will move to hostname references, where we can
correctly pick the best IP family based on the request.
  • Loading branch information
howardjohn authored and antonioberben committed Oct 1, 2024
1 parent d43dc9b commit 107279e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
49 changes: 48 additions & 1 deletion src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,19 @@ mod tests {
service_account: "waypoint-sa".to_string(),
..Default::default()
};
let mut workloads = vec![source, waypoint];
let waypoint_dual = XdsWorkload {
uid: "cluster1//v1/Pod/ns/waypoint-workload-dual".to_string(),
name: "waypoint-workload-dual".to_string(),
namespace: "ns".to_string(),
addresses: vec![
Bytes::copy_from_slice(&[127, 0, 0, 11]),
Bytes::copy_from_slice("ff06::c5".parse::<Ipv6Addr>().unwrap().octets().as_slice()),
],
node: "local-node".to_string(),
service_account: "waypoint-sa".to_string(),
..Default::default()
};
let mut workloads = vec![source, waypoint, waypoint_dual];
let mut services = vec![];
for x in xds {
match x {
Expand Down Expand Up @@ -745,6 +757,7 @@ mod tests {
)
.await;
}

#[tokio::test]
async fn build_request_destination_waypoint() {
run_build_request(
Expand Down Expand Up @@ -774,6 +787,40 @@ mod tests {
.await;
}

#[tokio::test]
async fn build_request_destination_waypoint_mismatch_ip() {
run_build_request(
"127.0.0.1",
"[ff06::c3]:80",
XdsAddressType::Workload(XdsWorkload {
uid: "cluster1//v1/Pod/default/my-pod".to_string(),
addresses: vec![
Bytes::copy_from_slice(&[127, 0, 0, 2]),
Bytes::copy_from_slice(
"ff06::c3".parse::<Ipv6Addr>().unwrap().octets().as_slice(),
),
],
waypoint: Some(xds::istio::workload::GatewayAddress {
destination: Some(xds::istio::workload::gateway_address::Destination::Address(
XdsNetworkAddress {
network: "".to_string(),
address: [127, 0, 0, 11].to_vec(),
},
)),
hbone_mtls_port: 15008,
}),
..Default::default()
}),
// Should use the waypoint
Some(ExpectedRequest {
protocol: Protocol::HBONE,
hbone_destination: "[ff06::c3]:80",
destination: "127.0.0.11:15008",
}),
)
.await;
}

#[tokio::test]
async fn build_request_destination_svc_waypoint() {
run_build_request(
Expand Down
37 changes: 25 additions & 12 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ impl DemandProxyState {
.find_upstream(network, source_workload, addr, resolution_mode)
// Drop the lock
};
tracing::trace!(%addr, ?upstream, "fetch_upstream");
self.finalize_upstream(source_workload, addr, upstream)
.await
}
Expand All @@ -734,13 +735,15 @@ impl DemandProxyState {
ip_family_restriction,
)
.await?; // if we can't load balance just return the error
Ok(Some(Upstream {
let res = Upstream {
workload: wl,
selected_workload_ip,
port,
service_sans: svc.map(|s| s.subject_alt_names.clone()).unwrap_or_default(),
destination_service: svc_desc,
}))
};
tracing::trace!(?res, "finalize_upstream");
Ok(Some(res))
}

async fn fetch_waypoint(
Expand All @@ -751,25 +754,35 @@ impl DemandProxyState {
) -> Result<Upstream, Error> {
// Waypoint can be referred to by an IP or Hostname.
// Hostname is preferred as it is a more stable identifier.
let res = match &gw_address.destination {
let (res, target_address) = match &gw_address.destination {
Destination::Address(ip) => {
let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port);
self.state.read().unwrap().find_upstream(
let us = self.state.read().unwrap().find_upstream(
ip.network.clone(),
source_workload,
addr,
ServiceResolutionMode::Waypoint,
)
);
// If they referenced a waypoint by IP, use that IP as the destination.
// Note this means that an IPv6 call may be translated to IPv4 if the waypoint is specified
// as an IPv4 address.
// For this reason, the Hostname method is preferred which can adapt to the callers IP family.
(us, addr)
}
Destination::Hostname(host) => {
let state = self.read();
match state.find_hostname(host) {
Some(Address::Service(s)) => state.find_upstream_from_service(
source_workload,
gw_address.hbone_mtls_port,
ServiceResolutionMode::Waypoint,
s,
),
Some(Address::Service(s)) => {
let us = state.find_upstream_from_service(
source_workload,
gw_address.hbone_mtls_port,
ServiceResolutionMode::Waypoint,
s,
);
// For hostname, use the original_destination_address as the target so we can
// adapt to the callers IP family.
(us, original_destination_address)
}
Some(_) => {
return Err(Error::UnsupportedFeature(
"waypoint must be a service, not a workload".to_string(),
Expand All @@ -784,7 +797,7 @@ impl DemandProxyState {
}
}
};
self.finalize_upstream(source_workload, original_destination_address, res)
self.finalize_upstream(source_workload, target_address, res)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address)))
}
Expand Down

0 comments on commit 107279e

Please sign in to comment.