From 371066d8b5f572be66111925ad03211c976878ad Mon Sep 17 00:00:00 2001 From: suremarc <8771538+suremarc@users.noreply.github.com> Date: Thu, 26 Dec 2024 20:03:37 +0000 Subject: [PATCH] port MV dependency code --- Cargo.toml | 1 + src/materialized.rs | 43 +- src/materialized/dependencies.rs | 1494 ++++++++++++++++++++++++++++ src/materialized/file_metadata.rs | 78 +- src/materialized/hive_partition.rs | 2 +- src/materialized/row_metadata.rs | 126 ++- src/materialized/util.rs | 24 + 7 files changed, 1687 insertions(+), 81 deletions(-) create mode 100644 src/materialized/util.rs diff --git a/Cargo.toml b/Cargo.toml index ff06e56..00ad412 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,6 +37,7 @@ datafusion-common = "43" datafusion-expr = "43" datafusion-functions = "43" datafusion-functions-aggregate = "43" +datafusion-optimizer = "43" datafusion-physical-expr = "43" datafusion-physical-plan = "43" datafusion-sql = "43" diff --git a/src/materialized.rs b/src/materialized.rs index 0151920..98ef0c9 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -16,7 +16,7 @@ // under the License. /// Track dependencies of materialized data in object storage -mod dependencies; +pub mod dependencies; /// Pluggable metadata sources for incremental view maintenance pub mod row_metadata; @@ -27,6 +27,9 @@ pub mod file_metadata; /// A UDF that parses Hive partition elements from object storage paths. mod hive_partition; +/// Some private utility functions +mod util; + use std::{ any::{type_name, Any, TypeId}, fmt::Debug, @@ -95,6 +98,21 @@ pub trait Materialized: ListingTableLike { fn query(&self) -> LogicalPlan; } +/// Register a [`Materialized`] implementation in this registry. +/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`] +/// into a [`Materialized`] where possible. +/// +/// Note that this will also register `T` as a [`ListingTableLike`]. +pub fn register_materialized() { + TABLE_TYPE_REGISTRY.register_materialized::(); +} + +/// Attempt to cast the given TableProvider into a [`Materialized`]. +/// If the table's type has not been registered using [`register_materialized`], will return `None`. +pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> { + TABLE_TYPE_REGISTRY.cast_to_materialized(table) +} + type Downcaster = Arc Option<&T> + Send + Sync>; /// A registry for implementations of [`ListingTableLike`], used for downcasting @@ -104,6 +122,7 @@ type Downcaster = Arc Option<&T> + Send + Sync>; /// By default, [`ListingTable`] is registered. 
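///
/// Illustrative usage of the public wrappers above (a sketch only; `SpecialTable`
/// is a hypothetical [`Materialized`] implementation, not something added by this
/// patch):
///
/// ```ignore
/// // Teach the registry about `SpecialTable` so it can be downcast later.
/// register_materialized::<SpecialTable>();
///
/// // Later, given an opaque `&dyn TableProvider`:
/// if let Some(mv) = cast_to_materialized(provider) {
///     let query = mv.query();
///     let partition_columns = mv.partition_columns();
/// }
/// ```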
struct TableTypeRegistry { listing_table_accessors: DashMap)>, + materialized_accessors: DashMap)>, } impl Debug for TableTypeRegistry { @@ -125,6 +144,7 @@ impl Default for TableTypeRegistry { fn default() -> Self { let new = Self { listing_table_accessors: DashMap::new(), + materialized_accessors: DashMap::new(), }; new.register_listing_table::(); @@ -143,6 +163,18 @@ impl TableTypeRegistry { ); } + fn register_materialized(&self) { + self.materialized_accessors.insert( + TypeId::of::(), + ( + type_name::(), + Arc::new(|any| any.downcast_ref::().map(|t| t as &dyn Materialized)), + ), + ); + + self.register_listing_table::(); + } + fn cast_to_listing_table<'a>( &'a self, table: &'a dyn TableProvider, @@ -151,4 +183,13 @@ impl TableTypeRegistry { .get(&table.as_any().type_id()) .and_then(|r| r.value().1(table.as_any())) } + + fn cast_to_materialized<'a>( + &'a self, + table: &'a dyn TableProvider, + ) -> Option<&'a dyn Materialized> { + self.materialized_accessors + .get(&table.as_any().type_id()) + .and_then(|r| r.value().1(table.as_any())) + } } diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index b248758..d3bb829 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -14,3 +14,1497 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + +use datafusion::{ + catalog::CatalogProviderList, + config::{CatalogOptions, ConfigOptions}, + datasource::{function::TableFunctionImpl, TableProvider, ViewTable}, + prelude::{flatten, get_field, make_array}, +}; +use datafusion_common::{ + alias::AliasGenerator, + internal_err, + tree_node::{Transformed, TreeNode}, + DFSchema, DataFusionError, Result, ScalarValue, +}; +use datafusion_expr::{col, lit, utils::split_conjunction, Expr, LogicalPlan, TableScan}; +use datafusion_functions::string::expr_fn::{concat, concat_ws}; +use datafusion_optimizer::{analyzer::expand_wildcard_rule::ExpandWildcardRule, AnalyzerRule}; +use datafusion_sql::TableReference; +use itertools::{Either, Itertools}; +use std::{collections::HashSet, sync::Arc}; + +use crate::materialized::META_COLUMN; + +use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized}; + +/// Table function that shows build targets and dependencies for a materialized view. +pub fn file_dependencies( + catalog_list: Arc, + row_metadata_registry: Arc, + options: &ConfigOptions, +) -> Arc { + Arc::new(FileDependenciesUdtf::new( + catalog_list, + row_metadata_registry, + options, + )) +} + +#[derive(Debug)] +struct FileDependenciesUdtf { + catalog_list: Arc, + config_options: ConfigOptions, + row_metadata_registry: Arc, +} + +impl FileDependenciesUdtf { + fn new( + catalog_list: Arc, + row_metadata_registry: Arc, + options: &ConfigOptions, + ) -> Self { + Self { + catalog_list, + config_options: options.clone(), + row_metadata_registry, + } + } +} + +impl TableFunctionImpl for FileDependenciesUdtf { + fn call(&self, args: &[Expr]) -> Result> { + let table_name = get_table_name(args)?; + + let table_ref = TableReference::from(table_name).resolve( + &self.config_options.catalog.default_catalog, + &self.config_options.catalog.default_schema, + ); + + let table = util::get_table(self.catalog_list.as_ref(), &table_ref) + .map_err(|e| DataFusionError::Plan(e.to_string()))?; + + let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan( + "file_dependencies: table '{table_name} is not a materialized view. 
(Materialized TableProviders must be registered using register_materialized".to_string(), + ))?; + + Ok(Arc::new(ViewTable::try_new( + expected_source_files_by_target( + mv, + &self.config_options, + self.row_metadata_registry.clone(), + )?, + None, + )?)) + } +} + +/// Extract table name from args passed to TableFunctionImpl::call() +fn get_table_name(args: &[Expr]) -> Result<&String> { + match &args[0] { + Expr::Literal(ScalarValue::Utf8(Some(table_name))) => Ok(table_name), + _ => Err(DataFusionError::Plan( + "expected a single string literal argument to file_dependencies".to_string(), + )), + } +} + +/// List expected target partitions for this materialized view, +/// together with the dependencies for each partition. +pub fn expected_source_files_by_target( + materialized_view: &dyn Materialized, + config_options: &ConfigOptions, + row_metadata_registry: Arc, +) -> Result { + use datafusion_expr::logical_plan::*; + + let plan = materialized_view.query().clone(); + + let partition_cols = materialized_view.partition_columns(); + let partition_col_indices = plan + .schema() + .fields() + .iter() + .enumerate() + .filter_map(|(i, f)| partition_cols.contains(f.name()).then_some(i)) + .collect(); + + // First expand all wildcards + let plan = ExpandWildcardRule {}.analyze(plan, config_options)?; + + // Prune non-partition columns from all table scans + let pruned_plan = pushdown_projection_inexact(plan, &partition_col_indices)?; + + // Now bubble up file metadata to the top of the plan + let pruned_plan_with_source_files = + push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry)?; + + // We now have data in the following form: + // (partition_col0, partition_col1, ..., __meta) + // The last column is a list of structs containing the row metadata + // We need to unnest it + + // Find the single column with the name '__meta' + let files = pruned_plan_with_source_files + .schema() + .columns() + .into_iter() + .find(|c| c.name.starts_with(META_COLUMN)) + .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?; + let files_col = Expr::Column(files.clone()); + + LogicalPlanBuilder::from(pruned_plan_with_source_files) + .unnest_column(files)? + .project(vec![ + construct_target_path_from_partition_columns(materialized_view).alias("target"), + get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), + get_field(files_col.clone(), "table_schema").alias("source_table_schema"), + get_field(files_col.clone(), "table_name").alias("source_table_name"), + get_field(files_col.clone(), "source_uri").alias("source_uri"), + get_field(files_col.clone(), "last_modified").alias("source_last_modified"), + ])? + .distinct()? 
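+        // DISTINCT collapses duplicate (target, source file) pairs: after the unnest,
+        // many rows of the pruned plan can map to the same target partition and the
+        // same source object.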
+ .build() +} + +fn construct_target_path_from_partition_columns(materialized_view: &dyn Materialized) -> Expr { + let table_path = lit(materialized_view.table_paths()[0] + .as_str() + // Trim the / (we'll add it back later if we need it) + .trim_end_matches("/")); + // Construct the paths for the build targets + let mut hive_column_path_elements = materialized_view + .partition_columns() + .iter() + .map(|column_name| concat([lit(column_name.as_str()), lit("="), col(column_name)].to_vec())) + .collect::>(); + hive_column_path_elements.insert(0, table_path); + + concat(vec![ + // concat_ws doesn't work if there are < 2 elements to concat + if hive_column_path_elements.len() == 1 { + hive_column_path_elements.pop().unwrap() + } else { + concat_ws(lit("/"), hive_column_path_elements) + }, + // Always need a trailing slash on directory paths + lit("/"), + ]) +} + +/// An implementation of "inexact" projection pushdown that eliminates aggregations, windows, sorts, & limits. +/// Does not preserve order or row multiplicity and may return rows outside of the original projection. +/// However, it has the following property: +/// Let P be a projection operator. +/// If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'. +/// +/// The purpose is to be as aggressive as possible with projection pushdown at the sacrifice of exactness. +fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> Result { + use datafusion_expr::logical_plan::*; + + let plan_formatted = format!("{}", plan.display()); + match plan { + LogicalPlan::Projection(Projection { expr, input, .. }) => { + let new_exprs = expr + .into_iter() + .enumerate() + .filter_map(|(i, expr)| indices.contains(&i).then_some(expr)) + .collect_vec(); + + let child_indices = new_exprs + .iter() + .flat_map(|e| e.column_refs().into_iter()) + .map(|c| input.schema().index_of_column(c).unwrap()) + .collect::>(); + + Projection::try_new( + new_exprs, + pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices) + .map(Arc::new)?, + ) + .map(LogicalPlan::Projection) + } + LogicalPlan::Filter(ref filter) => { + let mut indices = indices.clone(); + + let new_filter = widen_filter(&filter.predicate, &mut indices, &plan)?; + + let filter = match plan { + LogicalPlan::Filter(filter) => filter, + _ => unreachable!(), + }; + + Filter::try_new( + new_filter, + pushdown_projection_inexact(Arc::unwrap_or_clone(filter.input), &indices) + .map(Arc::new)?, + ) + .map(LogicalPlan::Filter) + } + LogicalPlan::Window(Window { + input, + window_expr: _, + .. + }) => { + // Window nodes take their input and append window expressions to the end. + // If our projection doesn't include window expressions, we can just turn + // the window into a regular projection. + let num_non_window_cols = input.schema().fields().len(); + if indices.iter().any(|&i| i >= num_non_window_cols) { + return internal_err!("Can't push down projection through window functions"); + } + + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices) + } + LogicalPlan::Aggregate(Aggregate { + input, group_expr, .. + }) => { + // Aggregate node schemas are the GROUP BY expressions followed by the aggregate expressions. 
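+            // e.g. `SELECT year, month, count(*) FROM t GROUP BY year, month` yields the
+            // schema (year, month, count(*)): indices below group_expr.len() are GROUP BY
+            // columns and can be kept; anything at or past that boundary is an aggregate
+            // expression, which we cannot push a projection through.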
+ let num_group_exprs = group_expr.len(); + if indices.iter().any(|&i| i >= num_group_exprs) { + return internal_err!("Can't push down projection through aggregate functions"); + } + + let new_exprs = group_expr + .into_iter() + .enumerate() + .filter_map(|(i, expr)| indices.contains(&i).then_some(expr)) + .collect_vec(); + + let child_indices = new_exprs + .iter() + .flat_map(|e| e.column_refs().into_iter()) + .map(|c| input.schema().index_of_column(c).unwrap()) + .collect::>(); + + Projection::try_new( + new_exprs, + pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices) + .map(Arc::new)?, + ) + .map(LogicalPlan::Projection) + } + LogicalPlan::Join(ref join) => { + let join_type = join.join_type; + match join_type { + JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {} + _ => { + return Err(DataFusionError::Internal(format!( + "unsupported join type: {join_type}" + ))) + } + }; + + let mut indices = indices.clone(); + + // Relax the filter so that it can be computed from the + // "pruned" children + let filter = join + .filter + .as_ref() + .map(|f| widen_filter(f, &mut indices, &plan)) + .transpose()?; + + let (mut left_child_indices, mut right_child_indices) = + indices.iter().partition_map(|&i| { + if i < join.left.schema().fields().len() { + Either::Left(i) + } else { + Either::Right(i - join.left.schema().fields().len()) + } + }); + + let on = join.on.iter().try_fold(vec![], |mut v, (lexpr, rexpr)| { + // The ON clause includes filters like `lexpr = rexpr` + // If either side is considered 'relevant', we include it. + // See documentation for [`expr_is_relevant`]. + if expr_is_relevant(lexpr, &left_child_indices, &join.left)? + || expr_is_relevant(rexpr, &right_child_indices, &join.right)? + { + add_all_columns_to_indices(lexpr, &mut left_child_indices, &join.left)?; + add_all_columns_to_indices(rexpr, &mut right_child_indices, &join.right)?; + v.push((lexpr.clone(), rexpr.clone())) + } + + Ok::<_, DataFusionError>(v) + })?; + + let join = match plan { + LogicalPlan::Join(join) => join, + _ => unreachable!(), + }; + + let left = + pushdown_projection_inexact(Arc::unwrap_or_clone(join.left), &left_child_indices) + .map(Arc::new)?; + let right = + pushdown_projection_inexact(Arc::unwrap_or_clone(join.right), &right_child_indices) + .map(Arc::new)?; + + let schema = project_dfschema(join.schema.as_ref(), &indices).map(Arc::new)?; + + Ok(LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + schema, + ..join + })) + } + LogicalPlan::Union(Union { inputs, schema, .. 
}) => { + let inputs = inputs + .into_iter() + .map(Arc::unwrap_or_clone) + .map(|plan| pushdown_projection_inexact(plan, indices)) + .map_ok(Arc::new) + .collect::>>()?; + + Ok(LogicalPlan::Union(Union { + inputs, + schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?, + })) + } + LogicalPlan::TableScan(ref scan) => { + let mut indices = indices.clone(); + let filters = scan + .filters + .iter() + .map(|f| widen_filter(f, &mut indices, &plan)) + .collect::>>()?; + + let new_projection = scan + .projection + .clone() + .unwrap_or((0..scan.source.schema().fields().len()).collect()) + .into_iter() + .enumerate() + .filter_map(|(i, j)| indices.contains(&i).then_some(j)) + .collect_vec(); + + let scan = match plan { + LogicalPlan::TableScan(scan) => scan, + _ => unreachable!(), + }; + + TableScan::try_new( + scan.table_name, + scan.source, + Some(new_projection), + filters, + None, + ) + .map(LogicalPlan::TableScan) + } + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row, + schema, + }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row, + schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?, + })), + LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => SubqueryAlias::try_new( + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices).map(Arc::new)?, + alias, + ) + .map(LogicalPlan::SubqueryAlias), + LogicalPlan::Limit(Limit { input, .. }) | LogicalPlan::Sort(Sort { input, .. }) => { + // Ignore sorts/limits entirely and remove them from the plan + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices) + } + LogicalPlan::Values(Values { schema, values }) => { + let schema = project_dfschema(&schema, indices).map(Arc::new)?; + let values = values + .into_iter() + .map(|row| { + row.into_iter() + .enumerate() + .filter_map(|(i, v)| indices.contains(&i).then_some(v)) + .collect_vec() + }) + .collect_vec(); + + Ok(LogicalPlan::Values(Values { schema, values })) + } + LogicalPlan::Distinct(Distinct::All(input)) => { + pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices) + .map(Arc::new) + .map(Distinct::All) + .map(LogicalPlan::Distinct) + } + LogicalPlan::Unnest(unnest) => { + // Map parent indices to child indices. + // The columns of an unnest node have a many-to-one relation + // to the columns of the input. + let child_indices = indices + .iter() + .map(|&i| unnest.dependency_indices[i]) + .collect::>(); + + let input_using_columns = unnest.input.using_columns()?; + let input_schema = unnest.input.schema(); + let columns_to_unnest = + unnest + .exec_columns + .into_iter() + .try_fold(vec![], |mut v, c| { + let c = c.normalize_with_schemas_and_ambiguity_check( + &[&[input_schema.as_ref()]], + &input_using_columns, + )?; + let idx = input_schema.index_of_column(&c)?; + if child_indices.contains(&idx) { + v.push(c); + } + + Ok::<_, DataFusionError>(v) + })?; + + let columns_to_project = unnest + .schema + .columns() + .into_iter() + .enumerate() + .filter_map(|(i, c)| indices.contains(&i).then_some(c)) + .map(Expr::Column) + .collect_vec(); + + LogicalPlanBuilder::from(pushdown_projection_inexact( + Arc::unwrap_or_clone(unnest.input), + &child_indices, + )?) + .unnest_columns_with_options(columns_to_unnest, unnest.options)? + .project(columns_to_project)? + .build() + } + + _ => internal_err!("Unsupported logical plan node: {}", plan.display()), + } + .map_err(|e| e.context(format!("plan: \n{plan_formatted}"))) +} + +/// 'Widen' a filter, i.e. 
given a predicate P, +/// compute P' such that P' is true whenever P is. +/// In particular, P' should be computed using columns whose indices are in `indices`. +/// +/// # Mutating `indices` +/// +/// Currently under some conditions this function will add new entries to `indices`. +/// This is particularly important in some cases involving joins. For example, +/// consider the following plan: +/// +/// ```ignore +/// Projection: t2.year, t2.month, t2.day, t2.feed, t2.column2, t3.column1 +/// Inner Join: Using t2.year = t3.year +/// TableScan: t2 +/// TableScan: t3 +/// ``` +/// +/// If we want to prune all parts of the plan not related to t2.year, we'd get something like this: +/// +/// ```ignore +/// Projection: t2.year +/// Inner Join: Using +/// TableScan: t2 projection=[year] +/// TableScan: t3 projection=[] +/// ``` +/// +/// Notice that the filter in the inner join is gone. This is because `t3.year` is not obviously referenced in the definition of `t2.year`; +/// it is only implicitly used in the join filter. +/// +/// To get around this, we look at filter expressions, and if they contain a _single_ column in the index set, +/// we add the rest of the columns from the filter to the index set, to ensure all of the filter's inputs +/// will be present. +fn widen_filter( + predicate: &Expr, + indices: &mut HashSet, + parent: &LogicalPlan, +) -> Result { + let conjunctions = split_conjunction(predicate); + + conjunctions.into_iter().try_fold(lit(true), |a, b| { + Ok(if expr_is_relevant(b, indices, parent)? { + add_all_columns_to_indices(b, indices, parent)?; + a.and(b.clone()) + } else { + a + }) + }) +} + +/// An expression is considered 'relevant' if a single column is inside our index set. +fn expr_is_relevant(expr: &Expr, indices: &HashSet, parent: &LogicalPlan) -> Result { + let schemas = parent + .inputs() + .iter() + .map(|input| input.schema().as_ref()) + .collect_vec(); + let using_columns = parent.using_columns()?; + + for c in expr.column_refs() { + let normalized_column = c + .clone() + .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?; + let column_idx = parent.schema().index_of_column(&normalized_column)?; + + if indices.contains(&column_idx) { + return Ok(true); + } + } + + Ok(false) +} + +/// Get all referenced columns in the expression, +/// and add them to the index set. +fn add_all_columns_to_indices( + expr: &Expr, + indices: &mut HashSet, + parent: &LogicalPlan, +) -> Result<()> { + let schemas = parent + .inputs() + .iter() + .map(|input| input.schema().as_ref()) + .collect_vec(); + let using_columns = parent.using_columns()?; + + for c in expr.column_refs() { + let normalized_column = c + .clone() + .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?; + let column_idx = parent.schema().index_of_column(&normalized_column)?; + + indices.insert(column_idx); + } + + Ok(()) +} + +fn project_dfschema(schema: &DFSchema, indices: &HashSet) -> Result { + let qualified_fields = (0..schema.fields().len()) + .filter_map(|i| { + indices.contains(&i).then_some({ + let (reference, field) = schema.qualified_field(i); + (reference.cloned(), Arc::new(field.clone())) + }) + }) + .collect_vec(); + + // todo: handle functional dependencies + DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone()) +} + +/// Rewrite TableScans on top of the file metadata table, +/// assuming the query only uses the S3 partition columns. +/// Then push up the file metadata to the output of this plan. 
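+/// (Leaf `TableScan`s are handled by `scan_columns_from_row_metadata`; every other
+/// node is handled by `project_row_metadata_from_input`.)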
+/// +/// The result will have a single new column with an autogenerated name "__meta_" +/// which contains the source file metadata for a given row in the output. +fn push_up_file_metadata( + plan: LogicalPlan, + catalog_options: &CatalogOptions, + row_metadata_registry: Arc, +) -> Result { + let alias_generator = AliasGenerator::new(); + plan.transform_up(|plan| { + match plan { + LogicalPlan::TableScan(scan) => { + scan_columns_from_row_metadata(scan, catalog_options, row_metadata_registry.clone()) + } + plan => project_row_metadata_from_input(plan, &alias_generator), + } + .and_then(LogicalPlan::recompute_schema) + .map(Transformed::yes) + }) + .map(|t| t.data) +} + +/// Assuming the input has any columns of the form "__meta_", +/// push up the file columns through the output of this LogicalPlan node. +/// The output will have a single new column of the form "__meta_". +fn project_row_metadata_from_input( + plan: LogicalPlan, + alias_generator: &AliasGenerator, +) -> Result { + use datafusion_expr::logical_plan::*; + + // find all file metadata columns and collapse them into one concatenated list + match plan { + LogicalPlan::Projection(Projection { expr, input, .. }) => { + let file_md_columns = input + .schema() + .columns() + .into_iter() + .filter_map(|c| c.name.starts_with(META_COLUMN).then_some(Expr::Column(c))) + .collect_vec(); + Projection::try_new( + expr.into_iter() + .chain(Some( + flatten(make_array(file_md_columns)) + .alias(alias_generator.next(META_COLUMN)), + )) + .collect_vec(), + input, + ) + .map(LogicalPlan::Projection) + } + _ => { + let plan = plan.recompute_schema()?; + let (file_md_columns, original_columns) = plan + .schema() + .columns() + .into_iter() + .partition::, _>(|c| c.name.starts_with(META_COLUMN)); + + Projection::try_new( + original_columns + .into_iter() + .map(Expr::Column) + .chain(Some( + flatten(make_array( + file_md_columns.into_iter().map(Expr::Column).collect_vec(), + )) + .alias(alias_generator.next(META_COLUMN)), + )) + .collect_vec(), + Arc::new(plan), + ) + .map(LogicalPlan::Projection) + } + } +} + +/// Turn a TableScan into an equivalent scan on the row metadata source, +/// assuming that every column in the table scan is a partition column; +/// also adds a new column to the TableScan, "__meta" +/// which is a List of Struct column including the row metadata. +fn scan_columns_from_row_metadata( + scan: TableScan, + catalog_options: &CatalogOptions, + row_metadata_registry: Arc, +) -> Result { + let table_ref = scan.table_name.clone().resolve( + &catalog_options.default_catalog, + &catalog_options.default_schema, + ); + + let source = row_metadata_registry.get_source(&table_ref)?; + + // [`RowMetadataSource`] returns a Struct, + // but the MV algorithm expects a list of structs at each node in the plan. + let mut exprs = scan + .projected_schema + .fields() + .iter() + .map(|f| col((None, f))) + .collect_vec(); + exprs.push(make_array(vec![col(META_COLUMN)]).alias(META_COLUMN)); + + source + .row_metadata(table_ref, &scan)? + .project(exprs)? + .alias(scan.table_name.clone())? + .filter( + scan.filters + .clone() + .into_iter() + .fold(lit(true), |a, b| a.and(b)), + )? 
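+        // Folding onto `lit(true)` produces `true AND f1 AND f2 AND ...`, so an empty
+        // `scan.filters` degenerates to a harmless `WHERE true`.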
+ .build() +} + +#[cfg(test)] +mod test { + use std::{any::Any, collections::HashSet, sync::Arc}; + + use arrow::util::pretty::pretty_format_batches; + use arrow_schema::SchemaRef; + use datafusion::{ + assert_batches_eq, assert_batches_sorted_eq, + catalog::{Session, TableProvider}, + datasource::listing::ListingTableUrl, + execution::session_state::SessionStateBuilder, + prelude::{DataFrame, SessionConfig, SessionContext}, + }; + use datafusion_common::{Column, Result, ScalarValue}; + use datafusion_expr::{Expr, JoinType, LogicalPlan, TableType}; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_sql::TableReference; + use itertools::Itertools; + + use crate::materialized::{ + dependencies::pushdown_projection_inexact, + register_materialized, + row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry, RowMetadataSource}, + ListingTableLike, Materialized, + }; + + use super::file_dependencies; + + /// A mock materialized view. + #[derive(Debug)] + struct MockMaterializedView { + table_path: ListingTableUrl, + partition_columns: Vec, + query: LogicalPlan, + file_ext: &'static str, + } + + #[async_trait::async_trait] + impl TableProvider for MockMaterializedView { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::new(self.query.schema().as_arrow().clone()) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + _projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + unimplemented!() + } + } + + impl ListingTableLike for MockMaterializedView { + fn table_paths(&self) -> Vec { + vec![self.table_path.clone()] + } + + fn partition_columns(&self) -> Vec { + self.partition_columns.clone() + } + + fn file_ext(&self) -> String { + self.file_ext.to_string() + } + } + + impl Materialized for MockMaterializedView { + fn query(&self) -> LogicalPlan { + self.query.clone() + } + } + + async fn setup() -> Result { + let _ = env_logger::builder().is_test(true).try_init(); + + register_materialized::(); + + let state = SessionStateBuilder::new() + .with_default_features() + .with_config( + SessionConfig::new() + .with_default_catalog_and_schema("datafusion", "test") + .set( + "datafusion.explain.logical_plan_only", + &ScalarValue::Boolean(Some(true)), + ) + .set( + "datafusion.sql_parser.dialect", + &ScalarValue::Utf8(Some("duckdb".into())), + ) + .set( + // See discussion in this issue: + // https://github.com/apache/datafusion/issues/13065 + "datafusion.execution.skip_physical_aggregate_schema_check", + &ScalarValue::Boolean(Some(true)), + ), + ) + .build(); + + let ctx = SessionContext::new_with_state(state); + + ctx.sql( + "CREATE TABLE t1 AS VALUES + ('2021', 3, 'A'), + ('2022', 4, 'B'), + ('2023', 5, 'C')", + ) + .await? + .collect() + .await?; + + ctx.sql( + "CREATE TABLE t2 ( + year STRING, + month STRING, + day STRING, + feed CHAR, + column2 INTEGER + ) AS VALUES + ('2023', '01', '01', 'A', 1), + ('2023', '01', '02', 'B', 2), + ('2023', '01', '03', 'C', 3), + ('2024', '12', '04', 'X', 4), + ('2024', '12', '05', 'Y', 5), + ('2024', '12', '06', 'Z', 6)", + ) + .await? + .collect() + .await?; + + ctx.sql( + "CREATE TABLE t3 ( + year STRING, + column1 INTEGER + ) AS VALUES + (2023, 1), + (2024, 2)", + ) + .await? 
+ .collect() + .await?; + + ctx.sql( + // create a fake file metadata table to use as a mock + "CREATE TABLE file_metadata ( + table_catalog STRING, + table_schema STRING, + table_name STRING, + file_path STRING, + last_modified TIMESTAMP, + size BIGINT UNSIGNED + ) AS VALUES + ('datafusion', 'test', 't1', 's3://t1/column1=2021/data.01.parquet', '2023-07-11T16:29:26Z', 0), + ('datafusion', 'test', 't1', 's3://t1/column1=2022/data.01.parquet', '2023-07-11T16:45:22Z', 0), + ('datafusion', 'test', 't1', 's3://t1/column1=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet', '2023-07-11T16:29:26Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet', '2023-07-11T16:45:22Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet', '2023-07-11T16:29:26Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet', '2023-07-11T16:45:22Z', 0), + ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't3', 's3://t3/year=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0), + ('datafusion', 'test', 't3', 's3://t3/year=2024/data.01.parquet', '2023-07-11T16:45:44Z', 0) + " + ) + .await? + .collect() + .await?; + + let row_metadata_registry = Arc::new(RowMetadataRegistry::default()); + let t1_ref = TableReference::parse_str("t1").resolve( + &ctx.state().config_options().catalog.default_catalog, + &ctx.state().config_options().catalog.default_schema, + ); + let t2_ref = TableReference::parse_str("t2").resolve( + &ctx.state().config_options().catalog.default_catalog, + &ctx.state().config_options().catalog.default_schema, + ); + let t3_ref = TableReference::parse_str("t3").resolve( + &ctx.state().config_options().catalog.default_catalog, + &ctx.state().config_options().catalog.default_schema, + ); + + let metadata_table = ctx.table_provider("file_metadata").await?; + let object_store_metadata_source = Arc::new( + ObjectStoreRowMetadataSource::with_file_metadata(metadata_table), + ); + + for r in [t1_ref, t2_ref, t3_ref] { + row_metadata_registry.register_source( + &r, + Arc::clone(&object_store_metadata_source) as Arc, + ); + } + + ctx.register_udtf( + "file_dependencies", + file_dependencies( + ctx.state().catalog_list().clone(), + row_metadata_registry.clone(), + ctx.copied_config().options(), + ), + ); + + Ok(ctx) + } + + #[tokio::test] + async fn test_deps() { + struct TestCase { + name: &'static str, + query_to_analyze: &'static str, + table_name: &'static str, + table_path: ListingTableUrl, + partition_cols: Vec<&'static str>, + file_extension: &'static str, + expected_output: Vec<&'static str>, + file_metadata: &'static str, + } + + let cases = &[ + TestCase { + name: "un-transformed partition column", + query_to_analyze: + "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", + table_name: "m1", + table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + partition_cols: vec!["partition_column"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | 
source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ], + // second file is old + file_metadata: " + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "transform year/month/day partition into timestamp partition", + query_to_analyze: " + SELECT DISTINCT + to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp, + feed + FROM t2", + table_name: "m2", + table_path: ListingTableUrl::parse("s3://m2/").unwrap(), + partition_cols: vec!["timestamp", "feed"], + file_extension: ".parquet", + expected_output: vec![ + "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-01T00:00:00/feed=A/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-02T00:00:00/feed=B/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 
's3://m2/timestamp=2023-01-03T00:00:00/feed=C/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-04T00:00:00/feed=X/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-05T00:00:00/feed=Y/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-06T00:00:00/feed=Z/data.01.parquet', '2023-07-10T16:00:00Z', 0) + ", + }, + TestCase { + name: "materialized view has no partitions", + query_to_analyze: "SELECT column1 AS output FROM t3", + table_name: "m3", + table_path: ListingTableUrl::parse("s3://m3/").unwrap(), + partition_cols: vec![], + file_extension: ".parquet", + expected_output: vec![ + "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+", + "| s3://m3/ | datafusion | test | t3 | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m3/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm3', 's3://m3/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "simple equijoin on year", + query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)", + table_name: "m4", + table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + partition_cols: vec!["year"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + 
"+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "triangular join on year", + query_to_analyze: " + SELECT + t2.*, + t3.* EXCLUDE(year), + t3.year AS \"t3.year\" + FROM t2 + INNER JOIN t3 + ON (t2.year <= t3.year)", + table_name: "m4", + table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + partition_cols: vec!["year"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + TestCase { + name: "triangular left join, strict <", + query_to_analyze: " + SELECT + t2.*, + t3.* EXCLUDE(year), + t3.year AS \"t3.year\" + FROM t2 + LEFT JOIN t3 + ON (t2.year < t3.year)", + table_name: "m4", + table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + partition_cols: vec!["year"], + file_extension: ".parquet", + expected_output: vec![ + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + 
"+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2023/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2023/ | datafusion | test | t3 | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m4/year=2024/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0) + ", + }, + ]; + + async fn run_test(case: &TestCase) -> Result<()> { + let context = setup().await.unwrap(); + + let plan = context + .sql(case.query_to_analyze) + .await? + .into_optimized_plan()?; + + println!("original plan: \n{}", plan.display_indent()); + + let partition_col_indices = plan + .schema() + .columns() + .into_iter() + .enumerate() + .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i)) + .collect(); + println!("indices: {:?}", partition_col_indices); + let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; + println!( + "inexact projection pushdown:\n{}", + analyzed.display_indent() + ); + + context + .register_table( + case.table_name, + Arc::new(MockMaterializedView { + table_path: case.table_path.clone(), + partition_columns: case + .partition_cols + .iter() + .map(|s| s.to_string()) + .collect(), + query: plan, + file_ext: case.file_extension, + }), + ) + .expect("couldn't register materialized view"); + + context + .sql(&format!( + "INSERT INTO file_metadata VALUES {}", + case.file_metadata, + )) + .await? 
+ .collect() + .await?; + + let df = context + .sql(&format!( + "SELECT * FROM file_dependencies('{}', 'v2')", + case.table_name, + )) + .await + .map_err(|e| e.context("get file dependencies"))?; + df.clone().explain(false, false)?.show().await?; + df.clone().show().await?; + + assert_batches_sorted_eq!(case.expected_output, &df.collect().await?); + + Ok(()) + } + + for case in cases { + run_test(case) + .await + .unwrap_or_else(|e| panic!("{} failed: {e}", case.name)); + } + } + + #[tokio::test] + async fn test_projection_pushdown_inexact() -> Result<()> { + struct TestCase { + name: &'static str, + query_to_analyze: &'static str, + projection: &'static [&'static str], + expected_plan: Vec<&'static str>, + expected_output: Vec<&'static str>, + } + + let cases = &[ + TestCase { + name: "simple projection", + query_to_analyze: + "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", + projection: &["partition_column"], + expected_plan: vec![ + "+--------------+--------------------------------------------+", + "| plan_type | plan |", + "+--------------+--------------------------------------------+", + "| logical_plan | Projection: t1.column1 AS partition_column |", + "| | TableScan: t1 projection=[column1] |", + "+--------------+--------------------------------------------+", + ], + expected_output: vec![ + "+------------------+", + "| partition_column |", + "+------------------+", + "| 2021 |", + "| 2022 |", + "| 2023 |", + "+------------------+", + ], + }, + TestCase { + name: "compound expressions", + query_to_analyze: " + SELECT DISTINCT + to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp, + feed + FROM t2", + projection: &["timestamp", "feed"], + expected_plan: vec![ + "+--------------+-------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+--------------+-------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: to_timestamp_nanos(concat_ws(Utf8(\"-\"), t2.year, t2.month, t2.day)) AS timestamp, t2.feed |", + "| | TableScan: t2 projection=[year, month, day, feed] |", + "+--------------+-------------------------------------------------------------------------------------------------------+", + ] + , + expected_output: vec![ + "+---------------------+------+", + "| timestamp | feed |", + "+---------------------+------+", + "| 2023-01-01T00:00:00 | A |", + "| 2023-01-02T00:00:00 | B |", + "| 2023-01-03T00:00:00 | C |", + "| 2024-12-04T00:00:00 | X |", + "| 2024-12-05T00:00:00 | Y |", + "| 2024-12-06T00:00:00 | Z |", + "+---------------------+------+", + ], + }, + TestCase { + name: "empty projection", + query_to_analyze: "SELECT column1 AS output FROM t3", + projection: &[], + expected_plan: vec![ + "+--------------+-----------------------------+", + "| plan_type | plan |", + "+--------------+-----------------------------+", + "| logical_plan | TableScan: t3 projection=[] |", + "+--------------+-----------------------------+", + ], + expected_output: vec![ + "++", + "++", + "++", + ], + }, + TestCase { + name: "simple equijoin on year", + query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)", + projection: &["year"], + expected_plan: vec![ + "+--------------+-------------------------------------+", + "| plan_type | plan |", + "+--------------+-------------------------------------+", + "| logical_plan | Projection: t2.year |", + "| | Inner Join: t2.year = t3.year |", + "| | 
TableScan: t2 projection=[year] |", + "| | TableScan: t3 projection=[year] |", + "+--------------+-------------------------------------+", + ], + expected_output: vec![ + "+------+", + "| year |", + "+------+", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2024 |", + "| 2024 |", + "| 2024 |", + "+------+", + ], + }, + TestCase { + name: "triangular join on year", + query_to_analyze: " + SELECT + t2.*, + t3.* EXCLUDE(year), + t3.year AS \"t3.year\" + FROM t2 + INNER JOIN t3 + ON (t2.year <= t3.year)", + projection: &["year"], + expected_plan: vec![ + "+--------------+-------------------------------------------+", + "| plan_type | plan |", + "+--------------+-------------------------------------------+", + "| logical_plan | Projection: t2.year |", + "| | Inner Join: Filter: t2.year <= t3.year |", + "| | TableScan: t2 projection=[year] |", + "| | TableScan: t3 projection=[year] |", + "+--------------+-------------------------------------------+", + ], + expected_output: vec![ + "+------+", + "| year |", + "+------+", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2024 |", + "| 2024 |", + "| 2024 |", + "+------+", + ], + }, + TestCase { + name: "window & unnest", + query_to_analyze: " + SELECT + \"unnest_placeholder(date).year\" AS year, + \"unnest_placeholder(date).month\" AS month, + \"unnest_placeholder(date).day\" AS day, + arr + FROM ( + SELECT + unnest(date), + unnest(arr) AS arr + FROM ( + SELECT + named_struct('year', year, 'month', month, 'day', day) AS date, + array_agg(column2) + OVER (ORDER BY year, month, day) + AS arr + FROM t2 + ) + )", + projection: &["year", "month", "day"], + expected_plan: vec![ + "+--------------+---------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+--------------+---------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | Projection: unnest_placeholder(date).year AS year, unnest_placeholder(date).month AS month, unnest_placeholder(date).day AS day |", + "| | Unnest: lists[] structs[unnest_placeholder(date)] |", + "| | Projection: named_struct(Utf8(\"year\"), t2.year, Utf8(\"month\"), t2.month, Utf8(\"day\"), t2.day) AS unnest_placeholder(date) |", + "| | TableScan: t2 projection=[year, month, day] |", + "+--------------+---------------------------------------------------------------------------------------------------------------------------------+", + ], + expected_output: vec![ + "+------+-------+-----+", + "| year | month | day |", + "+------+-------+-----+", + "| 2023 | 01 | 01 |", + "| 2023 | 01 | 02 |", + "| 2023 | 01 | 03 |", + "| 2024 | 12 | 04 |", + "| 2024 | 12 | 05 |", + "| 2024 | 12 | 06 |", + "+------+-------+-----+", + ], + }, + TestCase { + name: "outer join + union", + query_to_analyze: " + SELECT + COALESCE(t1.year, t2.year) AS year, + t1.column2 + FROM (SELECT column1 AS year, column2 FROM t1) t1 + FULL OUTER JOIN (SELECT year, column2 FROM t2) t2 + USING (year) + UNION ALL + SELECT year, column1 AS column2 FROM t3 + ", + projection: &["year"], + expected_plan: vec![ + "+--------------+--------------------------------------------------+", + "| plan_type | plan |", + "+--------------+--------------------------------------------------+", + "| logical_plan | Union |", + "| | Projection: coalesce(t1.year, t2.year) AS year |", + "| | Full Join: Using t1.year = t2.year |", + "| | SubqueryAlias: t1 |", + 
"| | Projection: t1.column1 AS year |", + "| | TableScan: t1 projection=[column1] |", + "| | SubqueryAlias: t2 |", + "| | TableScan: t2 projection=[year] |", + "| | TableScan: t3 projection=[year] |", + "+--------------+--------------------------------------------------+", + ], + expected_output: vec![ + "+------+", + "| year |", + "+------+", + "| 2021 |", + "| 2022 |", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2023 |", + "| 2024 |", + "| 2024 |", + "| 2024 |", + "| 2024 |", + "+------+", + ], + } + ]; + + async fn run_test(case: &TestCase) -> Result<()> { + let context = setup().await?; + + let df = context.sql(case.query_to_analyze).await?; + df.clone().explain(false, false)?.show().await?; + + let plan = df.clone().into_optimized_plan()?; + + let indices = case + .projection + .iter() + .map(|&name| { + plan.schema() + .index_of_column(&Column::new_unqualified(name)) + }) + .collect::>>()?; + + let analyzed = DataFrame::new( + context.state(), + pushdown_projection_inexact(plan.clone(), &indices)?, + ); + analyzed.clone().explain(false, false)?.show().await?; + + // Check the following property of pushdown_projection_inexact: + // if A' = pushdown_projection_inexact(A, P), where P is the projection, + // then PA ⊆ A'. + if !case.projection.is_empty() { + let select_original = df + .clone() + .select( + case.projection + .iter() + .map(|&name| Expr::Column(Column::new_unqualified(name))) + .collect_vec(), + ) + .map_err(|e| e.context("select projection from original plan"))? + .distinct()?; + + let excess = analyzed + .clone() + .distinct()? + .join( + select_original.clone(), + JoinType::RightAnti, + case.projection, + case.projection, + None, + ) + .map_err(|e| e.context("join in subset inclusion test"))?; + + assert_eq!( + excess + .clone() + .count() + .await + .map_err(|e| e.context("execute subset inclusion test"))?, + 0, + "unexpected extra rows in transformed query:\n{} + original:\n{} + inexact pushdown:\n{} + ", + pretty_format_batches(&excess.collect().await?)?, + pretty_format_batches(&select_original.collect().await?)?, + pretty_format_batches(&analyzed.clone().distinct()?.collect().await?)? + ); + } + + assert_batches_eq!( + case.expected_plan, + &analyzed.clone().explain(false, false)?.collect().await? + ); + assert_batches_sorted_eq!(case.expected_output, &analyzed.collect().await?); + + Ok(()) + } + + for case in cases { + run_test(case) + .await + .unwrap_or_else(|e| panic!("{} failed: {e}", case.name)); + } + + Ok(()) + } +} diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 0e79e5d..fd57cc0 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use datafusion::catalog::SchemaProvider; use datafusion::catalog::{CatalogProvider, Session}; use datafusion::datasource::listing::ListingTableUrl; -use datafusion::datasource::{provider_as_source, TableProvider}; +use datafusion::datasource::TableProvider; use datafusion::execution::object_store::ObjectStoreUrl; use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties}; use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal}; @@ -47,9 +47,6 @@ use std::any::Any; use std::sync::Arc; use crate::materialized::cast_to_listing_table; -use crate::materialized::{hive_partition::hive_partition, META_COLUMN}; - -use super::row_metadata::RowMetadataSource; /// A virtual file metadata table, inspired by the information schema column table. 
#[derive(Debug, Clone)] @@ -126,79 +123,6 @@ impl TableProvider for FileMetadata { } } -impl RowMetadataSource for FileMetadata { - fn name(&self) -> &str { - "FileMetadata" - } - - /// Scan for partition column values using object store metadata. - /// This allows us to efficiently scan for distinct partition column values without - /// ever reading from a table directly, which is useful for low-overhead - /// incremental view maintenance. - fn row_metadata( - self: Arc, - table: datafusion_sql::ResolvedTableReference, - scan: &datafusion_expr::TableScan, - ) -> Result { - use datafusion::datasource::source_as_provider; - use datafusion::functions::core::expr_fn::named_struct; - use datafusion::prelude::*; - - // Check that the remaining columns in the source table scans are indeed partition columns - let partition_cols = cast_to_listing_table(source_as_provider(&scan.source)?.as_ref()) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Table '{}' was not registered in TableTypeRegistry", - scan.table_name - )) - })? - .partition_columns(); - - for column in scan.projected_schema.columns() { - if !partition_cols.contains(&column.name) { - return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name))); - } - } - - let fields = scan.projected_schema.fields(); - - let row_metadata_expr = make_array(vec![named_struct(vec![ - lit("table_catalog"), - col("table_catalog"), - lit("table_schema"), - col("table_schema"), - lit("table_name"), - col("table_name"), - lit("source_uri"), // Map file_path to source_uri - col("file_path"), - lit("last_modified"), - col("last_modified"), - ])]) - .alias(META_COLUMN); - - datafusion_expr::LogicalPlanBuilder::scan("file_metadata", provider_as_source(self), None)? - .filter( - col("table_catalog") - .eq(lit(table.catalog.as_ref())) - .and(col("table_schema").eq(lit(table.schema.as_ref()))) - .and(col("table_name").eq(lit(table.table.as_ref()))), - )? - .project( - fields - .iter() - .map(|field| { - // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name - cast( - hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]), - field.data_type().clone(), - ) - .alias(field.name()) - }) - .chain(Some(row_metadata_expr)), - ) - } -} - /// An [`ExecutionPlan`] that scans object store metadata. pub struct FileMetadataExec { table_schema: SchemaRef, diff --git a/src/materialized/hive_partition.rs b/src/materialized/hive_partition.rs index 34bb7ae..075750e 100644 --- a/src/materialized/hive_partition.rs +++ b/src/materialized/hive_partition.rs @@ -53,7 +53,7 @@ pub static HIVE_PARTITION_UDF_NAME: &str = "hive_partition"; /// SELECT /// column_name, /// hive_partition( -/// 's3://atlas/sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet', +/// 's3://sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet', /// column_name /// ) AS partition_value /// FROM (VALUES ('year'), ('month'), ('day')) AS partition_columns (column_name); diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs index d0df6e0..445e34b 100644 --- a/src/materialized/row_metadata.rs +++ b/src/materialized/row_metadata.rs @@ -16,10 +16,13 @@ // under the License. 
 use dashmap::DashMap;
+use datafusion::catalog::TableProvider;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{LogicalPlanBuilder, TableScan};
 use datafusion_sql::ResolvedTableReference;
-use std::sync::Arc;
+use std::{collections::BTreeMap, sync::Arc};
+
+use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_COLUMN};
 
 /// Registry that manages metadata sources for different tables.
 /// Provides a centralized way to register and retrieve metadata sources
@@ -29,6 +32,21 @@ pub struct RowMetadataRegistry {
     metadata_sources: DashMap>,
 }
 
+impl std::fmt::Debug for RowMetadataRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RowMetadataRegistry")
+            .field(
+                "metadata_sources",
+                &self
+                    .metadata_sources
+                    .iter()
+                    .map(|r| (r.key().clone(), r.value().name().to_string()))
+                    .collect::<BTreeMap<_, _>>(),
+            )
+            .finish()
+    }
+}
+
 impl RowMetadataRegistry {
     /// Registers a metadata source for a specific table.
     /// Returns the previously registered source for this table, if any
@@ -81,8 +99,112 @@ pub trait RowMetadataSource: Send + Sync {
     /// That is, for each row in the original table scan, the [`RowMetadataSource`] should contain at least
     /// one row (but potentially more) with the same values, plus the `__meta` column.
     fn row_metadata(
-        self: Arc,
+        &self,
         table: ResolvedTableReference,
         scan: &TableScan,
     ) -> Result<LogicalPlanBuilder>;
 }
+
+/// A [`RowMetadataSource`] that uses an object storage API to retrieve
+/// partition columns and timestamp metadata.
+///
+/// Object store metadata by default comes from [`FileMetadata`], but
+/// may be overridden with a custom [`TableProvider`] using
+/// [`Self::with_file_metadata`].
+#[derive(Debug, Clone)]
+pub struct ObjectStoreRowMetadataSource {
+    file_metadata: Arc,
+}
+
+impl ObjectStoreRowMetadataSource {
+    /// Create a new [`ObjectStoreRowMetadataSource`] from the [`FileMetadata`] table
+    pub fn new(file_metadata: Arc) -> Self {
+        Self::with_file_metadata(file_metadata)
+    }
+
+    /// Create a new [`ObjectStoreRowMetadataSource`] using a custom file metadata source
+    pub fn with_file_metadata(file_metadata: Arc) -> Self {
+        Self { file_metadata }
+    }
+}
+
+impl RowMetadataSource for ObjectStoreRowMetadataSource {
+    fn name(&self) -> &str {
+        "ObjectStoreRowMetadataSource"
+    }
+
+    /// Scan for partition column values using object store metadata.
+    /// This allows us to efficiently scan for distinct partition column values without
+    /// ever reading from a table directly, which is useful for low-overhead
+    /// incremental view maintenance.
+    fn row_metadata(
+        &self,
+        table: datafusion_sql::ResolvedTableReference,
+        scan: &datafusion_expr::TableScan,
+    ) -> Result<LogicalPlanBuilder> {
+        use datafusion::{datasource::provider_as_source, prelude::*};
+
+        // Disable this check in tests
+        #[cfg(not(test))]
+        {
+            // Check that the remaining columns in the source table scans are indeed partition columns
+            let partition_cols = super::cast_to_listing_table(
+                datafusion::datasource::source_as_provider(&scan.source)?.as_ref(),
+            )
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "Table '{}' was not registered in TableTypeRegistry",
+                    scan.table_name
+                ))
+            })?
+            .partition_columns();
+
+            for column in scan.projected_schema.columns() {
+                if !partition_cols.contains(&column.name) {
+                    return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name)));
+                }
+            }
+        }
+
+        let fields = scan.projected_schema.fields();
+
+        let row_metadata_expr = named_struct(vec![
+            lit("table_catalog"),
+            col("table_catalog"),
+            lit("table_schema"),
+            col("table_schema"),
+            lit("table_name"),
+            col("table_name"),
+            lit("source_uri"), // Map file_path to source_uri
+            col("file_path"),
+            lit("last_modified"),
+            col("last_modified"),
+        ])
+        .alias(META_COLUMN);
+
+        datafusion_expr::LogicalPlanBuilder::scan(
+            "file_metadata",
+            provider_as_source(Arc::clone(&self.file_metadata)),
+            None,
+        )?
+        .filter(
+            col("table_catalog")
+                .eq(lit(table.catalog.as_ref()))
+                .and(col("table_schema").eq(lit(table.schema.as_ref())))
+                .and(col("table_name").eq(lit(table.table.as_ref()))),
+        )?
+        .project(
+            fields
+                .iter()
+                .map(|field| {
+                    // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name
+                    cast(
+                        hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]),
+                        field.data_type().clone(),
+                    )
+                    .alias(field.name())
+                })
+                .chain(Some(row_metadata_expr)),
+        )
+    }
+}
diff --git a/src/materialized/util.rs b/src/materialized/util.rs
new file mode 100644
index 0000000..7466d88
--- /dev/null
+++ b/src/materialized/util.rs
@@ -0,0 +1,24 @@
+use std::sync::Arc;
+
+use datafusion::catalog::{CatalogProviderList, TableProvider};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_sql::ResolvedTableReference;
+
+pub fn get_table(
+    catalog_list: &dyn CatalogProviderList,
+    table_ref: &ResolvedTableReference,
+) -> Result<Arc<dyn TableProvider>> {
+    let catalog = catalog_list
+        .catalog(table_ref.catalog.as_ref())
+        .ok_or_else(|| DataFusionError::Plan(format!("no such catalog {}", table_ref.catalog)))?;
+
+    let schema = catalog
+        .schema(table_ref.schema.as_ref())
+        .ok_or_else(|| DataFusionError::Plan(format!("no such schema {}", table_ref.schema)))?;
+
+    // NOTE: this is bad, we are calling async code in a sync context.
+    // We should file an issue about async in UDTFs.
+    futures::executor::block_on(schema.table(table_ref.table.as_ref()))
+        .map_err(|e| e.context(format!("couldn't get table '{}'", table_ref.table)))?
+        .ok_or_else(|| DataFusionError::Plan(format!("no such table {}", table_ref.table)))
+}
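The NOTE in `get_table` above flags the `futures::executor::block_on` call. One possible workaround (a sketch only, not part of this patch) is to re-enter the ambient Tokio runtime rather than spin up a second executor; it assumes the crate already depends on `tokio` and that the UDTF is always invoked from inside a multi-threaded Tokio runtime, since `tokio::task::block_in_place` panics on a current-thread runtime. The helper name is hypothetical:

use std::sync::Arc;

use datafusion::catalog::{CatalogProviderList, TableProvider};
use datafusion_common::{DataFusionError, Result};
use datafusion_sql::ResolvedTableReference;

/// Hypothetical alternative to `get_table`; assumes a multi-threaded Tokio runtime.
pub fn get_table_blocking(
    catalog_list: &dyn CatalogProviderList,
    table_ref: &ResolvedTableReference,
) -> Result<Arc<dyn TableProvider>> {
    let schema = catalog_list
        .catalog(table_ref.catalog.as_ref())
        .ok_or_else(|| DataFusionError::Plan(format!("no such catalog {}", table_ref.catalog)))?
        .schema(table_ref.schema.as_ref())
        .ok_or_else(|| DataFusionError::Plan(format!("no such schema {}", table_ref.schema)))?;

    // Re-enter the already-running runtime rather than blocking it with a
    // second executor, so the async `SchemaProvider::table` call can make
    // progress without starving other tasks on this worker thread.
    tokio::task::block_in_place(|| {
        tokio::runtime::Handle::current().block_on(schema.table(table_ref.table.as_ref()))
    })
    .map_err(|e| e.context(format!("couldn't get table '{}'", table_ref.table)))?
    .ok_or_else(|| DataFusionError::Plan(format!("no such table {}", table_ref.table)))
}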
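For comparison with `ObjectStoreRowMetadataSource`, here is a sketch of what a custom `RowMetadataSource` looks like under the new `&self` signature. Everything about the hypothetical `changelog` table (its name, that it carries the scanned columns plus `source_uri` and `last_modified`) and the trait's import path are assumptions; the point is the contract: reproduce the scanned columns and append a `__meta` struct column.

use std::sync::Arc;

use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::functions::core::expr_fn::named_struct;
use datafusion::prelude::*;
use datafusion_common::Result;
use datafusion_expr::{Expr, LogicalPlanBuilder, TableScan};
use datafusion_sql::ResolvedTableReference;

// Path assumed; import the trait however the crate exports it.
use crate::materialized::row_metadata::RowMetadataSource;

/// A metadata source backed by a hypothetical `changelog` table that already
/// stores one provenance row per source row (and the source's columns).
struct ChangelogRowMetadataSource {
    changelog: Arc<dyn TableProvider>,
}

impl RowMetadataSource for ChangelogRowMetadataSource {
    fn name(&self) -> &str {
        "ChangelogRowMetadataSource"
    }

    fn row_metadata(
        &self,
        table: ResolvedTableReference,
        scan: &TableScan,
    ) -> Result<LogicalPlanBuilder> {
        // Pass the scanned columns through unchanged
        // (the changelog is assumed to carry the same columns)...
        let passthrough = scan.projected_schema.columns().into_iter().map(Expr::Column);

        // ...and attach the `__meta` struct expected by the dependency tracking code.
        let meta = named_struct(vec![
            lit("table_catalog"),
            lit(table.catalog.as_ref()),
            lit("table_schema"),
            lit(table.schema.as_ref()),
            lit("table_name"),
            lit(table.table.as_ref()),
            lit("source_uri"),
            col("source_uri"), // assumed changelog column
            lit("last_modified"),
            col("last_modified"), // assumed changelog column
        ])
        .alias("__meta");

        LogicalPlanBuilder::scan(
            "changelog",
            provider_as_source(Arc::clone(&self.changelog)),
            None,
        )?
        .project(passthrough.chain(Some(meta)))
    }
}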
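Finally, the `hive_partition` UDF used in the `CAST(...)` projection above (and documented in hive_partition.rs) boils down to `key=value` extraction from a Hive-style object path. A minimal standalone sketch of that behaviour; the helper name is hypothetical, and the real UDF is vectorized over Arrow arrays and takes a third boolean argument:

// Hypothetical helper mirroring the hive_partition doc example.
fn hive_partition_value(path: &str, column: &str) -> Option<String> {
    path.split('/').find_map(|segment| {
        let (key, value) = segment.split_once('=')?;
        (key == column).then(|| value.to_string())
    })
}

fn main() {
    let path = "s3://sip/trades/year=2006/month=01/day=02/trades-2006-01-02.parquet";
    for column in ["year", "month", "day"] {
        // Prints: year=2006, month=01, day=02
        println!("{column}={}", hive_partition_value(path, column).unwrap());
    }
}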