Skip to content

Commit

Permalink
fix(host_metrics source): avoid panic when reading from buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
pront committed Mar 6, 2025
1 parent 6302271 commit 0820d86
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 39 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 51 additions & 27 deletions src/sources/host_metrics/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,52 @@ struct TcpStats {
tx_queued_bytes: f64,
}

// Helper function to parse Netlink messages from a buffer
fn parse_netlink_messages(
buffer: &[u8],
headers: &mut Vec<InetResponseHeader>,
) -> Result<bool, TcpError> {
let mut offset = 0;
let mut done = false;

while offset < buffer.len() {
let remaining_bytes = &buffer[offset..];
if remaining_bytes.len() < 4 {
return Err(TcpError::TruncatedMessage);

Check failure on line 148 in src/sources/host_metrics/tcp.rs

View workflow job for this annotation

GitHub Actions / Checks

no variant or associated item named `TruncatedMessage` found for enum `sources::host_metrics::tcp::TcpError` in the current scope
}

let length = NativeEndian::read_u32(&remaining_bytes[0..4]) as usize;
if length < 4 || length > remaining_bytes.len() {
return Err(TcpError::InvalidLength { length });

Check failure on line 153 in src/sources/host_metrics/tcp.rs

View workflow job for this annotation

GitHub Actions / Checks

no variant named `InvalidLength` found for enum `sources::host_metrics::tcp::TcpError`
}

let msg_bytes = &remaining_bytes[..length];
let rx_packet =
<NetlinkMessage<SockDiagMessage>>::deserialize(msg_bytes).context(NetlinkParseSnafu)?;

match rx_packet.payload {
NetlinkPayload::InnerMessage(SockDiagMessage::InetResponse(response)) => {
headers.push(response.header);
}
NetlinkPayload::Done(_) => {
done = true;
break; // Exit the loop, signal end of dump
}
NetlinkPayload::Error(error) => {
if let Some(code) = error.code {
return Err(TcpError::NetlinkMsgError { code: code.get() });
}
}
_ => {}
}

offset += length;
}

Ok(done)
}

// Main function to fetch InetResponseHeaders
async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result<Vec<InetResponseHeader>, TcpError> {
let unicast_socket: SocketAddr = SocketAddr::new(0, 0);
let mut socket = TokioSocket::new(NETLINK_SOCK_DIAG).context(NetlinkSocketSnafu)?;
Expand Down Expand Up @@ -163,34 +209,12 @@ async fn fetch_nl_inet_hdrs(addr_family: u8) -> Result<Vec<InetResponseHeader>,
.context(NetlinkSendSnafu)?;

let mut receive_buffer = vec![0; 4096];
let mut inet_resp_hdrs: Vec<InetResponseHeader> = Vec::new();
'outer: while let Ok(()) = socket.recv(&mut &mut receive_buffer[..]).await {
let mut offset = 0;
'inner: loop {
let bytes = &receive_buffer[offset..];
let length = NativeEndian::read_u32(&bytes[0..4]) as usize;
if length == 0 {
break 'inner;
}
let rx_packet =
<NetlinkMessage<SockDiagMessage>>::deserialize(bytes).context(NetlinkParseSnafu)?;

match rx_packet.payload {
NetlinkPayload::InnerMessage(SockDiagMessage::InetResponse(response)) => {
inet_resp_hdrs.push(response.header);
}
NetlinkPayload::Done(_) => {
break 'outer;
}
NetlinkPayload::Error(error) => {
if let Some(code) = error.code {
return Err(TcpError::NetlinkMsgError { code: code.get() });
}
}
_ => {}
}
let mut inet_resp_hdrs = Vec::with_capacity(32); // Pre-allocate with an estimate

offset += rx_packet.header.length as usize;
while let Ok(()) = socket.recv(&mut &mut receive_buffer[..]).await {
let done = parse_netlink_messages(&receive_buffer, &mut inet_resp_hdrs)?;
if done {
break; // Exit if we received a Done message
}
}

Expand Down

0 comments on commit 0820d86

Please sign in to comment.