Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support hostname waypoints #1224

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl OutboundConnection {
{
// if we have a waypoint for this svc, use it; otherwise route traffic normally
if let Some(waypoint) = state
.fetch_service_waypoint(&target_service, &source_workload)
.fetch_service_waypoint(&target_service, &source_workload, target)
.await?
{
let upstream_sans = waypoint.workload_and_services_san();
Expand Down Expand Up @@ -373,7 +373,7 @@ impl OutboundConnection {
if !from_waypoint && !svc_addressed {
// For case upstream server has enabled waypoint
let waypoint = state
.fetch_workload_waypoint(&us.workload, &source_workload)
.fetch_workload_waypoint(&us.workload, &source_workload, target)
.await?;
if let Some(waypoint) = waypoint {
let actual_destination = waypoint.workload_socket_addr();
Expand Down
184 changes: 113 additions & 71 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,35 +254,12 @@ impl ProxyState {
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
// Randomly pick an upstream
// TODO: do this more efficiently, and not just randomly
let Some((ep, wl)) = self.load_balance(source_workload, &svc, addr, resolution_mode)
else {
debug!("VIP {} has no healthy endpoints", addr);
return None;
};

let svc_port = svc.ports.get(&addr.port()).copied().unwrap_or_default();
let target_port = if let Some(&port) = ep.port.get(&addr.port()) {
// prefer endpoint port mapping
port
} else if svc_port > 0 {
// otherwise, see if the service has this port
svc_port
} else if let Some(ApplicationTunnel { port: Some(_), .. }) = &wl.application_tunnel {
// when using app tunnel, we don't require the port to be found on the service
addr.port()
} else {
// no app tunnel or port mapping, error
debug!(
"found VIP {}, but port {} was unknown",
addr.ip(),
addr.port()
);
return None;
};

return Some((wl, target_port, Some(svc)));
return self.find_upstream_from_service(
source_workload,
addr.port(),
resolution_mode,
svc,
);
}
if let Some(wl) = self
.workloads
Expand All @@ -293,22 +270,55 @@ impl ProxyState {
None
}

/// Resolves a concrete upstream for `svc` on the service port `svc_port`.
///
/// An endpoint/workload pair is chosen through `load_balance`; the port to dial is then
/// taken from the first of these that applies:
/// 1. the endpoint's own mapping for `svc_port`,
/// 2. the service's non-zero mapping for `svc_port`,
/// 3. `svc_port` itself, when the chosen workload has an application tunnel with a port set.
///
/// Returns `None` (after a debug log) when `load_balance` yields nothing or when none of
/// the port sources applies; otherwise returns the workload, the resolved port and the
/// service.
fn find_upstream_from_service(
    &self,
    source_workload: &Workload,
    svc_port: u16,
    resolution_mode: ServiceResolutionMode,
    svc: Arc<Service>,
) -> Option<(Arc<Workload>, u16, Option<Arc<Service>>)> {
    // TODO: do this more efficiently, and not just randomly
    let (endpoint, workload) =
        match self.load_balance(source_workload, &svc, svc_port, resolution_mode) {
            Some(selection) => selection,
            None => {
                debug!("Service {} has no healthy endpoints", svc.hostname);
                return None;
            }
        };

    // Candidate ports, in order of preference.
    let endpoint_port = endpoint.port.get(&svc_port).copied();
    // A service mapping of 0 is treated the same as "no mapping".
    let service_port = svc.ports.get(&svc_port).copied().filter(|&p| p > 0);
    // When using app tunnel, the port does not have to be found on the service.
    let tunnel_has_port = matches!(
        &workload.application_tunnel,
        Some(ApplicationTunnel { port: Some(_), .. })
    );

    let Some(target_port) = endpoint_port
        .or(service_port)
        .or_else(|| tunnel_has_port.then_some(svc_port))
    else {
        // no app tunnel or port mapping, error
        debug!(
            "found service {}, but port {} was unknown",
            svc.hostname, svc_port
        );
        return None;
    };

    Some((workload, target_port, Some(svc)))
}

fn load_balance<'a>(
&self,
src: &Workload,
svc: &'a Service,
svc_addr: SocketAddr,
svc_port: u16,
resolution_mode: ServiceResolutionMode,
) -> Option<(&'a Endpoint, Arc<Workload>)> {
let target_port = svc.ports.get(&svc_addr.port()).copied();
let target_port = svc.ports.get(&svc_port).copied();

if resolution_mode == ServiceResolutionMode::Standard && target_port.is_none() {
// Port doesn't exist on the service at all, this is invalid
debug!(
"service {} does not have port {}",
svc.hostname,
svc_addr.port()
);
debug!("service {} does not have port {}", svc.hostname, svc_port);
return None;
};

Expand All @@ -319,14 +329,12 @@ impl ProxyState {
};
match resolution_mode {
ServiceResolutionMode::Standard => {
if target_port.unwrap_or_default() == 0
&& !ep.port.contains_key(&svc_addr.port())
{
if target_port.unwrap_or_default() == 0 && !ep.port.contains_key(&svc_port) {
// Filter workload out, it doesn't have a matching port
trace!(
"filter endpoint {}, it does not have service port {}",
ep.workload_uid,
svc_addr.port()
svc_port
);
return None;
}
Expand Down Expand Up @@ -686,7 +694,7 @@ impl DemandProxyState {
return None;
}
self.fetch_on_demand(uid.clone()).await;
self.state.read().unwrap().workloads.find_uid(uid)
self.read().workloads.find_uid(uid)
}

pub async fn fetch_upstream(
Expand All @@ -698,18 +706,33 @@ impl DemandProxyState {
) -> Result<Option<Upstream>, Error> {
self.fetch_address(&network_addr(network.clone(), addr.ip()))
.await;
let Some((wl, port, svc)) = self.state.read().unwrap().find_upstream(
network,
source_workload,
addr,
resolution_mode,
) else {
let upstream = {
self.read()
.find_upstream(network, source_workload, addr, resolution_mode)
// Drop the lock
};
self.finalize_upstream(source_workload, addr, upstream)
.await
}

async fn finalize_upstream(
&self,
source_workload: &Workload,
original_target_address: SocketAddr,
upstream: Option<(Arc<Workload>, u16, Option<Arc<Service>>)>,
) -> Result<Option<Upstream>, Error> {
let Some((wl, port, svc)) = upstream else {
return Ok(None);
};
let svc_desc = svc.clone().map(|s| ServiceDescription::from(s.as_ref()));
let ip_family_restriction = svc.as_ref().and_then(|s| s.ip_families);
let selected_workload_ip = self
.pick_workload_destination_or_resolve(&wl, source_workload, addr, ip_family_restriction)
.pick_workload_destination_or_resolve(
&wl,
source_workload,
original_target_address,
ip_family_restriction,
)
.await?; // if we can't load balance just return the error
Ok(Some(Upstream {
workload: wl,
Expand All @@ -724,36 +747,59 @@ impl DemandProxyState {
&self,
gw_address: &GatewayAddress,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Upstream, Error> {
let wp_nw_addr = match &gw_address.destination {
Destination::Address(ip) => ip,
Destination::Hostname(_) => {
return Err(Error::UnsupportedFeature(
"hostname lookup not supported yet".to_string(),
));
// Waypoint can be referred to by an IP or Hostname.
// Hostname is preferred as it is a more stable identifier.
let res = match &gw_address.destination {
Destination::Address(ip) => {
let addr = SocketAddr::new(ip.address, gw_address.hbone_mtls_port);
self.state.read().unwrap().find_upstream(
howardjohn marked this conversation as resolved.
Show resolved Hide resolved
ip.network.clone(),
source_workload,
addr,
ServiceResolutionMode::Waypoint,
)
}
Destination::Hostname(host) => {
let state = self.read();
match state.find_hostname(host) {
Some(Address::Service(s)) => state.find_upstream_from_service(
source_workload,
gw_address.hbone_mtls_port,
ServiceResolutionMode::Waypoint,
s,
),
Some(_) => {
return Err(Error::UnsupportedFeature(
"waypoint must be a service, not a workload".to_string(),
))
}
None => {
return Err(Error::UnknownWaypoint(format!(
"waypoint {} not found",
host.hostname
)))
}
}
}
};
let wp_socket_addr = SocketAddr::new(wp_nw_addr.address, gw_address.hbone_mtls_port);
self.fetch_upstream(
wp_nw_addr.network.clone(),
source_workload,
wp_socket_addr,
ServiceResolutionMode::Waypoint,
)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {} not found", wp_nw_addr.address)))
self.finalize_upstream(source_workload, original_destination_address, res)
.await?
.ok_or_else(|| Error::UnknownWaypoint(format!("waypoint {:?} not found", gw_address)))
}

pub async fn fetch_service_waypoint(
&self,
service: &Service,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Option<Upstream>, Error> {
let Some(gw_address) = &service.waypoint else {
// no waypoint
return Ok(None);
};
self.fetch_waypoint(gw_address, source_workload)
self.fetch_waypoint(gw_address, source_workload, original_destination_address)
.await
.map(Some)
}
Expand All @@ -762,12 +808,13 @@ impl DemandProxyState {
&self,
wl: &Workload,
source_workload: &Workload,
original_destination_address: SocketAddr,
) -> Result<Option<Upstream>, Error> {
let Some(gw_address) = &wl.waypoint else {
// no waypoint
return Ok(None);
};
self.fetch_waypoint(gw_address, source_workload)
self.fetch_waypoint(gw_address, source_workload, original_destination_address)
.await
.map(Some)
}
Expand Down Expand Up @@ -1424,12 +1471,7 @@ mod tests {

let assert_endpoint = |src: &Workload, svc: &Service, ips: Vec<&str>, desc: &str| {
let got = state
.load_balance(
src,
svc,
"0.0.0.0:80".parse().unwrap(),
ServiceResolutionMode::Standard,
)
.load_balance(src, svc, 80, ServiceResolutionMode::Standard)
.and_then(|(ep, _)| ep.address.clone())
.map(|addr| addr.address.to_string());
if ips.is_empty() {
Expand Down
24 changes: 24 additions & 0 deletions src/test_helpers/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,18 @@ impl<'a> TestServiceBuilder<'a> {
self
}

/// Point the service being built at a waypoint referenced by hostname.
///
/// The waypoint is recorded as `hostname` in the `default` namespace with
/// `hbone_mtls_port` set to 15008. Returns the builder for further chaining.
pub fn waypoint_hostname(mut self, hostname: &str) -> Self {
    let destination = gatewayaddress::Destination::Hostname(NamespacedHostname {
        namespace: strng::literal!("default"),
        hostname: hostname.into(),
    });
    let waypoint = GatewayAddress {
        destination,
        hbone_mtls_port: 15008,
    };
    self.s.waypoint = Some(waypoint);
    self
}

/// Finish building the service.
pub async fn register(self) -> anyhow::Result<()> {
self.manager
Expand Down Expand Up @@ -411,6 +423,18 @@ impl<'a> TestWorkloadBuilder<'a> {
self
}

/// Set the workload's waypoint by hostname.
///
/// The waypoint is stored on the workload as `hostname` in the `default` namespace,
/// with `hbone_mtls_port` set to 15008. Returns the builder for further chaining.
pub fn waypoint_hostname(mut self, hostname: &str) -> Self {
    let waypoint_host = NamespacedHostname {
        namespace: strng::literal!("default"),
        hostname: hostname.into(),
    };
    let gateway = GatewayAddress {
        destination: gatewayaddress::Destination::Hostname(waypoint_host),
        hbone_mtls_port: 15008,
    };
    self.w.workload.waypoint = Some(gateway);
    self
}

/// Apply an arbitrary mutation to the workload being built
pub fn mutate_workload(mut self, f: impl FnOnce(&mut Workload)) -> Self {
f(&mut self.w.workload);
Expand Down
15 changes: 13 additions & 2 deletions src/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,24 @@ impl<'a> fmt::Display for DisplayStatus<'a> {
/// Writes the wrapped status as `status: <code>, message: <message>`, followed by
/// optional extras in this order:
/// - a control-plane hint when the message mentions an authentication failure,
/// - `, details: …` when the details are non-empty and decode as UTF-8,
/// - `, source: …` for the source nested beneath the status' immediate source, plus a
///   DNS hint when that source reports a name-resolution failure.
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
    let status = &self.0;
    write!(
        f,
        "status: {:?}, message: {:?}",
        status.code(),
        status.message()
    )?;

    // Search the message in place; no owned copy is needed for a substring test.
    if status.message().contains("authentication failure") {
        write!(
            f,
            " (hint: check the control plane logs for more information)"
        )?;
    }

    let details = status.details();
    if !details.is_empty() {
        // Details that are not valid UTF-8 are silently omitted.
        if let Ok(text) = std::str::from_utf8(details) {
            write!(f, ", details: {text}")?;
        }
    }

    // Report the source one level below the status' immediate source.
    if let Some(src) = status.source().and_then(|inner| inner.source()) {
        write!(f, ", source: {src}")?;
        // Error is not public to explicitly match on, so do a fuzzy match
        if src
            .to_string()
            .contains("Temporary failure in name resolution")
        {
            write!(f, " (hint: is the DNS server reachable?)")?;
        }
    }
    Ok(())
}
Expand All @@ -71,8 +82,8 @@ impl<'a> fmt::Display for DisplayStatus<'a> {
pub enum Error {
#[error("gRPC error {}", DisplayStatus(.0))]
GrpcStatus(#[from] tonic::Status),
#[error("gRPC connection error:{}", DisplayStatus(.0))]
Connection(#[source] tonic::Status),
#[error("gRPC connection error connecting to {0}: {}", DisplayStatus(.1))]
Connection(String, #[source] tonic::Status),
/// Attempted to send on a MPSC channel which has been canceled
#[error(transparent)]
RequestFailure(#[from] Box<mpsc::error::SendError<DeltaDiscoveryRequest>>),
Expand Down
7 changes: 5 additions & 2 deletions src/xds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ impl AdsClient {

async fn run_loop(&mut self, backoff: Duration) -> Duration {
match self.run_internal().await {
Err(e @ Error::Connection(_)) => {
Err(e @ Error::Connection(_, _)) => {
// For connection errors, we add backoff
let backoff = std::cmp::min(MAX_BACKOFF, backoff * 2);
warn!(
Expand Down Expand Up @@ -616,6 +616,7 @@ impl AdsClient {
warn!("outbound stream complete");
};

let addr = self.config.address.clone();
let tls_grpc_channel = tls::grpc_connector(
self.config.address.clone(),
self.config.tls_builder.fetch_cert().await?,
Expand All @@ -629,7 +630,9 @@ impl AdsClient {
.delta_aggregated_resources(tonic::Request::new(outbound))
.await;

let mut response_stream = ads_connection.map_err(Error::Connection)?.into_inner();
let mut response_stream = ads_connection
.map_err(|src| Error::Connection(addr, src))?
.into_inner();
debug!("connected established");

info!("Stream established");
Expand Down
Loading