feat: expr analyzer for buffer to filter table chunks
hiltontj committed Jan 20, 2025
1 parent 1d8d3d6 commit bab428f
Showing 5 changed files with 1,044 additions and 358 deletions.
219 changes: 216 additions & 3 deletions influxdb3_write/src/lib.rs
@@ -9,21 +9,38 @@ pub mod paths;
pub mod persister;
pub mod write_buffer;

use anyhow::Context;
use async_trait::async_trait;
use data_types::{NamespaceName, TimestampMinMax};
use datafusion::{catalog::Session, error::DataFusionError, prelude::Expr};
use datafusion::{
catalog::Session,
common::{Column, DFSchema},
error::DataFusionError,
execution::context::ExecutionProps,
logical_expr::interval_arithmetic::Interval,
physical_expr::{
analyze, create_physical_expr,
utils::{Guarantee, LiteralGuarantee},
AnalysisContext, ExprBoundaries,
},
prelude::Expr,
scalar::ScalarValue,
};
use hashbrown::{HashMap, HashSet};
use influxdb3_cache::{
distinct_cache::{CreateDistinctCacheArgs, DistinctCacheProvider},
last_cache::LastCacheProvider,
};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema};
use influxdb3_catalog::catalog::{Catalog, CatalogSequenceNumber, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, ParquetFileId, SerdeVecMap, TableId};
use influxdb3_wal::{
DistinctCacheDefinition, LastCacheDefinition, SnapshotSequenceNumber, Wal,
WalFileSequenceNumber,
};
use iox_query::QueryChunk;
use iox_time::Time;
use observability_deps::tracing::debug;
use schema::{InfluxColumnType, TIME_COLUMN_NAME};
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
@@ -41,6 +58,9 @@ pub enum Error {

#[error("persister error: {0}")]
Persister(#[from] persister::Error),

#[error(transparent)]
Anyhow(#[from] anyhow::Error),
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -90,7 +110,16 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
fn wal(&self) -> Arc<dyn Wal>;

/// Returns the parquet files for a given database and table
fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile>;
fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
self.parquet_files_filtered(db_id, table_id, &BufferFilter::default())
}

fn parquet_files_filtered(
&self,
db_id: DbId,
table_id: TableId,
filter: &BufferFilter,
) -> Vec<ParquetFile>;

/// A channel to watch for when new persisted snapshots are created
fn watch_persisted_snapshots(&self) -> tokio::sync::watch::Receiver<Option<PersistedSnapshot>>;
@@ -462,6 +491,190 @@ pub(crate) mod test_help {
}
}

/// A derived set of filters that are used to prune data in the buffer when serving queries
#[derive(Debug, Default)]
pub struct BufferFilter {
time_lower_bound: Option<i64>,
time_upper_bound: Option<i64>,
guarantees: HashMap<ColumnId, BufferGuarantee>,
}

#[derive(Debug)]
pub struct BufferGuarantee {
pub guarantee: Guarantee,
pub literals: HashSet<Arc<str>>,
}

impl BufferFilter {
/// Create a new `BufferFilter` given a [`TableDefinition`] and set of filter [`Expr`]s from
/// a logical query plan.
///
/// This method analyzes the incoming `exprs` to do two things:
///
/// - determine whether there are any filters on the `time` column and, if so, attempt to derive
/// an interval that defines the boundaries on `time` for the query.
/// - determine whether there are any _literal guarantees_ on tag columns contained in the filter
/// predicates of the query.
pub fn new(table_def: &Arc<TableDefinition>, exprs: &[Expr]) -> Result<Self> {
debug!(input = ?exprs, ">>> creating buffer filter");
let mut time_interval: Option<Interval> = None;
let arrow_schema = table_def.schema.as_arrow();
let time_col_index = arrow_schema
.fields()
.iter()
.position(|f| f.name() == TIME_COLUMN_NAME)
.context("table should have a time column")?;
let mut guarantees = HashMap::new();

// DF schema and execution properties used for handling physical expressions:
let df_schema = DFSchema::try_from(Arc::clone(&arrow_schema))
.context("table schema was not able to convert to datafusion schema")?;
let props = ExecutionProps::new();

for expr in exprs.iter().filter(|e| {
// NOTE: most expression types are filtered out, as they are not relevant to time bound
// analysis or to deriving literal guarantees on tag columns
matches!(
e,
Expr::BinaryExpr(_) | Expr::Not(_) | Expr::Between(_) | Expr::InList(_)
)
}) {
let Ok(physical_expr) = create_physical_expr(expr, &df_schema, &props) else {
continue;
};
// Check if the expression refers to the `time` column:
if expr
.column_refs()
.contains(&Column::new_unqualified(TIME_COLUMN_NAME))
{
// Determine time bounds, if provided:
let boundaries = ExprBoundaries::try_new_unbounded(&arrow_schema)
.context("unable to create unbounded expr boundaries on incoming expression")?;
let mut analysis = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
&arrow_schema,
)
.context("unable to analyze provided filters for a boundary on the time column")?;

// Set the boundaries on the time column using the evaluated interval, if it exists.
// If an interval was already derived from a previous expression, we take their
// intersection, or produce an error if:
// - the derived intervals are not compatible (different types)
// - the derived intervals do not intersect; this should be a user error, i.e., a
// poorly formed query
if let Some(ExprBoundaries { interval, .. }) = (time_col_index
< analysis.boundaries.len())
.then_some(analysis.boundaries.remove(time_col_index))
{
if let Some(existing) = time_interval.take() {
let intersection = existing.intersect(interval).context(
"failed to derive a time interval from provided filters",
)?.context("provided filters on time column did not produce a valid set of boundaries")?;
time_interval.replace(intersection);
} else {
time_interval.replace(interval);
}
}
}

// Determine any literal guarantees made on tag columns:
let literal_guarantees = LiteralGuarantee::analyze(&physical_expr);
for LiteralGuarantee {
column,
guarantee,
literals,
} in literal_guarantees
{
// We are only interested in literal guarantees on tag columns for the buffer index:
let Some((column_id, InfluxColumnType::Tag)) = table_def
.column_definition(column.name())
.map(|def| (def.id, def.data_type))
else {
continue;
};

// We are only interested in string literals with respect to tag columns:
let literals = literals
.into_iter()
.filter_map(|l| match l {
ScalarValue::Utf8(Some(s)) | ScalarValue::Utf8View(Some(s)) => {
Some(Arc::<str>::from(s.as_str()))
}
_ => None,
})
.collect::<HashSet<Arc<str>>>();

if literals.is_empty() {
continue;
}

// Update the guarantees on this column. We handle multiple guarantees here, i.e.,
// if there are multiple Expr's that lead to multiple guarantees on a given column.
guarantees
.entry(column_id)
.and_modify(|e: &mut BufferGuarantee| {
debug!(current = ?e.guarantee, incoming = ?guarantee, ">>> updating existing guarantee");
use Guarantee::*;
match (e.guarantee, guarantee) {
(In, In) | (NotIn, NotIn) => {
e.literals = e.literals.union(&literals).cloned().collect()
}
(In, NotIn) => {
e.literals = e.literals.difference(&literals).cloned().collect()
}
(NotIn, In) => {
e.literals = literals.difference(&e.literals).cloned().collect()
}
}
})
.or_insert(BufferGuarantee {
guarantee,
literals,
});
debug!(?guarantees, ">>> updated guarantees");
}
}

// Determine the lower and upper bound from the derived interval on time:
let (time_lower_bound, time_upper_bound) = if let Some(i) = time_interval {
let low = if let ScalarValue::TimestampNanosecond(Some(l), _) = i.lower() {
Some(*l)
} else {
None
};
let high = if let ScalarValue::TimestampNanosecond(Some(h), _) = i.upper() {
Some(*h)
} else {
None
};

(low, high)
} else {
(None, None)
};

Ok(Self {
time_lower_bound,
time_upper_bound,
guarantees,
})
}

pub fn test_time_stamp_min_max(&self, min: i64, max: i64) -> bool {
match (self.time_lower_bound, self.time_upper_bound) {
(None, None) => true,
(None, Some(u)) => min <= u,
(Some(l), None) => max >= l,
(Some(l), Some(u)) => min <= u && max >= l,
}
}

pub fn guarantees(&self) -> impl Iterator<Item = (&ColumnId, &BufferGuarantee)> {
self.guarantees.iter()
}
}

#[cfg(test)]
mod tests {
use influxdb3_catalog::catalog::CatalogSequenceNumber;
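The guarantee-merging rules in the `and_modify` closure above are easy to verify in isolation. Below is a minimal, self-contained sketch (not part of this commit) of the same set arithmetic, using a local `Guarantee` enum as a stand-in for DataFusion's and hypothetical tag values purely for illustration:

use std::collections::HashSet;

// Local stand-in for datafusion's `Guarantee` so this sketch compiles on its own.
#[derive(Clone, Copy, Debug)]
enum Guarantee {
    In,
    NotIn,
}

// Combine the literal set of an existing guarantee with an incoming one, mirroring the
// merge rules in `BufferFilter::new`:
// - In/In and NotIn/NotIn merge by union
// - In followed by NotIn keeps the existing IN literals minus the incoming NOT IN literals
// - NotIn followed by In keeps the incoming IN literals minus the existing NOT IN literals
fn merge_literals<'a>(
    existing: (Guarantee, &HashSet<&'a str>),
    incoming: (Guarantee, &HashSet<&'a str>),
) -> HashSet<&'a str> {
    use Guarantee::*;
    match (existing.0, incoming.0) {
        (In, In) | (NotIn, NotIn) => existing.1.union(incoming.1).copied().collect(),
        (In, NotIn) => existing.1.difference(incoming.1).copied().collect(),
        (NotIn, In) => incoming.1.difference(existing.1).copied().collect(),
    }
}

fn main() {
    // e.g. a filter like `region IN ('us', 'eu') AND region != 'eu'` (hypothetical values):
    let existing = HashSet::from(["us", "eu"]);
    let incoming = HashSet::from(["eu"]);
    let merged = merge_literals((Guarantee::In, &existing), (Guarantee::NotIn, &incoming));
    assert_eq!(merged, HashSet::from(["us"]));
    println!("merged literals: {merged:?}");
}

In the commit itself the literal sets are `HashSet<Arc<str>>` keyed by `ColumnId`, and the resulting guarantees are exposed via `BufferFilter::guarantees()` for the buffer to consult when filtering table chunks.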
46 changes: 30 additions & 16 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -12,8 +12,8 @@ use crate::write_buffer::queryable_buffer::QueryableBuffer;
use crate::write_buffer::validator::WriteValidator;
use crate::{chunk::ParquetChunk, DatabaseManager};
use crate::{
BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager, LastCacheManager,
ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
BufferFilter, BufferedWriteRequest, Bufferer, ChunkContainer, DistinctCacheManager,
LastCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError,
};
use async_trait::async_trait;
use data_types::{
@@ -319,30 +319,35 @@ impl WriteBufferImpl {
DataFusionError::Execution(format!("database {} not found", database_name))
})?;

let (table_id, table_schema) =
db_schema.table_id_and_schema(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;
let table_def = db_schema.table_definition(table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"table {} not found in db {}",
table_name, database_name
))
})?;

let buffer_filter = BufferFilter::new(&table_def, filters)
.inspect_err(|error| warn!(?error, "buffer filter generation failed"))
.map_err(|error| DataFusionError::External(Box::new(error)))?;

let mut chunks = self.buffer.get_table_chunks(
Arc::clone(&db_schema),
table_name,
filters,
&buffer_filter,
projection,
ctx,
)?;

let parquet_files = self.persisted_files.get_files(db_schema.id, table_id);
let parquet_files =
self.persisted_files
.get_files(db_schema.id, table_def.table_id, &buffer_filter);

let mut chunk_order = chunks.len() as i64;

for parquet_file in parquet_files {
let parquet_chunk = parquet_chunk_from_file(
&parquet_file,
&table_schema,
&table_def.schema,
self.persister.object_store_url().clone(),
self.persister.object_store(),
chunk_order,
@@ -427,8 +432,13 @@ impl Bufferer for WriteBufferImpl {
Arc::clone(&self.wal)
}

fn parquet_files(&self, db_id: DbId, table_id: TableId) -> Vec<ParquetFile> {
self.buffer.persisted_parquet_files(db_id, table_id)
fn parquet_files_filtered(
&self,
db_id: DbId,
table_id: TableId,
filter: &BufferFilter,
) -> Vec<ParquetFile> {
self.buffer.persisted_parquet_files(db_id, table_id, filter)
}

fn watch_persisted_snapshots(&self) -> Receiver<Option<PersistedSnapshot>> {
@@ -2092,7 +2102,9 @@ mod tests {
verify_snapshot_count(1, &wbuf.persister).await;

// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_id, tbl_id);
let persisted_files =
wbuf.persisted_files()
.get_files(db_id, tbl_id, &BufferFilter::default());
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());

@@ -2198,7 +2210,9 @@ mod tests {
verify_snapshot_count(1, &wbuf.persister).await;

// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_id, tbl_id);
let persisted_files =
wbuf.persisted_files()
.get_files(db_id, tbl_id, &BufferFilter::default());
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());

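For reference, the time-based pruning that `BufferFilter::test_time_stamp_min_max` enables (and that the filtered `get_files` / `persisted_parquet_files` calls above presumably rely on; those implementations live in files not shown here) reduces to a range-overlap check. A small, self-contained sketch with hypothetical nanosecond timestamps:

// Stand-in for `BufferFilter::test_time_stamp_min_max`: a chunk or parquet file whose
// timestamps span [min, max] can only match a filter whose derived time interval is
// [lower, upper]; `None` means that side is unbounded.
fn overlaps(min: i64, max: i64, lower: Option<i64>, upper: Option<i64>) -> bool {
    match (lower, upper) {
        (None, None) => true,
        (None, Some(u)) => min <= u,
        (Some(l), None) => max >= l,
        (Some(l), Some(u)) => min <= u && max >= l,
    }
}

fn main() {
    // Hypothetical bounds derived from something like `time >= 100 AND time <= 200`:
    let (lower, upper) = (Some(100), Some(200));
    assert!(overlaps(150, 300, lower, upper)); // partially overlaps the range: keep
    assert!(!overlaps(250, 300, lower, upper)); // entirely after the range: prune
    assert!(overlaps(0, 50, None, Some(200))); // only an upper bound: anything at or before it matches
    println!("overlap checks passed");
}

The exact interval endpoints come from DataFusion's interval analysis in `BufferFilter::new`; any bound that is not a concrete `TimestampNanosecond` value simply falls back to `None` (unbounded) here.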
