enhancement(kubernetes_logs source): Allow specification of a maximum line size to be applied after merging instead of just before #22582

Open · wants to merge 5 commits into base: master
6 changes: 6 additions & 0 deletions changelog.d/22581_max_merged_line_bytes.feature.md
@@ -0,0 +1,6 @@
The `kubernetes_logs` source now supports a new `max_merged_line_bytes` configuration option, which limits the size
of lines even after they have been assembled via `auto_partial_merge`. The existing `max_line_bytes` field applies
only before merging, so it cannot cap lines assembled by merging, short of specifying a size so small that the
continuation character is never reached and no merging happens at all.

authors: ganelo
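
For context, a minimal sketch of how the two limits might be combined in a `kubernetes_logs` source configuration (TOML; the source id `k8s` and the specific byte values are hypothetical illustrations, not defaults from this PR):

```toml
[sources.k8s]
type = "kubernetes_logs"
# Merge partial lines emitted by the container runtime (existing behavior).
auto_partial_merge = true
# Existing limit: applies to each raw line *before* partial-line merging.
max_line_bytes = 32768
# New in this PR: applies *after* merging, so an assembled line is capped too.
max_merged_line_bytes = 1048576
```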
46 changes: 31 additions & 15 deletions lib/file-source/src/buffer.rs
@@ -2,10 +2,14 @@ use std::io::{self, BufRead};

use bstr::Finder;
use bytes::BytesMut;
use tracing::warn;

use crate::FilePosition;

pub struct ReadResult {
pub successfully_read: Option<usize>,
pub discarded_for_size: Vec<BytesMut>,
}

/// Read up to `max_size` bytes from `reader`, splitting by `delim`
///
/// The function reads up to `max_size` bytes from `reader`, splitting the input
@@ -29,17 +33,18 @@ use crate::FilePosition;
/// Benchmarks indicate that this function processes in the high single-digit
/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
/// the overhead of setup dominates our benchmarks.
pub fn read_until_with_max_size<R: BufRead + ?Sized>(
reader: &mut R,
position: &mut FilePosition,
delim: &[u8],
buf: &mut BytesMut,
pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
reader: &'a mut R,
position: &'a mut FilePosition,
delim: &'a [u8],
buf: &'a mut BytesMut,
max_size: usize,
) -> io::Result<Option<usize>> {
) -> io::Result<ReadResult> {
let mut total_read = 0;
let mut discarding = false;
let delim_finder = Finder::new(delim);
let delim_len = delim.len();
let mut discarded_for_size = Vec::new();
loop {
let available: &[u8] = match reader.fill_buf() {
Ok(n) => n,
@@ -68,16 +73,16 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
total_read += used;

if !discarding && buf.len() > max_size {
warn!(
message = "Found line that exceeds max_line_bytes; discarding.",
internal_log_rate_limit = true
);
discarded_for_size.push(buf.clone());
discarding = true;
}

if done {
if !discarding {
return Ok(Some(total_read));
return Ok(ReadResult {
successfully_read: Some(total_read),
discarded_for_size,
});
} else {
discarding = false;
buf.clear();
@@ -87,7 +92,10 @@ pub fn read_until_with_max_size<R: BufRead + ?Sized>(
// us to observe an incomplete write. We return None here and let the loop continue
// next time the method is called. This is safe because the buffer is specific to this
// FileWatcher.
return Ok(None);
return Ok(ReadResult {
successfully_read: None,
discarded_for_size,
});
}
}
}
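
To make the new contract concrete, here is a self-contained sketch of a consumer of the result type above (the `ReadResult` stub mirrors the diff; `handle` is a hypothetical caller, not code from this PR):

```rust
use bytes::BytesMut;

// Stand-in mirroring the `ReadResult` added in buffer.rs.
struct ReadResult {
    successfully_read: Option<usize>,
    discarded_for_size: Vec<BytesMut>,
}

// Hypothetical caller: oversize lines are now surfaced to the caller rather
// than logged inside the read loop, so each one can become a structured
// internal event.
fn handle(result: ReadResult) {
    for dropped in &result.discarded_for_size {
        eprintln!("dropped oversize line of {} bytes", dropped.len());
    }
    match result.successfully_read {
        Some(n) => println!("consumed {n} bytes up to and including the delimiter"),
        None => println!("no complete line yet; read again on the next poll"),
    }
}
```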
@@ -99,6 +107,8 @@ mod test {
use bytes::{BufMut, BytesMut};
use quickcheck::{QuickCheck, TestResult};

use crate::buffer::ReadResult;

use super::read_until_with_max_size;

fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
@@ -181,7 +191,10 @@ mod test {
)
.unwrap()
{
None => {
ReadResult {
successfully_read: None,
discarded_for_size: _,
} => {
// Subject only returns None if this is the last chunk _and_
// the chunk did not contain a delimiter _or_ the delimiter
// was outside the max_size range _or_ the current chunk is empty.
@@ -190,7 +203,10 @@
.any(|details| ((details.chunk_index == idx) && details.within_max_size));
assert!(chunk.is_empty() || !has_valid_delimiter)
}
Some(total_read) => {
ReadResult {
successfully_read: Some(total_read),
discarded_for_size: _,
} => {
// Now that the function has returned we confirm that the
// returned details match our `first_delim` and also that
// the `buffer` is populated correctly.
16 changes: 14 additions & 2 deletions lib/file-source/src/file_server.rs
@@ -19,7 +19,7 @@ use tracing::{debug, error, info, trace};

use crate::{
checkpointer::{Checkpointer, CheckpointsView},
file_watcher::FileWatcher,
file_watcher::{FileWatcher, RawLineResult},
fingerprinter::{FileFingerprint, Fingerprinter},
paths_provider::PathsProvider,
FileSourceInternalEvents, ReadFrom,
@@ -240,7 +240,19 @@

let start = time::Instant::now();
let mut bytes_read: usize = 0;
while let Ok(Some(line)) = watcher.read_line() {
while let Ok(RawLineResult {
raw_line: Some(line),
discarded_for_size,
}) = watcher.read_line()
{
discarded_for_size.iter().for_each(|buf| {
self.emitter.emit_file_line_too_long(
&buf.clone(),
self.max_line_bytes,
buf.len(),
)
});

let sz = line.bytes.len();
trace!(
message = "Read bytes.",
54 changes: 40 additions & 14 deletions lib/file-source/src/file_watcher/mod.rs
@@ -12,7 +12,9 @@ use tracing::debug;
use vector_common::constants::GZIP_MAGIC;

use crate::{
buffer::read_until_with_max_size, metadata_ext::PortableFileExt, FilePosition, ReadFrom,
buffer::{read_until_with_max_size, ReadResult},
metadata_ext::PortableFileExt,
FilePosition, ReadFrom,
};
#[cfg(test)]
mod tests;
@@ -28,6 +30,12 @@ pub(super) struct RawLine {
pub bytes: Bytes,
}

#[derive(Debug)]
pub struct RawLineResult {
pub raw_line: Option<RawLine>,
pub discarded_for_size: Vec<BytesMut>,
}

/// The `FileWatcher` struct defines the polling based state machine which reads
/// from a file path, transparently updating the underlying file descriptor when
/// the file has been rolled over, as is common for logs.
@@ -207,7 +215,7 @@ impl FileWatcher {
/// This function will attempt to read a new line from its file, blocking,
/// up to some maximum but unspecified amount of time. `read_line` will open
/// a new file handler as needed, transparently to the caller.
pub(super) fn read_line(&mut self) -> io::Result<Option<RawLine>> {
pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
self.track_read_attempt();

let reader = &mut self.reader;
@@ -220,14 +228,23 @@
&mut self.buf,
self.max_line_bytes,
) {
Ok(Some(_)) => {
Ok(ReadResult {
successfully_read: Some(_),
discarded_for_size,
}) => {
self.track_read_success();
Ok(Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: self.buf.split().freeze(),
}),
discarded_for_size,
})
}
Ok(None) => {
Ok(ReadResult {
successfully_read: None,
discarded_for_size,
}) => {
if !self.file_findable() {
self.set_dead();
// File has been deleted, so return what we have in the buffer, even though it
@@ -237,16 +254,25 @@
if buf.is_empty() {
// EOF
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size,
})
} else {
Ok(Some(RawLine {
offset: initial_position,
bytes: buf,
}))
Ok(RawLineResult {
raw_line: Some(RawLine {
offset: initial_position,
bytes: buf,
}),
discarded_for_size,
})
}
} else {
self.reached_eof = true;
Ok(None)
Ok(RawLineResult {
raw_line: None,
discarded_for_size,
})
}
}
Err(e) => {
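
Reduced to a sketch, the watcher-level change is that `discarded_for_size` is forwarded unchanged whatever the read outcome, so the file server always sees every dropped line. The stand-in types below mirror the diff; the `wrap` helper is hypothetical and omits the EOF/deleted-file handling the real code performs:

```rust
use bytes::{Bytes, BytesMut};

// Stand-ins for the two result types threaded through this PR.
struct ReadResult {
    successfully_read: Option<usize>,
    discarded_for_size: Vec<BytesMut>,
}

struct RawLine {
    offset: u64,
    bytes: Bytes,
}

struct RawLineResult {
    raw_line: Option<RawLine>,
    discarded_for_size: Vec<BytesMut>,
}

// The essential pattern: the discards ride along in both the "got a line"
// and the "no line yet" branches, never silently vanishing.
fn wrap(read: ReadResult, offset: u64, buf: &mut BytesMut) -> RawLineResult {
    let ReadResult { successfully_read, discarded_for_size } = read;
    RawLineResult {
        raw_line: successfully_read.map(|_| RawLine {
            offset,
            bytes: buf.split().freeze(),
        }),
        discarded_for_size,
    }
}
```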
9 changes: 6 additions & 3 deletions lib/file-source/src/file_watcher/tests/experiment.rs
@@ -8,7 +8,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

@@ -96,11 +96,14 @@ fn experiment(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
continue;
}
@@ -4,7 +4,7 @@ use bytes::Bytes;
use quickcheck::{QuickCheck, TestResult};

use crate::{
file_watcher::{tests::*, FileWatcher},
file_watcher::{tests::*, FileWatcher, RawLineResult},
ReadFrom,
};

@@ -63,17 +63,23 @@ fn experiment_no_truncations(actions: Vec<FileWatcherAction>) {
Err(_) => {
unreachable!();
}
Ok(Some(line)) if line.bytes.is_empty() => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) if line.bytes.is_empty() => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(None) => {
Ok(RawLineResult { raw_line: None, .. }) => {
attempts -= 1;
assert!(fwfiles[read_index].read_line().is_none());
continue;
}
Ok(Some(line)) => {
Ok(RawLineResult {
raw_line: Some(line),
..
}) => {
let exp = fwfiles[read_index].read_line().expect("could not readline");
assert_eq!(exp.into_bytes(), line.bytes);
// assert_eq!(sz, buf.len() + 1);
3 changes: 3 additions & 0 deletions lib/file-source/src/fingerprinter.rs
@@ -382,6 +382,7 @@ mod test {
time::Duration,
};

use bytes::BytesMut;
use flate2::write::GzEncoder;
use tempfile::{tempdir, TempDir};

@@ -803,5 +804,7 @@
fn emit_files_open(&self, _: usize) {}

fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
}
}
9 changes: 9 additions & 0 deletions lib/file-source/src/internal_events.rs
@@ -1,5 +1,7 @@
use std::{io::Error, path::Path, time::Duration};

use bytes::BytesMut;

/// Every internal event in this crate has a corresponding
/// method in this trait which should emit the event.
pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
@@ -26,4 +28,11 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
fn emit_files_open(&self, count: usize);

fn emit_path_globbing_failed(&self, path: &Path, error: &Error);

fn emit_file_line_too_long(
&self,
bytes: &BytesMut,
configured_limit: usize,
encountered_size_so_far: usize,
);
}
38 changes: 38 additions & 0 deletions src/internal_events/file.rs
@@ -106,6 +106,7 @@ impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
mod source {
use std::{io::Error, path::Path, time::Duration};

use bytes::BytesMut;
use metrics::counter;
use vector_lib::file_source::FileSourceInternalEvents;

@@ -496,6 +497,30 @@
}
}

#[derive(Debug)]
pub struct FileLineTooBig<'a> {
pub bytes: &'a BytesMut,
pub configured_limit: usize,
pub encountered_size_so_far: usize,
}

impl InternalEvent for FileLineTooBig<'_> {
fn emit(self) {
warn!(
message = "Found line that exceeds max_line_bytes; discarding.",
bytes = ?self.bytes,
configured_limit = self.configured_limit,
encountered_size_so_far = self.encountered_size_so_far,
internal_log_rate_limit = true,
);
counter!(
"component_discarded_events_total",
"intentional" => "true",
)
.increment(1);
}
}

#[derive(Clone)]
pub struct FileSourceInternalEventsEmitter {
pub include_file_metric_tag: bool,
@@ -578,5 +603,18 @@
fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
emit!(PathGlobbingError { path, error });
}

fn emit_file_line_too_long(
&self,
bytes: &bytes::BytesMut,
configured_limit: usize,
encountered_size_so_far: usize,
) {
emit!(FileLineTooBig {
bytes,
configured_limit,
encountered_size_so_far
});
}
}
}
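
For reference, a free-standing sketch of what the new event does at emit time, using only the public `tracing` and `metrics` crate APIs (`report_line_too_big` is a hypothetical helper; Vector's own code goes through its `emit!` macro as shown above):

```rust
use bytes::BytesMut;
use metrics::counter;
use tracing::warn;

// Mirrors `FileLineTooBig::emit` from the diff: a structured, rate-limited
// warning plus a discarded-events counter tagged as intentional.
fn report_line_too_big(bytes: &BytesMut, configured_limit: usize, encountered_size_so_far: usize) {
    warn!(
        message = "Found line that exceeds max_line_bytes; discarding.",
        bytes = ?bytes,
        configured_limit,
        encountered_size_so_far,
        internal_log_rate_limit = true,
    );
    counter!("component_discarded_events_total", "intentional" => "true").increment(1);
}
```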