feat: expr analyzer for buffer to filter table chunks
hiltontj committed Jan 20, 2025
1 parent 1d8d3d6 commit daa3fe7
Showing 5 changed files with 590 additions and 355 deletions.
181 changes: 178 additions & 3 deletions influxdb3_write/src/lib.rs
@@ -9,21 +9,37 @@ pub mod paths;
pub mod persister;
pub mod write_buffer;

use anyhow::Context;
use async_trait::async_trait;
use data_types::{NamespaceName, TimestampMinMax};
use datafusion::{catalog::Session, error::DataFusionError, prelude::Expr};
use datafusion::{
catalog::Session,
common::{Column, DFSchema},
error::DataFusionError,
execution::context::ExecutionProps,
physical_expr::{
analyze, create_physical_expr,
utils::{Guarantee, LiteralGuarantee},
AnalysisContext, ExprBoundaries,
},
prelude::Expr,
scalar::ScalarValue,
};
use hashbrown::{HashMap, HashSet};
use influxdb3_cache::{
distinct_cache::{CreateDistinctCacheArgs, DistinctCacheProvider},
last_cache::LastCacheProvider,
};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
WalFileSequenceNumber,
};
use iox_query::QueryChunk;
use iox_time::Time;
use observability_deps::tracing::{debug, info, warn};
use schema::{InfluxColumnType, TIME_COLUMN_NAME};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
@@ -41,6 +57,9 @@ pub enum Error {

#[error("persister error: {0}")]
Persister(#[from] persister::Error),

#[error(transparent)]
Anyhow(#[from] anyhow::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -90,7 +109,16 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
fn wal(&self) -> Arc<dyn Wal>;

/// Returns the parquet files for a given database and table
fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile>;
fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
self.parquet_files_filtered(db_id, table_id, &BufferFilter::default())
}

fn parquet_files_filtered(
&self,
db_id: DbId,
table_id: TableId,
filter: &BufferFilter,
) -> Vec<ParquetFile>;

/// A channel to watch for when new persisted snapshots are created
fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
@@ -462,6 +490,153 @@ pub(crate) mod test_help {
}
}

#[derive(Debug, Default)]
pub struct BufferFilter {
time_lower_bound: Option<i64>,
time_upper_bound: Option<i64>,
guarantees: HashMap<ColumnId, BufferGuarantee>,
}

#[derive(Debug)]
pub struct BufferGuarantee {
pub guarantee: Guarantee,
pub literals: HashSet<Arc<str>>,
}

impl BufferFilter {
pub fn generate(table_def: &Arc<TableDefinition>, exprs: &[Expr]) -> Result<Self> {
let mut time_lower_bound = None;
let mut time_upper_bound = None;
let arrow_schema = table_def.schema.as_arrow();
let mut guarantees = HashMap::new();
let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))
.context("table schema was not able to convert to datafusion schema")?;
let props = ExecutionProps::new();
info!(?exprs, "analyzing expressions");
for expr in exprs.iter().filter(|e| {
matches!(
e,
Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_) | Expr::InList(_)
)
}) {
let Ok(physical_expr) = create_physical_expr(expr, &df_schema, &props) else {
continue;
};
if expr
.column_refs()
.contains(&Column::new_unqualified(TIME_COLUMN_NAME))
{
debug!(">>> has time col expr");
let time_col_index = arrow_schema
.fields()
.iter()
.position(|f| f.name() == TIME_COLUMN_NAME)
.expect("table should have a time column");
// Determine time bounds, if provided:
let boundaries = ExprBoundaries::try_new_unbounded(&arrow_schema)
.context("unable to create unbounded expr boundaries on incoming expression")?;
let analysis = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
&arrow_schema,
)
.inspect_err(|error| {
warn!(?physical_expr, ?arrow_schema, ?error, "failed to analyze")
})
.context("unable to analyze provided filters")?;
// Set the time boundaries by the analyzed expression, if they have not already been
// set. If they have been set, we remove the bounds, because it is not clear how to
// evaluate multiple intervals.
if let Some(ExprBoundaries { interval, .. }) =
analysis.boundaries.get(time_col_index)
{
debug!(?interval, ">>> got the interval");
if let ScalarValue::TimestampNanosecond(Some(lower), _) = interval.lower() {
if time_lower_bound.take().is_none() {
time_lower_bound.replace(*lower);
}
}
if let ScalarValue::TimestampNanosecond(Some(upper), _) = interval.upper() {
if time_upper_bound.take().is_none() {
time_upper_bound.replace(*upper);
}
}
}
}

// Determine any literal guarantees made on tag columns:
let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
for LiteralGuarantee {
column,
guarantee,
literals,
} in literal_guarantees
{
// NOTE: only retaining string literals for matching
// on tag columns for the buffer index:
let Some((column_id, InfluxColumnType::Tag)) = table_def
.column_definition(column.name())
.map(|def| (def.id, def.data_type))
else {
continue;
};
let literals = literals
.into_iter()
.filter_map(|l| match l {
ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => {
Some(Arc::<str>::from(s.as_str()))
}
_ => None,
})
.collect::<HashSet<Arc<str>>>();
guarantees
.entry(column_id)
.and_modify(|e: &mut BufferGuarantee| {
// NOTE: it seems unlikely that there would be
// multiple literal guarantees on a single
// column from the Expr set. But we handle
// that here:
use Guarantee::*;
match (e.guarantee, guarantee) {
(In, In) | (NotIn, NotIn) => {
e.literals = e.literals.union(&literals).cloned().collect()
}
(In, NotIn) => {
e.literals = e.literals.difference(&literals).cloned().collect()
}
(NotIn, In) => {
e.literals = literals.difference(&e.literals).cloned().collect()
}
}
})
.or_insert(BufferGuarantee {
guarantee,
literals,
});
}
}

Ok(Self {
time_lower_bound,
time_upper_bound,
guarantees,
})
}

pub fn test_time_stamp_min_max(&self, min: i64, max: i64) -> bool {
match (self.time_lower_bound, self.time_upper_bound) {
(None, None) => true,
(None, Some(u)) => min <= u,
(Some(l), None) => max >= l,
(Some(l), Some(u)) => min <= u && max >= l,
}
}

pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &BufferGuarantee)> {
self.guarantees.iter()
}
}

#[cfg(test)]
mod tests {
use influxdb3_catalog::catalog::CatalogSequenceNumber;
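The merge arms near the end of `BufferFilter::generate` fold a second literal guarantee on the same tag column into the entry already recorded for it. Below is a minimal, dependency-free sketch of those arms; the `Guarantee`, `BufferGuarantee`, and `merge` names are simplified stand-ins for illustration (the real `Guarantee` comes from DataFusion, and the crate's `BufferGuarantee` stores literals as `Arc<str>`), not the crate's API.

```rust
use std::collections::HashSet;

// Simplified stand-ins for DataFusion's `Guarantee` and this crate's
// `BufferGuarantee`; literals are plain `String`s instead of `Arc<str>`.
#[derive(Clone, Copy, Debug)]
enum Guarantee {
    In,
    NotIn,
}

#[derive(Debug)]
struct BufferGuarantee {
    guarantee: Guarantee,
    literals: HashSet<String>,
}

// Fold a newly analyzed guarantee for the same column into the existing
// entry, following the same (existing, new) match arms as the diff above.
fn merge(existing: &mut BufferGuarantee, new_guarantee: Guarantee, new_literals: HashSet<String>) {
    use Guarantee::*;
    match (existing.guarantee, new_guarantee) {
        // Same kind: union the literal sets.
        (In, In) | (NotIn, NotIn) => {
            existing.literals = existing.literals.union(&new_literals).cloned().collect();
        }
        // Existing IN narrowed by a NOT IN: drop the excluded literals.
        (In, NotIn) => {
            existing.literals = existing.literals.difference(&new_literals).cloned().collect();
        }
        // Existing NOT IN met by an IN: keep only the newly allowed literals
        // that are not already excluded.
        (NotIn, In) => {
            existing.literals = new_literals.difference(&existing.literals).cloned().collect();
        }
    }
}

fn main() {
    let mut g = BufferGuarantee {
        guarantee: Guarantee::In,
        literals: ["us-east", "us-west"].iter().map(|s| s.to_string()).collect(),
    };
    // An IN {us-east, us-west} guarantee narrowed by NOT IN {us-west}
    // leaves only us-east, per the (In, NotIn) arm.
    merge(&mut g, Guarantee::NotIn, ["us-west"].iter().map(|s| s.to_string()).collect());
    let expected: HashSet<String> = ["us-east"].iter().map(|s| s.to_string()).collect();
    assert_eq!(g.literals, expected);
    println!("{g:?}");
}
```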
46 changes: 30 additions & 16 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -12,8 +12,8 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{chunk::ParquetChunk, DatabaseManager};
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager, LastCacheManager,
ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
BufferFilter, BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager,
LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
};
use async_trait::async_trait;
use data_types::{
@@ -319,30 +319,35 @@ impl WriteBufferImpl {
DataFusionError::Execution(format!("database {} not found", database_name))
})?;

let (table_id, table_schema) =
db_schema.table_id_and_schema(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;
let table_def = db_schema.table_definition(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;

let buffer_filter = BufferFilter::generate(&table_def, filters)
.inspect_err(|error| warn!(?error, "buffer filter generation failed"))
.map_err(|error| DataFusionError::External(Box::new(error)))?;

let mut chunks = self.buffer.get_table_chunks(
Arc::clone(&db_schema),
table_name,
filters,
&buffer_filter,
projection,
ctx,
)?;

let parquet_files = self.persisted_files.get_files(db_schema.id, table_id);
let parquet_files =
self.persisted_files
.get_files(db_schema.id, table_def.table_id, &buffer_filter);

let mut chunk_order = chunks.len() as i64;

for parquet_file in parquet_files {
let parquet_chunk = parquet_chunk_from_file(
&parquet_file,
&table_schema,
&table_def.schema,
self.persister.object_store_url().clone(),
self.persister.object_store(),
chunk_order,
@@ -427,8 +432,13 @@ impl Bufferer for WriteBufferImpl {
Arc::clone(&self.wal)
}

fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
self.buffer.persisted_parquet_files(db_id, table_id)
fn parquet_files_filtered(
&self,
db_id: DbId,
table_id: TableId,
filter: &BufferFilter,
) -> Vec<ParquetFile> {
self.buffer.persisted_parquet_files(db_id, table_id, filter)
}

fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshot>> {
@@ -2092,7 +2102,9 @@ mod tests {
verify_snapshot_count(1, &wbuf.persister).await;

// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_id, tbl_id);
let persisted_files =
wbuf.persisted_files()
.get_files(db_id, tbl_id, &BufferFilter::default());
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());

@@ -2198,7 +2210,9 @@ mod tests {
verify_snapshot_count(1, &wbuf.persister).await;

// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_id, tbl_id);
let persisted_files =
wbuf.persisted_files()
.get_files(db_id, tbl_id, &BufferFilter::default());
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());

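The `BufferFilter` generated in `get_table_chunks` carries the optional time bounds that both the in-memory buffer and `PersistedFiles::get_files` use for pruning. A rough, dependency-free sketch of the overlap test that `test_time_stamp_min_max` performs follows; the `overlaps` function name is illustrative rather than part of the crate, and `None` on either side means that bound is unconstrained.

```rust
/// Returns true when a chunk or file spanning [min, max] (nanoseconds)
/// can overlap a filter's optional [lower, upper] time bounds.
fn overlaps(min: i64, max: i64, lower: Option<i64>, upper: Option<i64>) -> bool {
    match (lower, upper) {
        (None, None) => true,                       // no time predicate: keep everything
        (None, Some(u)) => min <= u,                // only an upper bound
        (Some(l), None) => max >= l,                // only a lower bound
        (Some(l), Some(u)) => min <= u && max >= l, // both bounds: interval overlap
    }
}

fn main() {
    // File covering t = 100..=200, filter asking for time >= 150: kept.
    assert!(overlaps(100, 200, Some(150), None));
    // Same file, filter with upper bound 49: pruned.
    assert!(!overlaps(100, 200, None, Some(49)));
    // Unbounded filter keeps everything.
    assert!(overlaps(0, 1, None, None));
    println!("overlap checks passed");
}
```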
9 changes: 8 additions & 1 deletion influxdb3_write/src/write_buffer/persisted_files.rs
@@ -2,6 +2,7 @@
//! When queries come in they will combine whatever chunks exist from `QueryableBuffer` with
//! the persisted files to get the full set of data to query.
use crate::BufferFilter;
use crate::{ParquetFile, PersistedSnapshot};
use hashbrown::HashMap;
use influxdb3_id::DbId;
@@ -47,7 +48,12 @@ impl PersistedFiles {
}

/// Get the list of files for a given database and table, always return in descending order of min_time
pub fn get_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
pub fn get_files(
&self,
db_id: DbId,
table_id: TableId,
filter: &BufferFilter,
) -> Vec<ParquetFile> {
let three_days_ago = (self.time_provider.now() - crate::THREE_DAYS).timestamp_nanos();
let mut files = {
let inner = self.inner.read();
@@ -58,6 +64,7 @@
.cloned()
.unwrap_or_default()
.into_iter()
.filter(|file| filter.test_time_stamp_min_max(file.min_time, file.max_time))
.filter(|file| dbg!(file.min_time) > dbg!(three_days_ago))
.collect::<Vec<_>>()
};
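Taken together, the two `.filter` calls in `get_files` keep a persisted parquet file only when its `[min_time, max_time]` range can overlap the query's time bounds and its `min_time` is newer than the three-day cutoff, with survivors returned in descending `min_time` order per the doc comment. A simplified, dependency-free sketch of that pipeline follows, using the illustrative `FileMeta` and `prune` names rather than the crate's `ParquetFile` type.

```rust
/// Minimal stand-in for the metadata this sketch needs from `ParquetFile`.
#[derive(Debug)]
struct FileMeta {
    path: String,
    min_time: i64,
    max_time: i64,
}

/// Keep only files that can overlap the filter's time bounds and that are
/// newer than the retention cutoff, then sort by min_time descending,
/// mirroring the shape of `PersistedFiles::get_files`.
fn prune(files: Vec<FileMeta>, lower: Option<i64>, upper: Option<i64>, cutoff: i64) -> Vec<FileMeta> {
    let mut kept: Vec<FileMeta> = files
        .into_iter()
        .filter(|f| match (lower, upper) {
            (None, None) => true,
            (None, Some(u)) => f.min_time <= u,
            (Some(l), None) => f.max_time >= l,
            (Some(l), Some(u)) => f.min_time <= u && f.max_time >= l,
        })
        .filter(|f| f.min_time > cutoff)
        .collect();
    kept.sort_by_key(|f| std::cmp::Reverse(f.min_time));
    kept
}

fn main() {
    let files = vec![
        FileMeta { path: "a.parquet".into(), min_time: 100, max_time: 200 },
        FileMeta { path: "b.parquet".into(), min_time: 300, max_time: 400 },
        FileMeta { path: "c.parquet".into(), min_time: 10, max_time: 20 }, // no overlap with the query
    ];
    // Query asks for time >= 150; retention cutoff at t = 50.
    let kept = prune(files, Some(150), None, 50);
    assert_eq!(kept.len(), 2);
    assert_eq!(kept[0].path, "b.parquet"); // descending min_time order
    assert_eq!(kept[1].path, "a.parquet");
    println!("{kept:?}");
}
```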