From 1013e9fd3806b29492bbdf00ddc3387b43357572 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 27 Feb 2025 15:26:48 -0500 Subject: [PATCH 1/5] Add config for maximum allowed line size after merging --- src/sources/kubernetes_logs/mod.rs | 31 ++++++- .../kubernetes_logs/partial_events_merger.rs | 88 +++++++++++++++++-- 2 files changed, 106 insertions(+), 13 deletions(-) diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index ef0f7bd947023..c7f66205f94b8 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -4,7 +4,7 @@ //! running inside the cluster as a DaemonSet. #![deny(missing_docs)] -use std::{path::PathBuf, time::Duration}; +use std::{cmp::min, path::PathBuf, time::Duration}; use bytes::Bytes; use chrono::Utc; @@ -193,6 +193,16 @@ pub struct Config { #[configurable(metadata(docs::type_unit = "bytes"))] max_line_bytes: usize, + /// The maximum number of bytes a line can contain - after merging - before being discarded. + /// + /// This protects against malformed lines or tailing incorrect files. + /// + /// Note that, if auto_partial_merge is false, this config will be ignored. Also, if max_line_bytes is too small to reach the continuation character, then this + /// config will have no practical impact (the same is true of auto_partial_merge). Finally, the smaller of max_merged_line_bytes and max_line_bytes will apply + /// if auto_partial_merge is true, so if this is set to eg 1 MiB but max_line_bytes is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped. + #[configurable(metadata(docs::type_unit = "bytes"))] + max_merged_line_bytes: usize, + /// The number of lines to read for generating the checksum. /// /// If your files share a common header that is not always a fixed size, @@ -294,6 +304,7 @@ impl Default for Config { max_read_bytes: default_max_read_bytes(), oldest_first: default_oldest_first(), max_line_bytes: default_max_line_bytes(), + max_merged_line_bytes: default_max_merged_line_bytes(), fingerprint_lines: default_fingerprint_lines(), glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(), ingestion_timestamp_field: None, @@ -553,6 +564,7 @@ struct Source { max_read_bytes: usize, oldest_first: bool, max_line_bytes: usize, + max_merged_line_bytes: usize, fingerprint_lines: usize, glob_minimum_cooldown: Duration, use_apiserver_cache: bool, @@ -641,6 +653,7 @@ impl Source { max_read_bytes: config.max_read_bytes, oldest_first: config.oldest_first, max_line_bytes: config.max_line_bytes, + max_merged_line_bytes: config.max_merged_line_bytes, fingerprint_lines: config.fingerprint_lines, glob_minimum_cooldown, use_apiserver_cache: config.use_apiserver_cache, @@ -676,6 +689,7 @@ impl Source { max_read_bytes, oldest_first, max_line_bytes, + max_merged_line_bytes, fingerprint_lines, glob_minimum_cooldown, use_apiserver_cache, @@ -775,6 +789,11 @@ impl Source { let ignore_before = calculate_ignore_before(ignore_older_secs); + let mut resolved_max_line_bytes = max_line_bytes; + if auto_partial_merge { + resolved_max_line_bytes = min(max_line_bytes, max_merged_line_bytes); + } + // TODO: maybe more of the parameters have to be configurable. let checkpointer = Checkpointer::new(&data_dir); @@ -799,7 +818,7 @@ impl Source { ignore_before, // The maximum number of bytes a line can contain before being discarded. This // protects against malformed lines or tailing incorrect files. 
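The interaction described in the doc comment above — the new limit only matters when `auto_partial_merge` is enabled, and the file server is then driven with the smaller of the two limits — is easier to see in code than in prose. Below is a minimal, self-contained sketch of that resolution; it mirrors the `resolved_max_line_bytes` computation introduced in this patch and applied in the hunk that follows, but the free function and `main` wrapper are illustrative only, not part of the PR.

```rust
use std::cmp::min;

/// Sketch: with `auto_partial_merge` enabled, the smaller of `max_line_bytes` and
/// `max_merged_line_bytes` wins, so the merged-line limit can tighten, but never
/// loosen, the per-read limit.
fn resolved_max_line_bytes(
    auto_partial_merge: bool,
    max_line_bytes: usize,
    max_merged_line_bytes: usize,
) -> usize {
    if auto_partial_merge {
        min(max_line_bytes, max_merged_line_bytes)
    } else {
        max_line_bytes
    }
}

fn main() {
    // The example from the doc comment: max_merged_line_bytes = 1 MiB,
    // max_line_bytes ~= 2.5 MiB, so anything over 1 MiB is dropped.
    let effective = resolved_max_line_bytes(true, 2_621_440, 1_048_576);
    assert_eq!(effective, 1_048_576);

    // With auto_partial_merge disabled, max_merged_line_bytes is ignored entirely.
    assert_eq!(resolved_max_line_bytes(false, 32 * 1024, 1), 32 * 1024);
}
```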
- max_line_bytes, + max_line_bytes: resolved_max_line_bytes, // Delimiter bytes that is used to read the file line-by-line line_delimiter: Bytes::from("\n"), // The directory where to keep the checkpoints. @@ -817,7 +836,7 @@ impl Source { ignored_header_bytes: 0, lines: fingerprint_lines, }, - max_line_length: max_line_bytes, + max_line_length: resolved_max_line_bytes, ignore_not_found: true, }, oldest_first, @@ -893,7 +912,7 @@ impl Source { let (events_count, _) = events.size_hint(); let mut stream = if auto_partial_merge { - merge_partial_events(events, log_namespace).left_stream() + merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream() } else { events.right_stream() }; @@ -1025,6 +1044,10 @@ const fn default_max_line_bytes() -> usize { 32 * 1024 // 32 KiB } +const fn default_max_merged_line_bytes() -> usize { + 50 * 1024 * 1024 // 50 MiB +} + const fn default_glob_minimum_cooldown_ms() -> Duration { Duration::from_millis(60_000) } diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 14628df46ad6e..a377c129f040a 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -29,25 +29,42 @@ impl PartialEventMergeState { file: &str, message_path: &OwnedTargetPath, expiration_time: Duration, + max_merged_line_bytes: usize, ) { + let mut bytes_mut = BytesMut::new(); if let Some(bucket) = self.buckets.get_mut(file) { + // don't bother continuing to process events that are already too big + if bucket.too_big { + return; + } + // merging with existing event if let (Some(Value::Bytes(prev_value)), Some(Value::Bytes(new_value))) = (bucket.event.get_mut(message_path), event.get(message_path)) { - let mut bytes_mut = BytesMut::new(); bytes_mut.extend_from_slice(prev_value); bytes_mut.extend_from_slice(new_value); + + // drop event if it's bigger than max allowed + if bytes_mut.len() > max_merged_line_bytes { + bucket.too_big = true; + } + *prev_value = bytes_mut.freeze(); } } else { + if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { + bytes_mut.extend_from_slice(event_bytes); + } + // new event self.buckets.insert( file.to_owned(), Bucket { event, expiration: Instant::now() + expiration_time, + too_big: bytes_mut.len() > max_merged_line_bytes, }, ); } @@ -70,7 +87,9 @@ impl PartialEventMergeState { fn flush_events(&mut self, emitter: &mut Emitter) { for (_, bucket) in self.buckets.drain() { - emitter.emit(bucket.event); + if !bucket.too_big { + emitter.emit(bucket.event); + } } } } @@ -78,13 +97,15 @@ impl PartialEventMergeState { struct Bucket { event: LogEvent, expiration: Instant, + too_big: bool, } pub fn merge_partial_events( stream: impl Stream + 'static, log_namespace: LogNamespace, + max_merged_line_bytes: usize, ) -> impl Stream { - merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME) + merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME, max_merged_line_bytes) } // internal function that allows customizing the expiration time (for testing) @@ -92,6 +113,7 @@ fn merge_partial_events_with_custom_expiration( stream: impl Stream + 'static, log_namespace: LogNamespace, expiration_time: Duration, + max_merged_line_bytes: usize, ) -> impl Stream { let partial_flag_path = match log_namespace { LogNamespace::Vector => { @@ -132,7 +154,7 @@ fn merge_partial_events_with_custom_expiration( .map(|x| x.to_string()) .unwrap_or_default(); - 
state.add_event(event, &file, &message_path, expiration_time); + state.add_event(event, &file, &message_path, expiration_time, max_merged_line_bytes); if !is_partial { if let Some(log_event) = state.remove_event(&file) { emitter.emit(log_event); @@ -164,7 +186,7 @@ mod test { e_1.insert("foo", 1); let input_stream = futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -174,6 +196,18 @@ mod test { ); } + #[tokio::test] + async fn merge_single_event_legacy_too_big() { + let mut e_1 = LogEvent::from("test message 1"); + e_1.insert("foo", 1); + + let input_stream = futures::stream::iter([e_1.into()]); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 1); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 0); + } + #[tokio::test] async fn merge_multiple_events_legacy() { let mut e_1 = LogEvent::from("test message 1"); @@ -184,7 +218,7 @@ mod test { e_2.insert("foo2", 1); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -194,6 +228,23 @@ mod test { ); } + #[tokio::test] + async fn merge_multiple_events_legacy_too_big() { + let mut e_1 = LogEvent::from("test message 1"); + e_1.insert("foo", 1); + e_1.insert("_partial", true); + + let mut e_2 = LogEvent::from("test message 2"); + e_2.insert("foo2", 1); + + let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); + // 32 > length of first message but less than the two combined + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 32); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 0); + } + #[tokio::test] async fn multiple_events_flush_legacy() { let mut e_1 = LogEvent::from("test message 1"); @@ -205,7 +256,7 @@ mod test { e_1.insert("_partial", true); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy); + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -215,6 +266,24 @@ mod test { ); } + #[tokio::test] + async fn multiple_events_flush_legacy_too_big() { + let mut e_1 = LogEvent::from("test message 1"); + e_1.insert("foo", 1); + e_1.insert("_partial", true); + + let mut e_2 = LogEvent::from("test message 2"); + e_2.insert("foo2", 1); + e_1.insert("_partial", true); + + let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); + // 32 > length of first message but less than the two combined + let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 32); + + let output: Vec = output_stream.collect().await; + assert_eq!(output.len(), 0); + } + #[tokio::test] async fn multiple_events_expire_legacy() { let mut e_1 = LogEvent::from("test message"); @@ -233,6 +302,7 @@ mod test { input_stream, LogNamespace::Legacy, Duration::from_secs(1), + 50*1024*1024, ); let output: Vec = output_stream.take(2).collect().await; @@ -256,7 +326,7 @@ mod test { ); let input_stream = 
futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector); + let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -286,7 +356,7 @@ mod test { ); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector); + let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); From 2b3ea0a37de9f8363f0d981f0317edae5a750e72 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Thu, 27 Feb 2025 15:58:46 -0500 Subject: [PATCH 2/5] Add warns when we drop partial logs for being too big; shift some comments around --- .../kubernetes_logs/partial_events_merger.rs | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index a377c129f040a..e7270872d5976 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -33,7 +33,7 @@ impl PartialEventMergeState { ) { let mut bytes_mut = BytesMut::new(); if let Some(bucket) = self.buckets.get_mut(file) { - // don't bother continuing to process events that are already too big + // don't bother continuing to process new partial events that match existing ones that are already too big if bucket.too_big { return; } @@ -49,22 +49,36 @@ impl PartialEventMergeState { // drop event if it's bigger than max allowed if bytes_mut.len() > max_merged_line_bytes { bucket.too_big = true; + warn!( + message = "Found line that exceeds max_merged_line_bytes; discarding.", + internal_log_rate_limit = true + ); } *prev_value = bytes_mut.freeze(); } } else { + // new event + if let Some(Value::Bytes(event_bytes)) = event.get(message_path) { bytes_mut.extend_from_slice(event_bytes); } - // new event + let too_big = bytes_mut.len() > max_merged_line_bytes; + + if too_big { + warn!( + message = "Found line that exceeds max_merged_line_bytes; discarding.", + internal_log_rate_limit = true + ); + } + self.buckets.insert( file.to_owned(), Bucket { event, expiration: Instant::now() + expiration_time, - too_big: bytes_mut.len() > max_merged_line_bytes, + too_big, }, ); } From cd9d607f46bd7a94d3a22c445b7ad95c83b81c3c Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 3 Mar 2025 16:26:02 -0500 Subject: [PATCH 3/5] Add changelog --- changelog.d/22581_max_merged_line_bytes.feature.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelog.d/22581_max_merged_line_bytes.feature.md diff --git a/changelog.d/22581_max_merged_line_bytes.feature.md b/changelog.d/22581_max_merged_line_bytes.feature.md new file mode 100644 index 0000000000000..b7ce5502f09b2 --- /dev/null +++ b/changelog.d/22581_max_merged_line_bytes.feature.md @@ -0,0 +1,6 @@ +The `kubernetes_logs` source now supports a new configuration called `max_merged_line_bytes` which allows limiting the size +of lines even when they have been assembled via `auto_partial_merge` (the existing `max_line_bytes` field only applies +before merging, and as such makes it impossible to limit lines assembled via merging, short of specifying a size so small +that the continuation character isn't reached, and merging doesn't happen at all). 
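As a rough illustration of the behaviour this changelog describes, the sketch below condenses the merger's bookkeeping into a standalone type: partial lines accumulate per file, and once the accumulated size crosses `max_merged_line_bytes` the entry is flagged and later discarded instead of emitted. The `too_big` flag mirrors the real `Bucket` in `partial_events_merger.rs`; the `append` method and byte sizes here are simplified stand-ins for `PartialEventMergeState::add_event`, which operates on `LogEvent` values.

```rust
/// Simplified stand-in for the merger's per-file state.
struct Bucket {
    merged: Vec<u8>,
    too_big: bool,
}

impl Bucket {
    fn append(&mut self, chunk: &[u8], max_merged_line_bytes: usize) {
        // Once a merged line is over the limit, further partial events for the same
        // file are ignored; the bucket is dropped at flush time instead of emitted.
        if self.too_big {
            return;
        }
        self.merged.extend_from_slice(chunk);
        if self.merged.len() > max_merged_line_bytes {
            self.too_big = true;
        }
    }
}

fn main() {
    let mut bucket = Bucket { merged: Vec::new(), too_big: false };
    bucket.append(b"test message 1", 20); // 14 bytes: within the 20-byte limit
    assert!(!bucket.too_big);
    bucket.append(b"test message 2", 20); // 28 bytes merged: over the limit
    assert!(bucket.too_big); // this merged line will be discarded, not emitted
}
```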
+ +authors: ganelo From dd5fad04abf7c50cd6c937e5da344da8f36f0204 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Mon, 3 Mar 2025 16:28:04 -0500 Subject: [PATCH 4/5] Format --- .../kubernetes_logs/partial_events_merger.rs | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index e7270872d5976..12899e2473a1b 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -119,7 +119,12 @@ pub fn merge_partial_events( log_namespace: LogNamespace, max_merged_line_bytes: usize, ) -> impl Stream { - merge_partial_events_with_custom_expiration(stream, log_namespace, EXPIRATION_TIME, max_merged_line_bytes) + merge_partial_events_with_custom_expiration( + stream, + log_namespace, + EXPIRATION_TIME, + max_merged_line_bytes, + ) } // internal function that allows customizing the expiration time (for testing) @@ -168,7 +173,13 @@ fn merge_partial_events_with_custom_expiration( .map(|x| x.to_string()) .unwrap_or_default(); - state.add_event(event, &file, &message_path, expiration_time, max_merged_line_bytes); + state.add_event( + event, + &file, + &message_path, + expiration_time, + max_merged_line_bytes, + ); if !is_partial { if let Some(log_event) = state.remove_event(&file) { emitter.emit(log_event); @@ -200,7 +211,8 @@ mod test { e_1.insert("foo", 1); let input_stream = futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -232,7 +244,8 @@ mod test { e_2.insert("foo2", 1); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -270,7 +283,8 @@ mod test { e_1.insert("_partial", true); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Legacy, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Legacy, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -316,7 +330,7 @@ mod test { input_stream, LogNamespace::Legacy, Duration::from_secs(1), - 50*1024*1024, + 50 * 1024 * 1024, ); let output: Vec = output_stream.take(2).collect().await; @@ -340,7 +354,8 @@ mod test { ); let input_stream = futures::stream::iter([e_1.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Vector, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); @@ -370,7 +385,8 @@ mod test { ); let input_stream = futures::stream::iter([e_1.into(), e_2.into()]); - let output_stream = merge_partial_events(input_stream, LogNamespace::Vector, 50*1024*1024); + let output_stream = + merge_partial_events(input_stream, LogNamespace::Vector, 50 * 1024 * 1024); let output: Vec = output_stream.collect().await; assert_eq!(output.len(), 1); From 
354008cc9896cca81535f3f455bddd528c231f86 Mon Sep 17 00:00:00 2001 From: Orri Ganel Date: Fri, 7 Mar 2025 14:29:43 -0500 Subject: [PATCH 5/5] Increment component_discarded_events_total on violation of max_line_size and max_merged_line_size --- lib/file-source/src/buffer.rs | 46 ++++++++++------ lib/file-source/src/file_server.rs | 16 +++++- lib/file-source/src/file_watcher/mod.rs | 54 ++++++++++++++----- .../src/file_watcher/tests/experiment.rs | 9 ++-- .../tests/experiment_no_truncations.rs | 14 +++-- lib/file-source/src/fingerprinter.rs | 3 ++ lib/file-source/src/internal_events.rs | 9 ++++ src/internal_events/file.rs | 38 +++++++++++++ src/internal_events/kubernetes_logs.rs | 24 +++++++++ .../kubernetes_logs/partial_events_merger.rs | 20 ++++--- 10 files changed, 187 insertions(+), 46 deletions(-) diff --git a/lib/file-source/src/buffer.rs b/lib/file-source/src/buffer.rs index c8dbe1f400905..bfdb860b0ddbf 100644 --- a/lib/file-source/src/buffer.rs +++ b/lib/file-source/src/buffer.rs @@ -2,10 +2,14 @@ use std::io::{self, BufRead}; use bstr::Finder; use bytes::BytesMut; -use tracing::warn; use crate::FilePosition; +pub struct ReadResult { + pub successfully_read: Option, + pub discarded_for_size: Vec, +} + /// Read up to `max_size` bytes from `reader`, splitting by `delim` /// /// The function reads up to `max_size` bytes from `reader`, splitting the input @@ -29,17 +33,18 @@ use crate::FilePosition; /// Benchmarks indicate that this function processes in the high single-digit /// GiB/s range for buffers of length 1KiB. For buffers any smaller than this /// the overhead of setup dominates our benchmarks. -pub fn read_until_with_max_size( - reader: &mut R, - position: &mut FilePosition, - delim: &[u8], - buf: &mut BytesMut, +pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>( + reader: &'a mut R, + position: &'a mut FilePosition, + delim: &'a [u8], + buf: &'a mut BytesMut, max_size: usize, -) -> io::Result> { +) -> io::Result { let mut total_read = 0; let mut discarding = false; let delim_finder = Finder::new(delim); let delim_len = delim.len(); + let mut discarded_for_size = Vec::new(); loop { let available: &[u8] = match reader.fill_buf() { Ok(n) => n, @@ -68,16 +73,16 @@ pub fn read_until_with_max_size( total_read += used; if !discarding && buf.len() > max_size { - warn!( - message = "Found line that exceeds max_line_bytes; discarding.", - internal_log_rate_limit = true - ); + discarded_for_size.push(buf.clone()); discarding = true; } if done { if !discarding { - return Ok(Some(total_read)); + return Ok(ReadResult { + successfully_read: Some(total_read), + discarded_for_size, + }); } else { discarding = false; buf.clear(); @@ -87,7 +92,10 @@ pub fn read_until_with_max_size( // us to observe an incomplete write. We return None here and let the loop continue // next time the method is called. This is safe because the buffer is specific to this // FileWatcher. 
- return Ok(None); + return Ok(ReadResult { + successfully_read: None, + discarded_for_size, + }); } } } @@ -99,6 +107,8 @@ mod test { use bytes::{BufMut, BytesMut}; use quickcheck::{QuickCheck, TestResult}; + use crate::buffer::ReadResult; + use super::read_until_with_max_size; fn qc_inner(chunks: Vec>, delim: u8, max_size: NonZeroU8) -> TestResult { @@ -181,7 +191,10 @@ mod test { ) .unwrap() { - None => { + ReadResult { + successfully_read: None, + discarded_for_size: _, + } => { // Subject only returns None if this is the last chunk _and_ // the chunk did not contain a delimiter _or_ the delimiter // was outside the max_size range _or_ the current chunk is empty. @@ -190,7 +203,10 @@ mod test { .any(|details| ((details.chunk_index == idx) && details.within_max_size)); assert!(chunk.is_empty() || !has_valid_delimiter) } - Some(total_read) => { + ReadResult { + successfully_read: Some(total_read), + discarded_for_size: _, + } => { // Now that the function has returned we confirm that the // returned details match our `first_delim` and also that // the `buffer` is populated correctly. diff --git a/lib/file-source/src/file_server.rs b/lib/file-source/src/file_server.rs index 510b20694d0eb..d460299976e70 100644 --- a/lib/file-source/src/file_server.rs +++ b/lib/file-source/src/file_server.rs @@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace}; use crate::{ checkpointer::{Checkpointer, CheckpointsView}, - file_watcher::FileWatcher, + file_watcher::{FileWatcher, RawLineResult}, fingerprinter::{FileFingerprint, Fingerprinter}, paths_provider::PathsProvider, FileSourceInternalEvents, ReadFrom, @@ -240,7 +240,19 @@ where let start = time::Instant::now(); let mut bytes_read: usize = 0; - while let Ok(Some(line)) = watcher.read_line() { + while let Ok(RawLineResult { + raw_line: Some(line), + discarded_for_size, + }) = watcher.read_line() + { + discarded_for_size.iter().for_each(|buf| { + self.emitter.emit_file_line_too_long( + &buf.clone(), + self.max_line_bytes, + buf.len(), + ) + }); + let sz = line.bytes.len(); trace!( message = "Read bytes.", diff --git a/lib/file-source/src/file_watcher/mod.rs b/lib/file-source/src/file_watcher/mod.rs index 80db2b9bc876c..662374ad3c8e4 100644 --- a/lib/file-source/src/file_watcher/mod.rs +++ b/lib/file-source/src/file_watcher/mod.rs @@ -12,7 +12,9 @@ use tracing::debug; use vector_common::constants::GZIP_MAGIC; use crate::{ - buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom, + buffer::{read_until_with_max_size, ReadResult}, + metadata_ext::PortableFileExt, + FilePosition, ReadFrom, }; #[cfg(test)] mod tests; @@ -28,6 +30,12 @@ pub(super) struct RawLine { pub bytes: Bytes, } +#[derive(Debug)] +pub struct RawLineResult { + pub raw_line: Option, + pub discarded_for_size: Vec, +} + /// The `FileWatcher` struct defines the polling based state machine which reads /// from a file path, transparently updating the underlying file descriptor when /// the file has been rolled over, as is common for logs. @@ -207,7 +215,7 @@ impl FileWatcher { /// This function will attempt to read a new line from its file, blocking, /// up to some maximum but unspecified amount of time. `read_line` will open /// a new file handler as needed, transparently to the caller. 
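With this commit, oversized raw lines are no longer warned about inside the buffer reader; they are handed back to the caller in `discarded_for_size`, so the file server loop can emit a dedicated internal event (and increment `component_discarded_events_total`) for each discarded buffer. The sketch below shows the caller-side shape of that pattern under simplified, stand-in types for `RawLine`/`RawLineResult`, with a plain `eprintln!` standing in for the real `emit_file_line_too_long` emitter.

```rust
/// Simplified stand-ins for the real `RawLine`/`RawLineResult` in lib/file-source.
struct RawLine {
    offset: u64,
    bytes: Vec<u8>,
}

struct RawLineResult {
    raw_line: Option<RawLine>,
    discarded_for_size: Vec<Vec<u8>>,
}

fn handle(result: RawLineResult, max_line_bytes: usize) {
    // Every buffer thrown away for exceeding the limit is surfaced to the caller,
    // which reports it instead of the reader logging it internally.
    for buf in &result.discarded_for_size {
        eprintln!("discarded a {}-byte line (limit: {} bytes)", buf.len(), max_line_bytes);
    }
    if let Some(line) = result.raw_line {
        println!("read {} bytes at offset {}", line.bytes.len(), line.offset);
    }
}

fn main() {
    handle(
        RawLineResult {
            raw_line: Some(RawLine { offset: 0, bytes: b"a complete line".to_vec() }),
            discarded_for_size: vec![vec![b'x'; 64 * 1024]],
        },
        32 * 1024,
    );
}
```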
- pub(super) fn read_line(&mut self) -> io::Result> { + pub(super) fn read_line(&mut self) -> io::Result { self.track_read_attempt(); let reader = &mut self.reader; @@ -220,14 +228,23 @@ impl FileWatcher { &mut self.buf, self.max_line_bytes, ) { - Ok(Some(_)) => { + Ok(ReadResult { + successfully_read: Some(_), + discarded_for_size, + }) => { self.track_read_success(); - Ok(Some(RawLine { - offset: initial_position, - bytes: self.buf.split().freeze(), - })) + Ok(RawLineResult { + raw_line: Some(RawLine { + offset: initial_position, + bytes: self.buf.split().freeze(), + }), + discarded_for_size, + }) } - Ok(None) => { + Ok(ReadResult { + successfully_read: None, + discarded_for_size, + }) => { if !self.file_findable() { self.set_dead(); // File has been deleted, so return what we have in the buffer, even though it @@ -237,16 +254,25 @@ impl FileWatcher { if buf.is_empty() { // EOF self.reached_eof = true; - Ok(None) + Ok(RawLineResult { + raw_line: None, + discarded_for_size, + }) } else { - Ok(Some(RawLine { - offset: initial_position, - bytes: buf, - })) + Ok(RawLineResult { + raw_line: Some(RawLine { + offset: initial_position, + bytes: buf, + }), + discarded_for_size, + }) } } else { self.reached_eof = true; - Ok(None) + Ok(RawLineResult { + raw_line: None, + discarded_for_size, + }) } } Err(e) => { diff --git a/lib/file-source/src/file_watcher/tests/experiment.rs b/lib/file-source/src/file_watcher/tests/experiment.rs index decdbdab98240..cbbfabe0a67b7 100644 --- a/lib/file-source/src/file_watcher/tests/experiment.rs +++ b/lib/file-source/src/file_watcher/tests/experiment.rs @@ -8,7 +8,7 @@ use bytes::Bytes; use quickcheck::{QuickCheck, TestResult}; use crate::{ - file_watcher::{tests::*, FileWatcher}, + file_watcher::{tests::*, FileWatcher, RawLineResult}, ReadFrom, }; @@ -96,11 +96,14 @@ fn experiment(actions: Vec) { Err(_) => { unreachable!(); } - Ok(Some(line)) if line.bytes.is_empty() => { + Ok(RawLineResult { + raw_line: Some(line), + .. + }) if line.bytes.is_empty() => { attempts -= 1; continue; } - Ok(None) => { + Ok(RawLineResult { raw_line: None, .. }) => { attempts -= 1; continue; } diff --git a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs index ee8a24a9f95bf..78aa7536b9a38 100644 --- a/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs +++ b/lib/file-source/src/file_watcher/tests/experiment_no_truncations.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use quickcheck::{QuickCheck, TestResult}; use crate::{ - file_watcher::{tests::*, FileWatcher}, + file_watcher::{tests::*, FileWatcher, RawLineResult}, ReadFrom, }; @@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec) { Err(_) => { unreachable!(); } - Ok(Some(line)) if line.bytes.is_empty() => { + Ok(RawLineResult { + raw_line: Some(line), + .. + }) if line.bytes.is_empty() => { attempts -= 1; assert!(fwfiles[read_index].read_line().is_none()); continue; } - Ok(None) => { + Ok(RawLineResult { raw_line: None, .. }) => { attempts -= 1; assert!(fwfiles[read_index].read_line().is_none()); continue; } - Ok(Some(line)) => { + Ok(RawLineResult { + raw_line: Some(line), + .. 
+ }) => { let exp = fwfiles[read_index].read_line().expect("could not readline"); assert_eq!(exp.into_bytes(), line.bytes); // assert_eq!(sz, buf.len() + 1); diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index 542e67393da35..0e864c6cfac73 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -382,6 +382,7 @@ mod test { time::Duration, }; + use bytes::BytesMut; use flate2::write::GzEncoder; use tempfile::{tempdir, TempDir}; @@ -803,5 +804,7 @@ mod test { fn emit_files_open(&self, _: usize) {} fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {} + + fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {} } } diff --git a/lib/file-source/src/internal_events.rs b/lib/file-source/src/internal_events.rs index 9eb60e65397a1..726ec09f29999 100644 --- a/lib/file-source/src/internal_events.rs +++ b/lib/file-source/src/internal_events.rs @@ -1,5 +1,7 @@ use std::{io::Error, path::Path, time::Duration}; +use bytes::BytesMut; + /// Every internal event in this crate has a corresponding /// method in this trait which should emit the event. pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static { @@ -26,4 +28,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static { fn emit_files_open(&self, count: usize); fn emit_path_globbing_failed(&self, path: &Path, error: &Error); + + fn emit_file_line_too_long( + &self, + bytes: &BytesMut, + configured_limit: usize, + encountered_size_so_far: usize, + ); } diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index fba86d6ad1b72..acc8212940c5f 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -106,6 +106,7 @@ impl InternalEvent for FileIoError<'_, P> { mod source { use std::{io::Error, path::Path, time::Duration}; + use bytes::BytesMut; use metrics::counter; use vector_lib::file_source::FileSourceInternalEvents; @@ -496,6 +497,30 @@ mod source { } } + #[derive(Debug)] + pub struct FileLineTooBig<'a> { + pub bytes: &'a BytesMut, + pub configured_limit: usize, + pub encountered_size_so_far: usize, + } + + impl InternalEvent for FileLineTooBig<'_> { + fn emit(self) { + warn!( + message = "Found line that exceeds max_line_bytes; discarding.", + bytes = ?self.bytes, + configured_limit = self.configured_limit, + encountered_size_so_far = self.encountered_size_so_far, + internal_log_rate_limit = true, + ); + counter!( + "component_discarded_events_total", + "intentional" => "true", + ) + .increment(1); + } + } + #[derive(Clone)] pub struct FileSourceInternalEventsEmitter { pub include_file_metric_tag: bool, @@ -578,5 +603,18 @@ mod source { fn emit_path_globbing_failed(&self, path: &Path, error: &Error) { emit!(PathGlobbingError { path, error }); } + + fn emit_file_line_too_long( + &self, + bytes: &bytes::BytesMut, + configured_limit: usize, + encountered_size_so_far: usize, + ) { + emit!(FileLineTooBig { + bytes, + configured_limit, + encountered_size_so_far + }); + } } } diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs index 77dac30b1945b..c3ea2cec51587 100644 --- a/src/internal_events/kubernetes_logs.rs +++ b/src/internal_events/kubernetes_logs.rs @@ -205,3 +205,27 @@ impl InternalEvent for KubernetesLifecycleError { }); } } + +#[derive(Debug)] +pub struct KubernetesMergedLineTooBig<'a> { + pub event: &'a Event, + pub configured_limit: usize, + pub encountered_size_so_far: usize, +} + +impl InternalEvent for KubernetesMergedLineTooBig<'_> { + fn 
emit(self) { + warn!( + message = "Found line that exceeds max_merged_line_bytes; discarding.", + event = ?self.event, + configured_limit = self.configured_limit, + encountered_size_so_far = self.encountered_size_so_far, + internal_log_rate_limit = true, + ); + counter!( + "component_discarded_events_total", + "intentional" => "true", + ) + .increment(1); + } +} diff --git a/src/sources/kubernetes_logs/partial_events_merger.rs b/src/sources/kubernetes_logs/partial_events_merger.rs index 12899e2473a1b..809172569b2f5 100644 --- a/src/sources/kubernetes_logs/partial_events_merger.rs +++ b/src/sources/kubernetes_logs/partial_events_merger.rs @@ -11,6 +11,7 @@ use vrl::owned_value_path; use crate::event; use crate::event::{Event, LogEvent, Value}; +use crate::internal_events::KubernetesMergedLineTooBig; use crate::sources::kubernetes_logs::transform_utils::get_message_path; /// The key we use for `file` field. @@ -49,10 +50,11 @@ impl PartialEventMergeState { // drop event if it's bigger than max allowed if bytes_mut.len() > max_merged_line_bytes { bucket.too_big = true; - warn!( - message = "Found line that exceeds max_merged_line_bytes; discarding.", - internal_log_rate_limit = true - ); + emit!(KubernetesMergedLineTooBig { + event: &Event::Log(event), + configured_limit: max_merged_line_bytes, + encountered_size_so_far: bytes_mut.len() + }); } *prev_value = bytes_mut.freeze(); @@ -67,10 +69,12 @@ impl PartialEventMergeState { let too_big = bytes_mut.len() > max_merged_line_bytes; if too_big { - warn!( - message = "Found line that exceeds max_merged_line_bytes; discarding.", - internal_log_rate_limit = true - ); + // perf impact of clone should be minimal since being here means no further processing of this event will occur + emit!(KubernetesMergedLineTooBig { + event: &Event::Log(event.clone()), + configured_limit: max_merged_line_bytes, + encountered_size_so_far: bytes_mut.len() + }); } self.buckets.insert(