From dc0c33554fcaec96f8293db5835a7e987be95b50 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 6 Mar 2025 11:42:17 -0500 Subject: [PATCH 1/4] fix(host_metrics source): avoid panic when reading from buffer --- Cargo.lock | 4 +- src/sources/host_metrics/tcp.rs | 78 +++++++++++++++++++++------------ 2 files changed, 53 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71a3c073c552a..086bc0433f1c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4564,7 +4564,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.8", "tokio", "tower-service", "tracing 0.1.41", @@ -7580,7 +7580,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.13.0", "proc-macro2 1.0.93", "quote 1.0.38", "syn 2.0.98", diff --git a/src/sources/host_metrics/tcp.rs b/src/sources/host_metrics/tcp.rs index c45eb164c11ba..103712007a711 100644 --- a/src/sources/host_metrics/tcp.rs +++ b/src/sources/host_metrics/tcp.rs @@ -134,6 +134,52 @@ struct TcpStats { tx_queued_bytes: f64, } +// Helper function to parse Netlink messages from a buffer +fn parse_netlink_messages( + buffer: &[u8], + headers: &mut Vec, +) -> Result { + let mut offset = 0; + let mut done = false; + + while offset < buffer.len() { + let remaining_bytes = &buffer[offset..]; + if remaining_bytes.len() < 4 { + return Err(TcpError::TruncatedMessage); + } + + let length = NativeEndian::read_u32(&remaining_bytes[0..4]) as usize; + if length < 4 || length > remaining_bytes.len() { + return Err(TcpError::InvalidLength { length }); + } + + let msg_bytes = &remaining_bytes[..length]; + let rx_packet = + >::deserialize(msg_bytes).context(NetlinkParseSnafu)?; + + match rx_packet.payload { + NetlinkPayload::InnerMessage(SockDiagMessage::InetResponse(response)) => { + headers.push(response.header); + } + NetlinkPayload::Done(_) => { + done = true; + break; // Exit the loop, signal end of dump + } + NetlinkPayload::Error(error) => { + if let Some(code) = error.code { + return Err(TcpError::NetlinkMsgError { code: code.get() }); + } + } + _ => {} + } + + offset += length; + } + + Ok(done) +} + +// Main function to fetch InetResponseHeaders async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result, TcpError> { let unicast_socket: SocketAddr = SocketAddr::new(0, 0); let mut socket = TokioSocket::new(NETLINK_SOCK_DIAG).context(NetlinkSocketSnafu)?; @@ -163,34 +209,12 @@ async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result, .context(NetlinkSendSnafu)?; let mut receive_buffer = vec![0; 4096]; - let mut inet_resp_hdrs: Vec = Vec::new(); - 'outer: while let Ok(()) = socket.recv(&mut &mut receive_buffer[..]).await { - let mut offset = 0; - 'inner: loop { - let bytes = &receive_buffer[offset..]; - let length = NativeEndian::read_u32(&bytes[0..4]) as usize; - if length == 0 { - break 'inner; - } - let rx_packet = - >::deserialize(bytes).context(NetlinkParseSnafu)?; - - match rx_packet.payload { - NetlinkPayload::InnerMessage(SockDiagMessage::InetResponse(response)) => { - inet_resp_hdrs.push(response.header); - } - NetlinkPayload::Done(_) => { - break 'outer; - } - NetlinkPayload::Error(error) => { - if let Some(code) = error.code { - return Err(TcpError::NetlinkMsgError { code: code.get() }); - } - } - _ => {} - } + let mut inet_resp_hdrs = Vec::with_capacity(32); // Pre-allocate with an estimate - offset += rx_packet.header.length as usize; + while let Ok(()) = socket.recv(&mut &mut receive_buffer[..]).await { + let done = parse_netlink_messages(&receive_buffer, &mut inet_resp_hdrs)?; + if done { + break; // Exit if we received a Done message } } From 6c84d8b4443e3a05eed90d5b9bd65a50679a837e Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 6 Mar 2025 14:48:52 -0500 Subject: [PATCH 2/4] tweaks --- src/sources/host_metrics/tcp.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/sources/host_metrics/tcp.rs b/src/sources/host_metrics/tcp.rs index 103712007a711..44b604a1cb5b6 100644 --- a/src/sources/host_metrics/tcp.rs +++ b/src/sources/host_metrics/tcp.rs @@ -71,6 +71,8 @@ enum TcpError { InvalidTcpState { state: u8 }, #[snafu(display("Received an error message from netlink; code: {code}"))] NetlinkMsgError { code: i32 }, + #[snafu(display("Invalid message length: {length}"))] + InvalidLength { length: usize }, } #[repr(u8)] @@ -145,11 +147,15 @@ fn parse_netlink_messages( while offset < buffer.len() { let remaining_bytes = &buffer[offset..]; if remaining_bytes.len() < 4 { + // Still treat this as an error since we can't even read the length return Err(TcpError::TruncatedMessage); } let length = NativeEndian::read_u32(&remaining_bytes[0..4]) as usize; - if length < 4 || length > remaining_bytes.len() { + if length == 0 { + break; // Restore original behavior: treat as end of buffer + } + if length > remaining_bytes.len() { return Err(TcpError::InvalidLength { length }); } From 77268835fef0a0dfb024b7e135db63978954b410 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 7 Mar 2025 15:06:44 +0000 Subject: [PATCH 3/4] add docs because these functions are complex --- src/sources/host_metrics/tcp.rs | 49 +++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/src/sources/host_metrics/tcp.rs b/src/sources/host_metrics/tcp.rs index 44b604a1cb5b6..120da4f645fce 100644 --- a/src/sources/host_metrics/tcp.rs +++ b/src/sources/host_metrics/tcp.rs @@ -136,7 +136,19 @@ struct TcpStats { tx_queued_bytes: f64, } -// Helper function to parse Netlink messages from a buffer +/// Parses Netlink messages from a buffer, extracting [`InetResponseHeader`]s. +/// +/// # Arguments +/// * `buffer` - Raw byte slice containing Netlink messages. +/// * `headers` - Mutable vector to store parsed [`InetResponseHeader`]s. +/// +/// # Returns +/// * `Ok(true)` if parsing is complete (Done message received). +/// * `Ok(false)` if more data is expected. In this case, this function can be called again. +/// * `Err(TcpError)` on invalid length, deserialization failure, or Netlink error. +/// +/// # Errors +/// Returns [`TcpError`] variants for invalid message lengths or Netlink errors. fn parse_netlink_messages( buffer: &[u8], headers: &mut Vec, @@ -148,12 +160,12 @@ fn parse_netlink_messages( let remaining_bytes = &buffer[offset..]; if remaining_bytes.len() < 4 { // Still treat this as an error since we can't even read the length - return Err(TcpError::TruncatedMessage); + return Err(TcpError::InvalidLength { length: remaining_bytes.len() }); } - + // This function panics if the buffer length is less than 4. let length = NativeEndian::read_u32(&remaining_bytes[0..4]) as usize; if length == 0 { - break; // Restore original behavior: treat as end of buffer + break; } if length > remaining_bytes.len() { return Err(TcpError::InvalidLength { length }); @@ -169,7 +181,7 @@ fn parse_netlink_messages( } NetlinkPayload::Done(_) => { done = true; - break; // Exit the loop, signal end of dump + break; } NetlinkPayload::Error(error) => { if let Some(code) = error.code { @@ -185,8 +197,21 @@ fn parse_netlink_messages( Ok(done) } -// Main function to fetch InetResponseHeaders -async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result, TcpError> { +/// Fetches [`InetResponseHeader`]s for TCP sockets using Netlink. +/// +/// # Arguments +/// * `addr_family` - Address family (`AF_INET` for IPv4, `AF_INET6` for IPv6). +/// +/// # Returns +/// * `Ok(Vec)` containing headers for active TCP sockets. +/// * `Err(TcpError)` on socket creation, send, receive, or parsing errors. +/// +/// # Errors +/// Returns [`TcpError`] for socket-related or message parsing failures. +/// +/// # Notes +/// Asynchronously queries the kernel via a Netlink socket for TCP socket info. +async fn fetch_netlink_inet_headers(addr_family: u8) -> Result, TcpError> { let unicast_socket: SocketAddr = SocketAddr::new(0, 0); let mut socket = TokioSocket::new(NETLINK_SOCK_DIAG).context(NetlinkSocketSnafu)?; @@ -220,7 +245,7 @@ async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result, while let Ok(()) = socket.recv(&mut &mut receive_buffer[..]).await { let done = parse_netlink_messages(&receive_buffer, &mut inet_resp_hdrs)?; if done { - break; // Exit if we received a Done message + break; } } @@ -244,11 +269,11 @@ fn parse_nl_inet_hdrs( async fn build_tcp_stats() -> Result { let mut tcp_stats = TcpStats::default(); - let resp = fetch_nl_inet_hdrs(AF_INET).await?; + let resp = fetch_netlink_inet_headers(AF_INET).await?; parse_nl_inet_hdrs(resp, &mut tcp_stats)?; if is_ipv6_enabled() { - let resp = fetch_nl_inet_hdrs(AF_INET6).await?; + let resp = fetch_netlink_inet_headers(AF_INET6).await?; parse_nl_inet_hdrs(resp, &mut tcp_stats)?; } @@ -269,7 +294,7 @@ mod tests { }; use super::{ - fetch_nl_inet_hdrs, parse_nl_inet_hdrs, TcpStats, STATE, TCP_CONNS_TOTAL, + fetch_netlink_inet_headers, parse_nl_inet_hdrs, TcpStats, STATE, TCP_CONNS_TOTAL, TCP_RX_QUEUED_BYTES_TOTAL, TCP_TX_QUEUED_BYTES_TOTAL, }; use crate::sources::host_metrics::{HostMetrics, HostMetricsConfig, MetricsBuffer}; @@ -326,7 +351,7 @@ mod tests { // initiate a connection let _stream = TcpStream::connect(addr).await.unwrap(); - let hdrs = fetch_nl_inet_hdrs(AF_INET).await.unwrap(); + let hdrs = fetch_netlink_inet_headers(AF_INET).await.unwrap(); // there should be at least two connections, one for the server and one for the client assert!(hdrs.len() >= 2); From feea13b6c75f2540d46ddaf1107231851aa55ab4 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 7 Mar 2025 15:17:54 +0000 Subject: [PATCH 4/4] changelog --- changelog.d/host_metrics_tcp_panic.fix.md | 3 +++ src/sources/host_metrics/tcp.rs | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 changelog.d/host_metrics_tcp_panic.fix.md diff --git a/changelog.d/host_metrics_tcp_panic.fix.md b/changelog.d/host_metrics_tcp_panic.fix.md new file mode 100644 index 0000000000000..279a5e4c9c530 --- /dev/null +++ b/changelog.d/host_metrics_tcp_panic.fix.md @@ -0,0 +1,3 @@ +Fix potential panic in the `host_metrics` source when collecting TCP metrics. + +authors: pront diff --git a/src/sources/host_metrics/tcp.rs b/src/sources/host_metrics/tcp.rs index 120da4f645fce..bde7e0953d7d5 100644 --- a/src/sources/host_metrics/tcp.rs +++ b/src/sources/host_metrics/tcp.rs @@ -160,7 +160,9 @@ fn parse_netlink_messages( let remaining_bytes = &buffer[offset..]; if remaining_bytes.len() < 4 { // Still treat this as an error since we can't even read the length - return Err(TcpError::InvalidLength { length: remaining_bytes.len() }); + return Err(TcpError::InvalidLength { + length: remaining_bytes.len(), + }); } // This function panics if the buffer length is less than 4. let length = NativeEndian::read_u32(&remaining_bytes[0..4]) as usize;