Increment component_discarded_events_total on violation of max_line_size and max_merged_line_size
oganel committed Mar 7, 2025
1 parent dd5fad0 commit 354008c
Showing 10 changed files with 187 additions and 46 deletions.
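In short: read_until_with_max_size now returns a ReadResult that carries any buffers it discarded for exceeding the size limit, FileWatcher::read_line forwards them in a RawLineResult, and the file server calls emit_file_line_too_long for each one, which increments the discarded-events counter. Below is a minimal sketch of that increment, separate from the diff itself; the counter name and label come from the src/internal_events/file.rs hunk further down, while the wrapper function is illustrative only.

use metrics::counter;

// Increment the discarded-events counter once per line dropped for size.
// (Illustrative wrapper; in the commit this happens inside FileLineTooBig::emit.)
fn record_discarded_oversized_line() {
    counter!(
        "component_discarded_events_total",
        "intentional" => "true",
    )
    .increment(1);
}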
46 changes: 31 additions & 15 deletions lib/file-source/src/buffer.rs
@@ -2,10 +2,14 @@ use std::io::{self, BufRead};

use bstr::Finder;
use bytes::BytesMut;
use tracing::warn;

use crate::FilePosition;

pub struct ReadResult {
pub successfully_read: Option<usize>,
pub discarded_for_size: Vec<BytesMut>,
}

/// Read up to `max_size` bytes from `reader`, splitting by `delim`
///
/// The function reads up to `max_size` bytes from `reader`, splitting the input
@@ -29,17 +33,18 @@ use crate::FilePosition;
/// Benchmarks indicate that this function processes in the high single-digit
/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
/// the overhead of setup dominates our benchmarks.
pub fn read_until_with_max_size<R: BufRead + ?Sized>(
reader: &mut R,
position: &mut FilePosition,
delim: &[u8],
buf: &mut BytesMut,
pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
reader: &'a mut R,
position: &'a mut FilePosition,
delim: &'a [u8],
buf: &'a mut BytesMut,
max_size: usize,
) -> io::Result<Option<usize>> {
) -> io::Result<ReadResult> {
let mut total_read = 0;
let mut discarding = false;
let delim_finder = Finder::new(delim);
let delim_len = delim.len();
let mut discarded_for_size = Vec::new();
loop {
let available: &[u8] = match reader.fill_buf() {
Ok(n) => n,
@@ -68,16 +73,16 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
total_read += used;

if !discarding && buf.len() > max_size {
warn!(
message = "Found line that exceeds max_line_bytes; discarding.",
internal_log_rate_limit = true
);
discarded_for_size.push(buf.clone());
discarding = true;
}

if done {
if !discarding {
return Ok(Some(total_read));
return Ok(ReadResult {
successfully_read: Some(total_read),
discarded_for_size,
});
} else {
discarding = false;
buf.clear();
@@ -87,7 +92,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
// us to observe an incomplete write. We return None here and let the loop continue
// next time the method is called. This is safe because the buffer is specific to this
// FileWatcher.
return Ok(None);
return Ok(ReadResult {
successfully_read: None,
discarded_for_size,
});
}
}
}
@@ -99,6 +107,8 @@ mod test {
use bytes::{BufMut, BytesMut};
use quickcheck::{QuickCheck, TestResult};

use crate::buffer::ReadResult;

use super::read_until_with_max_size;

fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
@@ -181,7 +191,10 @@
)
.unwrap()
{
None => {
ReadResult {
successfully_read: None,
discarded_for_size: _,
} => {
// Subject only returns None if this is the last chunk _and_
// the chunk did not contain a delimiter _or_ the delimiter
// was outside the max_size range _or_ the current chunk is empty.
@@ -190,7 +203,10 @@
.any(|details| ((details.chunk_index == idx) && details.within_max_size));
assert!(chunk.is_empty() || !has_valid_delimiter)
}
Some(total_read) => {
ReadResult {
successfully_read: Some(total_read),
discarded_for_size: _,
} => {
// Now that the function has returned we confirm that the
// returned details match our `first_delim` and also that
// the `buffer` is populated correctly.
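Below is a hedged usage sketch of the new buffer.rs API, separate from the diff, written as it might appear in a test inside the crate (the existing tests import crate::buffer::ReadResult the same way). It assumes FilePosition is the crate's u64 byte-offset alias; the sample input and the 8-byte limit are illustrative only.

use std::io::Cursor;

use bytes::BytesMut;

use crate::buffer::{read_until_with_max_size, ReadResult};
use crate::FilePosition;

fn read_with_discards() -> std::io::Result<()> {
    // The first line exceeds max_size and should be discarded; the second fits.
    let mut reader = Cursor::new(&b"this line is far too long for the limit\nok\n"[..]);
    let mut position: FilePosition = 0;
    let mut buf = BytesMut::new();

    let ReadResult {
        successfully_read,
        discarded_for_size,
    } = read_until_with_max_size(&mut reader, &mut position, b"\n", &mut buf, 8)?;

    // `buf` holds the line that fit; the oversized line is returned separately
    // so the caller can count it as a discarded event.
    assert!(successfully_read.is_some());
    assert_eq!(discarded_for_size.len(), 1);
    println!("kept {:?}, discarded {} line(s)", buf, discarded_for_size.len());
    Ok(())
}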
16 changes: 14 additions & 2 deletions lib/file-source/src/file_server.rs
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};

use crate::{
checkpointer::{Checkpointer, CheckpointsView},
file_watcher::FileWatcher,
file_watcher::{FileWatcher, RawLineResult},
fingerprinter::{FileFingerprint, Fingerprinter},
paths_provider::PathsProvider,
FileSourceInternalEvents, ReadFrom,
@@ -240,7 +240,19 @@

let start = time::Instant::now();
let mut bytes_read: usize = 0;
while let Ok(Some(line)) = watcher.read_line() {
while let Ok(RawLineResult {
raw_line: Some(line),
discarded_for_size,
}) = watcher.read_line()
{
discarded_for_size.iter().for_each(|buf| {
self.emitter.emit_file_line_too_long(
&buf.clone(),
self.max_line_bytes,
buf.len(),
)
});

let sz = line.bytes.len();
trace!(
message = "Read bytes.",
54 changes: 40 additions & 14 deletions lib/file-source/src/file_watcher/mod.rs
@@ -12,7 +12,9 @@ use tracing::debug;
use vector_common::constants::GZIP_MAGIC;

use crate::{
buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom,
buffer::{read_until_with_max_size, ReadResult},
metadata_ext::PortableFileExt,
FilePosition, ReadFrom,
};
#[cfg(test)]
mod tests;
@@ -28,6 +30,12 @@ pub(super) struct RawLine {
pub bytes: Bytes,
}

#[derive(Debug)]
pub struct RawLineResult {
pub raw_line: Option<RawLine>,
pub discarded_for_size: Vec<BytesMut>,
}

/// The `FileWatcher` struct defines the polling based state machine which reads
/// from a file path, transparently updating the underlying file descriptor when
/// the file has been rolled over, as is common for logs.
@@ -207,7 +215,7 @@ impl FileWatcher {
/// This function will attempt to read a new line from its file, blocking,
/// up to some maximum but unspecified amount of time. `read_line` will open
/// a new file handler as needed, transparently to the caller.
pub(super) fn read_line(&mut self) -> io::Result<Option<RawLine>> {
pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
self.track_read_attempt();

let reader = &mut self.reader;
@@ -220,14 +228,23 @@
&mut self.buf,
self.max_line_bytes,
) {
Ok(Some(_)) => {
Ok(ReadResult {
successfully_read: Some(_),
discarded_for_size,
}) => {
self.track_read_success();
Ok(Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}),
discarded_for_size,
})
}
Ok(None) => {
Ok(ReadResult {
successfully_read: None,
discarded_for_size,
}) => {
if !self.file_findable() {
self.set_dead();
// File has been deleted, so return what we have in the buffer, even though it
@@ -237,16 +254,25 @@
if buf.is_empty() {
// EOF
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size,
})
} else {
Ok(Some(RawLine {
offset: initial_position,
bytes: buf,
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: buf,
}),
discarded_for_size,
})
}
} else {
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size,
})
}
}
Err(e) => {
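A hedged sketch, separate from the diff, of how a caller inside the crate (such as file_server.rs above) consumes the new read_line return type; the println! calls stand in for the real emitter and line handling and are illustrative only.

use crate::file_watcher::{FileWatcher, RawLineResult};

fn drain(watcher: &mut FileWatcher) -> std::io::Result<()> {
    loop {
        let RawLineResult {
            raw_line,
            discarded_for_size,
        } = watcher.read_line()?;

        // Oversized lines are surfaced even on calls that yield no complete
        // line, so report them before looking at `raw_line`.
        for buf in &discarded_for_size {
            println!("discarded oversized line of {} bytes", buf.len());
        }

        match raw_line {
            Some(line) => println!("read {} bytes", line.bytes.len()),
            // `None` means end of file for now (or a sheared write); retry later.
            None => return Ok(()),
        }
    }
}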
9 changes: 6 additions & 3 deletions lib/file-source/src/file_watcher/tests/experiment.rs
@@ -8,7 +8,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

@@ -96,11 +96,14 @@ fn experiment(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
continue;
}
@@ -4,7 +4,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

@@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(Some(line)) => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) => {
let exp = fwfiles[read_index].read_line().expect("could not readline");
assert_eq!(exp.into_bytes(), line.bytes);
// assert_eq!(sz, buf.len() + 1);
3 changes: 3 additions & 0 deletions lib/file-source/src/fingerprinter.rs
@@ -382,6 +382,7 @@ mod test {
time::Duration,
};

use bytes::BytesMut;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

@@ -803,5 +804,7 @@
fn emit_files_open(&self, _: usize) {}

fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
}
}
9 changes: 9 additions & 0 deletions lib/file-source/src/internal_events.rs
@@ -1,5 +1,7 @@
use std::{io::Error, path::Path, time::Duration};

use bytes::BytesMut;

/// Every internal event in this crate has a corresponding
/// method in this trait which should emit the event.
pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
@@ -26,4 +28,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
fn emit_files_open(&self, count: usize);

fn emit_path_globbing_failed(&self, path: &Path, error: &Error);

fn emit_file_line_too_long(
&self,
bytes: &BytesMut,
configured_limit: usize,
encountered_size_so_far: usize,
);
}
38 changes: 38 additions & 0 deletions src/internal_events/file.rs
@@ -106,6 +106,7 @@ impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
mod source {
use std::{io::Error, path::Path, time::Duration};

use bytes::BytesMut;
use metrics::counter;
use vector_lib::file_source::FileSourceInternalEvents;

@@ -496,6 +497,30 @@
}
}

#[derive(Debug)]
pub struct FileLineTooBig<'a> {
pub bytes: &'a BytesMut,
pub configured_limit: usize,
pub encountered_size_so_far: usize,
}

impl InternalEvent for FileLineTooBig<'_> {
fn emit(self) {
warn!(
message = "Found line that exceeds max_line_bytes; discarding.",
bytes = ?self.bytes,
configured_limit = self.configured_limit,
encountered_size_so_far = self.encountered_size_so_far,
internal_log_rate_limit = true,
);
counter!(
"component_discarded_events_total",
"intentional" => "true",
)
.increment(1);
}
}

#[derive(Clone)]
pub struct FileSourceInternalEventsEmitter {
pub include_file_metric_tag: bool,
@@ -578,5 +603,18 @@
fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
emit!(PathGlobbingError { path, error });
}

fn emit_file_line_too_long(
&self,
bytes: &bytes::BytesMut,
configured_limit: usize,
encountered_size_so_far: usize,
) {
emit!(FileLineTooBig {
bytes,
configured_limit,
encountered_size_so_far
});
}
}
}