Commit 79c266f

Merge remote-tracking branch 'origin/master' into merge-down-v0.17

jszwedko committed Oct 8, 2021
2 parents 3d34cde + 6444c26
Showing 97 changed files with 2,945 additions and 1,484 deletions.
17 changes: 11 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -302,14 +302,14 @@ tonic-build = { version = "0.5", default-features = false, features = ["transpor

[dev-dependencies]
approx = "0.5.0"
assert_cmd = "2.0.1"
assert_cmd = "2.0.2"
base64 = "0.13.0"
criterion = { version = "0.3.5", features = ["html_reports", "async_tokio"] }
libc = "0.2.103"
libz-sys = "1.1.3"
matches = "0.1.9"
pretty_assertions = "1.0.0"
reqwest = { version = "0.11.4", features = ["json"] }
reqwest = { version = "0.11.5", features = ["json"] }
tempfile = "3.2.0"
tokio = { version = "1.12.0", features = ["test-util"] }
tokio-test = "0.4.2"
@@ -454,7 +454,7 @@ sources-exec = ["codecs"]
sources-file = ["file-source"]
sources-fluent = ["base64", "listenfd", "tokio-util/net", "rmpv", "rmp-serde", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "serde_bytes", "codecs"]
sources-generator = ["fakedata", "codecs"]
-sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query"]
+sources-heroku_logs = ["sources-utils-http", "sources-utils-http-query", "codecs"]
sources-host_metrics = ["heim"]
sources-http = ["sources-utils-http", "codecs", "sources-utils-http-query"]
sources-internal_logs = []
12 changes: 6 additions & 6 deletions benches/batch.rs
@@ -20,10 +20,10 @@ fn benchmark_batch(c: &mut Criterion) {
group.sampling_mode(SamplingMode::Flat);

let cases = [
-(Compression::None, bytesize::mib(2u64)),
-(Compression::None, bytesize::kib(500u64)),
-(Compression::gzip_default(), bytesize::mib(2u64)),
-(Compression::gzip_default(), bytesize::kib(500u64)),
+(Compression::None, 2_000_000),
+(Compression::None, 500_000),
+(Compression::gzip_default(), 2_000_000),
+(Compression::gzip_default(), 500_000),
];

let input: Vec<_> = random_lines(event_len)
@@ -38,7 +38,7 @@ fn benchmark_batch(c: &mut Criterion) {
let rt = runtime();
let (acker, _) = Acker::new_for_testing();
let batch = BatchSettings::default()
-.bytes(*batch_size as u64)
+.bytes(*batch_size)
.events(num_events)
.size;
let batch_sink = PartitionBatchSink::new(
@@ -72,7 +72,7 @@ fn benchmark_batch(c: &mut Criterion) {
let rt = runtime();
let (acker, _) = Acker::new_for_testing();
let batch = BatchSettings::default()
-.bytes(*batch_size as u64)
+.bytes(*batch_size)
.events(num_events)
.size;
let batch_sink = BatchSink::new(
5 changes: 4 additions & 1 deletion benches/enrichment_tables_file.rs
@@ -1,7 +1,7 @@
use chrono::prelude::*;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use enrichment::Case;
-use std::collections::BTreeMap;
+use std::{collections::BTreeMap, time::SystemTime};
use vector::enrichment_tables::{file::File, Condition, Table};
use vrl::Value;

@@ -38,6 +38,9 @@ fn benchmark_enrichment_tables_file(c: &mut Criterion) {
.collect::<Vec<_>>();

let mut file = File::new(
+Default::default(),
+Default::default(),
+SystemTime::now(),
data,
// Headers.
(0..10)
20 changes: 20 additions & 0 deletions benches/lua.rs
@@ -63,6 +63,16 @@ fn bench_add_fields(c: &mut Criterion) {
stream::iter(buf.into_iter())
}))
}
+// ignoring multiple outputs for now
+Transform::FallibleFunction(t) => {
+let mut t = t.clone();
+Box::pin(rx.flat_map(move |v| {
+let mut buf = Vec::with_capacity(1);
+let mut err_buf = Vec::with_capacity(1);
+t.transform(&mut buf, &mut err_buf, v);
+stream::iter(buf.into_iter())
+}))
+}
Transform::Task(t) => t.transform(Box::pin(rx)),
};

@@ -148,6 +158,16 @@ fn bench_field_filter(c: &mut Criterion) {
stream::iter(buf.into_iter())
}))
}
+// ignoring multiple outputs for now
+Transform::FallibleFunction(t) => {
+let mut t = t.clone();
+Box::pin(rx.flat_map(move |v| {
+let mut buf = Vec::with_capacity(1);
+let mut err_buf = Vec::with_capacity(1);
+t.transform(&mut buf, &mut err_buf, v);
+stream::iter(buf.into_iter())
+}))
+}
Transform::Task(t) => t.transform(Box::pin(rx)),
};

2 changes: 1 addition & 1 deletion config/examples/docs_example.toml
@@ -36,4 +36,4 @@ data_dir = "/var/lib/vector"
compression = "gzip" # compress final objects
encoding = "ndjson" # new line delimited JSON
[sinks.s3_archives.batch]
-max_size = 10000000 # 10mb uncompressed
+max_bytes = 10000000 # 10mb uncompressed
7 changes: 3 additions & 4 deletions docs/specs/component.md
@@ -155,9 +155,8 @@ or receiving one or more Vector events.

#### EventsSent

-*All components* MUST emit an `EventsSent` event immediately before sending the
-event down stream. This should happen before any transmission preparation, such
-as encoding.
+*All components* MUST emit an `EventsSent` event immediately after
+sending the events down stream, if the transmission was successful.

* Properties
* `count` - The count of Vector events.
@@ -174,7 +173,7 @@ as encoding.
#### BytesSent

*Sinks* MUST emit a `BytesSent` event immediately after sending bytes to the
-downstream target regardless if the transmission was successful or not.
+downstream target, if the transmission was successful.

* Properties
* `byte_size`
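The spec change above flips the ordering for both events: transmit first, then emit only on success. A minimal, self-contained Rust sketch of what that ordering looks like in a sink's send path; the `EventsSent`/`BytesSent` structs and the `emit` helper below are toy stand-ins, not Vector's actual internal-events API:

```rust
// Toy stand-ins for the spec's instrumentation events.
#[derive(Debug)]
struct EventsSent {
    count: usize,
    byte_size: usize,
}

#[derive(Debug)]
struct BytesSent {
    byte_size: usize,
    protocol: &'static str,
}

// Stand-in for Vector's emit! machinery.
fn emit<E: std::fmt::Debug>(event: E) {
    println!("emit: {:?}", event);
}

// Pretend transport; fails on an empty payload to simulate an error path.
fn send_downstream(payload: &[u8]) -> Result<(), String> {
    if payload.is_empty() {
        Err("connection reset".to_string())
    } else {
        Ok(())
    }
}

fn send_batch(events: Vec<String>) -> Result<(), String> {
    let count = events.len();
    let event_byte_size: usize = events.iter().map(|e| e.len()).sum();

    // Encoding happens before transmission and is not itself "sending".
    let payload = events.join("\n").into_bytes();

    // On failure we return early and emit nothing, per the revised spec.
    send_downstream(&payload)?;

    // Transmission succeeded: both events fire now, not beforehand.
    emit(EventsSent { count, byte_size: event_byte_size });
    emit(BytesSent { byte_size: payload.len(), protocol: "tcp" });
    Ok(())
}

fn main() {
    send_batch(vec!["event one".into(), "event two".into()]).unwrap();
}
```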
6 changes: 6 additions & 0 deletions lib/enrichment/src/lib.rs
@@ -65,6 +65,12 @@ pub trait Table: DynClone {
/// # Errors
/// Errors if the fields are not in the table.
fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, String>;
+
+/// Returns a list of the field names that are in each index
+fn index_fields(&self) -> Vec<(Case, Vec<String>)>;
+
+/// Returns true if the underlying data has changed and the table needs reloading.
+fn needs_reload(&self) -> bool;
}

dyn_clone::clone_trait_object!(Table);
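A file-backed table might satisfy the two new trait methods by remembering the indexes it built and its file's modification time at load. A standalone sketch under those assumptions (`FileTable` and its fields are hypothetical, not the crate's real `file::File` implementation):

```rust
use std::{fs, path::PathBuf, time::SystemTime};

// Toy stand-in for enrichment::Case.
#[derive(Clone, Copy, Debug)]
pub enum Case {
    Sensitive,
    Insensitive,
}

// Hypothetical file-backed table tracking what the new methods need.
pub struct FileTable {
    path: PathBuf,
    last_modified: SystemTime,
    indexes: Vec<(Case, Vec<String>)>,
}

impl FileTable {
    // Mirrors `Table::index_fields`: report every index that has been
    // applied so the registry can reapply them after a reload.
    pub fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
        self.indexes.clone()
    }

    // Mirrors `Table::needs_reload`: compare the file's current mtime to
    // the one captured at load time, erring on the side of reloading.
    pub fn needs_reload(&self) -> bool {
        fs::metadata(&self.path)
            .and_then(|m| m.modified())
            .map(|mtime| mtime > self.last_modified)
            .unwrap_or(true)
    }
}
```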
97 changes: 88 additions & 9 deletions lib/enrichment/src/tables.rs
@@ -46,7 +46,9 @@ pub struct TableRegistry {
}

impl TableRegistry {
-/// Load the given Enrichment Tables into the registry.
+/// Load the given Enrichment Tables into the registry. This can be new tables
+/// loaded from the config, or tables that need to be reloaded because the
+/// underlying data has changed.
///
/// If there are no tables currently loaded into the registry, this is a
/// simple operation, we simply load the tables into the `loading` field.
@@ -69,9 +71,6 @@ impl TableRegistry {
/// Once loading is complete, the data is swapped out of `loading` and we
/// return to a single copy of the tables.
///
-/// TODO This function currently does nothing to reload the the underlying
-/// data should it have changed in the enrichment source.
-///
/// # Panics
///
/// Panics if the Mutex is poisoned.
@@ -80,11 +79,13 @@
let existing = self.tables.load();
if let Some(existing) = &**existing {
// We already have some tables
-tables.extend(
-existing
-.iter()
-.map(|(key, value)| (key.clone(), value.clone())),
-);
+let extend = existing
+.iter()
+.filter(|(key, _)| !tables.contains_key(*key))
+.map(|(key, value)| (key.clone(), value.clone()))
+.collect::<HashMap<_, _>>();
+
+tables.extend(extend);
}
match *loading {
None => *loading = Some(tables),
@@ -151,6 +152,30 @@ impl TableRegistry {
pub fn as_readonly(&self) -> TableSearch {
TableSearch(self.tables.clone())
}
+
+/// Returns the indexes that have been applied to the given table.
+/// If the table is reloaded we need these to reapply them to the new reloaded tables.
+pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
+match &**self.tables.load() {
+Some(tables) => tables
+.get(table)
+.map(|table| table.index_fields())
+.unwrap_or_default(),
+None => Vec::new(),
+}
+}
+
+/// Checks if the table needs reloading.
+/// If in doubt (the table isn't in our list) we return true.
+pub fn needs_reload(&self, table: &str) -> bool {
+match &**self.tables.load() {
+Some(tables) => tables
+.get(table)
+.map(|table| table.needs_reload())
+.unwrap_or(true),
+None => true,
+}
+}
}

impl std::fmt::Debug for TableRegistry {
@@ -370,4 +395,58 @@ mod tests {

assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], table_ids,);
}
+
+#[test]
+fn reloads_existing_tables() {
+let mut tables: TableMap = HashMap::new();
+tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
+tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
+
+let registry = super::TableRegistry::default();
+registry.load(tables);
+registry.finish_load();
+
+// After we finish load there are no tables in the list
+assert!(registry.table_ids().is_empty());
+
+let mut new_data = BTreeMap::new();
+new_data.insert("thing".to_string(), Value::Null);
+
+let mut tables: TableMap = HashMap::new();
+tables.insert(
+"dummy2".to_string(),
+Box::new(DummyEnrichmentTable::new_with_data(new_data)),
+);
+
+// A load should put both tables back into the list.
+registry.load(tables);
+let tables = registry.loading.lock().unwrap();
+let tables = tables.clone().unwrap();
+
+// dummy1 should still have old data.
+assert_eq!(
+Value::from("result"),
+tables
+.get("dummy1")
+.unwrap()
+.find_table_row(Case::Sensitive, &Vec::new(), None, None)
+.unwrap()
+.get("field")
+.cloned()
+.unwrap()
+);
+
+// dummy2 should have new data.
+assert_eq!(
+Value::Null,
+tables
+.get("dummy2")
+.unwrap()
+.find_table_row(Case::Sensitive, &Vec::new(), None, None)
+.unwrap()
+.get("thing")
+.cloned()
+.unwrap()
+);
+}
}
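Taken together, the new methods suggest a reload cycle along these lines. This is a schematic sketch built from the APIs visible in this diff (`table_ids`, `needs_reload`, `index_fields`, `load`, `finish_load`, `Table::add_index`); the `TableMap` alias and the `build_table_from_source` loader are assumptions, and the real crate wires this up differently:

```rust
use std::collections::HashMap;

use enrichment::{Table, TableRegistry};

// Assumed alias, matching how the tests above build their maps; the
// crate's real definition may carry different trait bounds.
type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;

// Hypothetical: rebuild one table from its backing source.
fn build_table_from_source(id: &str) -> Result<Box<dyn Table + Send + Sync>, String> {
    Err(format!("no loader wired up for {}", id))
}

fn reload_changed_tables(registry: &TableRegistry) -> Result<(), String> {
    let mut reloaded: TableMap = HashMap::new();

    for id in registry.table_ids() {
        // `needs_reload` errs toward true, so a table the registry cannot
        // see is rebuilt rather than silently kept stale.
        if registry.needs_reload(&id) {
            let mut table = build_table_from_source(&id)?;

            // Reapply the indexes the previous copy had, so existing
            // `IndexHandle`-based lookups keep working after the swap.
            for (case, fields) in registry.index_fields(&id) {
                let fields: Vec<&str> = fields.iter().map(String::as_str).collect();
                table.add_index(case, &fields)?;
            }
            reloaded.insert(id, table);
        }
    }

    // `load` carries over any existing table that is not in `reloaded`
    // (per the `filter` added in this diff), and `finish_load` swaps the
    // combined set in.
    registry.load(reloaded);
    registry.finish_load();
    Ok(())
}
```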
15 changes: 15 additions & 0 deletions lib/enrichment/src/test_util.rs
@@ -25,6 +25,13 @@ impl DummyEnrichmentTable {
indexes,
}
}
+
+pub(crate) fn new_with_data(data: BTreeMap<String, Value>) -> Self {
+Self {
+data,
+indexes: Default::default(),
+}
+}
}

impl Table for DummyEnrichmentTable {
@@ -53,6 +60,14 @@ impl Table for DummyEnrichmentTable {
indexes.push(fields.iter().map(|s| (*s).to_string()).collect());
Ok(IndexHandle(indexes.len() - 1))
}
+
+fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
+Vec::new()
+}
+
+fn needs_reload(&self) -> bool {
+false
+}
}

/// Create a table registry with dummy data
2 changes: 1 addition & 1 deletion lib/k8s-e2e-tests/Cargo.toml
@@ -12,7 +12,7 @@ futures = "0.3"
k8s-openapi = { version = "0.13.0", default-features = false, features = ["v1_16"] }
k8s-test-framework = { version = "0.1", path = "../k8s-test-framework" }
regex = "1"
reqwest = { version = "0.11.4", features = ["json"] }
reqwest = { version = "0.11.5", features = ["json"] }
serde_json = "1"
tokio = { version = "1.12.0", features = ["full"] }
indoc = "1.0.3"
2 changes: 1 addition & 1 deletion lib/vector-api-client/Cargo.toml
@@ -26,7 +26,7 @@ tokio-stream = { version = "0.1.7", features = ["sync"] }
graphql_client = "0.10.0"

# HTTP / WebSockets
reqwest = { version = "0.11.4", features = ["json"] }
reqwest = { version = "0.11.5", features = ["json"] }
tokio-tungstenite = { version = "0.13.0", features = ["tls"] }

# External libs